In [0]:
from datetime import datetime

# Daily ingestion: load today's CSV drop for every feed and publish each one
# as a Delta table under <catalog>.<schema>.
today = datetime.today().strftime('%Y-%m-%d')
base_url = "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata"

# Every feed lives at <base_url>/<Folder>/<date>/<Folder>_<date>.csv; the
# table name is the lower-cased folder name.
datasets = {
    folder.lower(): f"{base_url}/{folder}/{today}/{folder}_{today}.csv"
    for folder in ("Products", "Sales", "Inventory", "Status")
}

catalog = "my_sales_catalog"
schema = "sales_schema"

for table_name, file_path in datasets.items():
    target_table = f"{catalog}.{schema}.{table_name}"

    # First CSV row is the header; column types are inferred from the data.
    df = spark.read.options(header=True, inferSchema=True).csv(file_path)

    # Overwrite the table contents and let the table schema follow the file.
    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.option("overwriteSchema", "true").saveAsTable(target_table)

    print(f"Table {table_name} updated for current {today} date")

In [0]:
from datetime import datetime

# Load today's CSV drop for every feed into a Spark DataFrame and expose each
# one under a `<feed>_df` name for the analysis cells that follow.
today = datetime.today().strftime('%Y-%m-%d')
base_url = "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata"

# Every feed lives at <base_url>/<Folder>/<date>/<Folder>_<date>.csv, keyed
# here by the lower-cased folder name.
datasets = {
    folder.lower(): f"{base_url}/{folder}/{today}/{folder}_{today}.csv"
    for folder in ("Products", "Sales", "Inventory", "Status")
}

dataframes = {}

for table_name, file_path in datasets.items():
    # First CSV row is the header; column types are inferred from the data.
    df = spark.read.options(header=True, inferSchema=True).csv(file_path)
    dataframes[f"{table_name}_df"] = df
    print(f" DataFrame for {table_name} created.")

# Pull the individual frames out of the registry in one step.
products_df, sales_df, inventory_df, status_df = (
    dataframes[key]
    for key in ("products_df", "sales_df", "inventory_df", "status_df")
)


In [0]:
# Preview the first 8 rows of the freshly loaded sales feed.
sales_df.show(n=8)

In [0]:
from pyspark.sql.functions import col, to_date

# Reduce the order timestamp to a calendar date and publish it as `Date`,
# the column the join cell later selects as "s.Date".
#
# The result is derived from the untouched frame kept in `dataframes` rather
# than from `sales_df` itself. A self-referential version renames the very
# column it reads, so running the cell a second time fails because the source
# column no longer exists; building from the raw frame keeps the cell
# re-runnable with the same result every time.
#
# The column is referenced with a single spelling throughout. Spark resolves
# column names case-insensitively by default, so this matches the header
# whatever its casing in the CSV -- NOTE(review): confirm the actual header
# spelling if spark.sql.caseSensitive is ever enabled.
sales_df = (
    dataframes["sales_df"]
    .withColumn("Timestamp", to_date(col("Timestamp"), "yyyy-MM-dd"))
    .withColumnRenamed("Timestamp", "Date")
)
sales_df.show(8)

In [0]:
# Print the one-line summary (column names and types) of each working frame,
# one frame per line.
print(sales_df, products_df, inventory_df, status_df, sep="\n")

In [0]:
# Enrich each sale with its product name and status label.
# Aliases let the select below pick columns by "<alias>.<column>".
s = sales_df.alias("s")
p = products_df.alias("p")
st = status_df.alias("st")

# Sales reference products and statuses by id.
product_match = s["Product"] == p["ProductID"]
status_match = s["Status"] == st["StatusID"]

report_columns = [
    "s.OrderID",
    "s.FirstName",
    "s.LastName",
    "s.Country",
    "s.Quantity",
    "p.ProductName",
    "s.Price",
    "s.Date",
    "st.Status",
]

# Default (inner) joins: a sale without a matching product or status row is
# not carried into the result.
joined_sales_df = (
    s.join(p, product_match)
    .join(st, status_match)
    .select(*report_columns)
)

joined_sales_df.show(8)


In [0]:
from pyspark.sql.functions import col, count

# Count the joined sales rows whose status label is "Open".
# The result has one row per distinct status left after the filter, so it is
# empty (not a zero count) when there are no open orders.
open_orders_df = (
    joined_sales_df
    .where(col("Status") == "Open")
    .groupBy("Status")
    .agg(count("Status").alias("TotalOpenOrders"))
)

open_orders_df.show(8)

In [0]:
# Spark's `sum` is imported under an alias so it does not replace Python's
# builtin `sum` for every cell that runs after this one.
from pyspark.sql.functions import count, sum as spark_sum

# Per-product sales summary:
#   NumberofSales -- number of sales rows with a non-null Quantity
#   TotalQuantity -- total units sold
# The column is spelled "Quantity" in both aggregates for consistency.
sales_summary = sales_df.groupBy("Product").agg(
    count("Quantity").alias("NumberofSales"),
    spark_sum("Quantity").alias("TotalQuantity"),
)
sales_summary.show(10)

In [0]:
# Project the stock level left after today's sales for each product.
# Aliases let the select below pick columns by "<alias>.<column>".
ss = sales_summary.alias("ss")
p = products_df.alias("p")
i = inventory_df.alias("i")

# NOTE(review): both joins are inner joins driven by the sales summary, so a
# product with no sales in this run does not appear in the result -- confirm
# that is intended before treating the output as a full inventory listing.
upd_inv_df = (
    ss.join(i, ss["Product"] == i["ProductID"], "inner")
    .join(p, ss["Product"] == p["ProductID"], "inner")
    .select("ss.Product", "ss.TotalQuantity", "i.InStock", "p.ProductName")
    # Remaining stock = stock on hand minus units sold.
    .withColumn("UpdatedStock", col("InStock") - col("TotalQuantity"))
    .orderBy("Product")
)
upd_inv_df.show(8)


In [0]:
# Publish the joined sales view as a Delta table, replacing the table's
# previous contents.
joined_sales_df.write.saveAsTable(
    "my_sales_catalog.sales_schema.joined_sales",
    format="delta",
    mode="overwrite",
)

In [0]:
# Add the joined sales rows to the Parquet dataset in the cleaned zone.
# NOTE(review): the target folder is named "sales_summary" but the frame
# written is `joined_sales_df` -- confirm which one is intended. Append mode
# also means re-running this cell adds the same rows again.
joined_sales_df.write.parquet(
    "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata/cleaned/sales_summary",
    mode="append",
)

In [0]:
# Export the projected inventory as CSV with a header row, replacing whatever
# was previously written to this folder.
upd_inv_df.write.csv(
    "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata/cleaned/inventory",
    mode="overwrite",
    header="true",
)