In [30]:

from pyspark.sql import SparkSession
from pyspark import SparkConf
from itables import init_notebook_mode, show, options
from azure.storage.blob import ContainerClient
import json


# Read the workspace settings file; the Azure storage credentials are kept
# there rather than being written into the notebook itself.
with open(".vscode/settings.json") as f:
    config = json.load(f)

# Every storage-related value sits under the "storageSettings" key.
storage_config = config["storageSettings"]
conn_string, accountKey, container_name, storage_account_name = (
    storage_config["connectionString"],
    storage_config["AccountKey"],
    storage_config["containerName"],
    storage_config["storageAccountName"],
)

def showsparkdf(sparkdf, max_rows=None):
    """Render a Spark DataFrame as an interactive itables table.

    The frame is converted with ``toPandas()``, which brings the rows onto the
    driver, so pass ``max_rows`` when the frame may be large.

    Args:
        sparkdf: Spark DataFrame to display.
        max_rows: Optional cap on the number of rows to collect. ``None``
            (the default) converts the whole frame, matching the previous
            behaviour of this helper.
    """
    if max_rows is not None:
        # Limit on the Spark side so only the capped rows are ever collected.
        sparkdf = sparkdf.limit(max_rows)
    show(sparkdf.toPandas())

def delete_folder_recursively(folder_prefix):
    """
    Delete every blob whose name starts with ``folder_prefix``.

    Blobs are deleted deepest-path-first (names sorted in reverse), so any
    nested entry is removed before the entry that contains it. This is meant
    to avoid 'directory not empty' failures when the listing includes
    directory entries (presumably the case on hierarchical-namespace
    accounts; verify against the storage account in use).

    Args:
        folder_prefix: Virtual folder path inside the container, e.g.
            ``"OutputDatasets/me/Results/"``. Matching is a plain name-prefix
            match, so include the trailing slash to avoid matching sibling
            folders that share the same leading characters.

    Raises:
        ValueError: If ``folder_prefix`` is empty or ``None``; an empty prefix
            would match, and therefore delete, every blob in the container.
    """
    if not folder_prefix:
        raise ValueError(
            "folder_prefix must be a non-empty path; "
            "refusing to delete every blob in the container"
        )

    # The context manager closes the client's connection even if a delete fails.
    with ContainerClient.from_connection_string(conn_string, container_name) as container_client:
        listing = container_client.list_blobs(name_starts_with=folder_prefix)
        # Reverse lexical order puts "a/b/c" ahead of "a/b": children first.
        blob_names = sorted((blob.name for blob in listing), reverse=True)

        for blob_name in blob_names:
            print(f"Deleting: {blob_name}")
            container_client.delete_blob(blob_name)

    if not blob_names:
        print(f"Nothing to delete under: {folder_prefix}")
    else:
        print(f"Deleted everything under: {folder_prefix}")



# Spark configuration: Azure connector jars, output-committer settings, and
# the storage-account key used for abfss:// access.
# NOTE(review): some keys carry the "spark.hadoop." prefix and some do not;
# they are kept exactly as they were. Confirm whether the unprefixed keys are
# meant to be prefixed as well.
conf = SparkConf().setAll([
    (
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.3.4,"
        "com.microsoft.azure:azure-storage:8.6.6,"
        "org.apache.hadoop:hadoop-common:3.3.4,"
        "org.apache.hadoop:hadoop-client:3.3.4",
    ),
    ("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2"),
    ("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
    ("fs.azure.skip.metrics", "true"),
    ("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true"),
    # Account key for the dfs endpoint of the configured storage account.
    (f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", accountKey),
])

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Root URI of the data-lake container; every dataset path is built from it.
inputPathPrefix = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"


def _load_csv_as_view(relative_path, view_name):
    """Read a comma-separated file (header row, inferred schema) and register it as a temp view."""
    frame = spark.read.csv(
        f"{inputPathPrefix}/{relative_path}",
        sep=",",
        header=True,
        inferSchema=True,
    )
    frame.createOrReplaceTempView(view_name)
    return frame


# Raw inputs, each exposed to Spark SQL under the given view name.
ph = _load_csv_as_view("RawDatasets/ProductHeirarchy.csv", "ProductHeirarchy")
pd = _load_csv_as_view("RawDatasets/PricesDataset.csv", "BuyingPrices")
sr = _load_csv_as_view("RawDatasets/SalesRecords.csv", "SalesRecords")

# Preview the first ten rows of the product hierarchy.
query = "Select * from ProductHeirarchy Limit 10"
filtered_productheirarchy_dataframe = spark.sql(query)
showsparkdf(filtered_productheirarchy_dataframe)

# Preview the first ten sales records; this frame is also written back to
# the data lake by the next cell.
query2 = "Select * from SalesRecords Limit 10"
filtered_salesrecords_sparkframe = spark.sql(query2)
showsparkdf(filtered_salesrecords_sparkframe)


                                                                                

Item Code,Item Name,Category Code,Category Name
Loading ITables v2.3.0 from the internet... (need help?),,,


Date,Time,Item Code,Quantity Sold (kilo),Unit Selling Price (RMB/kg),Sale or Return,Discount (Yes/No)
Loading ITables v2.3.0 from the internet... (need help?),,,,,,


In [None]:
# Sample round trip: write the ten-row sales preview back to the data lake
# as pipe-delimited CSV under a per-user output folder.
myname = "nouman"
output_folder = f"OutputDatasets/{myname}/TestSalesRecords/"
output_path = f"{inputPathPrefix}/{output_folder}"
print(output_path)

# Remove any blobs left over from an earlier run before writing.
delete_folder_recursively(folder_prefix=output_folder)

(
    filtered_salesrecords_sparkframe.write
    .format("csv")
    .option("header", "true")
    .option("delimiter", "|")
    .mode("overwrite")
    .save(output_path)
)
print("wrote output succesfully to the path")

abfss://digiseriescontainer@digiseriesstorage.dfs.core.windows.net/OutputDatasets/nouman/TestSalesRecords/
Deleting: OutputDatasets/nouman/TestSalesRecords/part-00000-25b908df-d57b-4fef-904d-1802b4a04e45-c000.csv
Deleted everything under: OutputDatasets/nouman/TestSalesRecords/
