In [0]:
# Fetch the AWS credentials from the Databricks secret scope so they never appear in plain text.
aws_secret_scope = "<aws_credentials>"
access_key = dbutils.secrets.get(scope=aws_secret_scope, key="<aws_access_key>")
secret_key = dbutils.secrets.get(scope=aws_secret_scope, key="<aws_secret_key>")

# Configure the S3A connector with those credentials and the regional S3 endpoint
# (settings are applied in the same order as before: access key, secret key, endpoint).
aws_region = "us-east-1"
s3a_settings = {
    "fs.s3a.access.key": access_key,
    "fs.s3a.secret.key": secret_key,
    "fs.s3a.endpoint": f"s3.{aws_region}.amazonaws.com",
}
for conf_key, conf_value in s3a_settings.items():
    spark.conf.set(conf_key, conf_value)


In [0]:
# List the top level of the S3 bucket to confirm the credentials configured above work.
bucket_listing = dbutils.fs.ls("s3a://intuz-trainee-saumya-jain")
bucket_listing

In [0]:
# removing file from s3 bucket 
#dbutils.fs.rm('s3a://intuz-trainee-saumya-jain/supermarket_sales_parquet', True)

In [0]:
# Read the supermarket sales Parquet dataset from S3 for interactive inspection.
# Parquet files store their own column names and types, so the CSV-only
# "header" option has no effect on this reader and is omitted.
df_s3 = spark.read.parquet("s3a://intuz-trainee-saumya-jain/supermarket_sales_parquet")

In [0]:
# Render the raw S3 data with Databricks' display() as a visual sanity check before building the pipeline.
display(df_s3)

In [0]:
# Print the column names and types read from the Parquet files; the bronze layer below renames
# columns containing spaces/"%" and bronze_view casts them to numeric/date types.
df_s3.printSchema()

BRONZE LAYER

In [0]:
import dlt

@dlt.table(
  name="supermarket_bronze",
  comment="This table contains raw supermarket data",
)
def supermarket_bronze():
  """Ingest the raw supermarket sales Parquet files from S3 with Auto Loader.

  Returns a streaming DataFrame whose values are unchanged from the source;
  only the column names are rewritten (spaces and "%" replaced) so downstream
  code can refer to them as plain identifiers. Type casting is done later in
  ``bronze_view``.
  """
  # Parquet files carry their own column names and types, so CSV-only reader
  # options such as "header" / "inferSchema" have no effect and are omitted.
  raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("s3a://intuz-trainee-saumya-jain/supermarket_sales_parquet")
  )

  # Source column name -> bronze column name. Columns not listed keep their name.
  column_renames = {
    "Invoice ID": "Invoice_ID",
    "Customer type": "Customer_type",
    "Product line": "Product_line",
    "Unit price": "Unit_price",
    "Tax 5%": "Tax_5_percent",
    "gross margin percentage": "gross_margin_percentage",
    "gross income": "gross_income",
  }
  for source_name, bronze_name in column_renames.items():
    # withColumnRenamed is documented as a no-op when source_name is absent.
    raw = raw.withColumnRenamed(source_name, bronze_name)

  return raw

In [0]:
from pyspark.sql.functions import to_date, col,to_timestamp

@dlt.view(
    name="bronze_view",
    comment="This view contains raw supermarket data"
)
def bronze_view():
    """Typed view over the ``supermarket_bronze`` table.

    Casts the numeric columns (float / int), parses ``Date`` with the
    ``M/d/yyyy`` pattern and ``Time`` with the ``HH:mm`` pattern, and drops
    the ``_rescued_data`` column.
    """
    # Column -> Spark SQL type name; each column is cast exactly once.
    numeric_casts = {
        "Unit_price": "float",
        "Tax_5_percent": "float",
        "Quantity": "int",
        "Total": "float",
        "cogs": "float",
        "gross_margin_percentage": "float",
        "gross_income": "float",
        "Rating": "float",
    }

    typed = dlt.read("supermarket_bronze")
    for column_name, target_type in numeric_casts.items():
        typed = typed.withColumn(column_name, col(column_name).cast(target_type))

    # Parse the textual date/time columns.
    typed = typed.withColumn("Date", to_date(col("Date"), "M/d/yyyy"))
    typed = typed.withColumn("Time", to_timestamp(col("Time"), "HH:mm"))

    return typed.drop(col("_rescued_data"))

SILVER LAYER

In [0]:
from pyspark.sql.functions import to_date, col,to_timestamp,date_format
from pyspark.sql.functions import when

# creating silver table 

@dlt.table(
  name = "supermarket_silver",
  comment="this table contains processed supermarket data"
)
def supermarket_silver():
  """Build the silver table: the typed bronze data plus two derived columns.

  Derived columns:
    satisfaction_tier -- "High" when Rating >= 7, "Medium" when 5 <= Rating < 7,
                         "Low" for any other non-null Rating, null when Rating is null.
    SalesMonth        -- abbreviated month name of Date (pattern "MMM", e.g. "Jan").

  Returns:
    A batch DataFrame read from the pipeline view ``bronze_view``.
  """
  df = spark.read.table("bronze_view")

  # Use the column's real casing ("Rating", as cast in bronze_view) rather than
  # relying on case-insensitive resolution (spark.sql.caseSensitive=false).
  rating = col("Rating")

  # A missing rating is not a low rating: there is deliberately no .otherwise(),
  # so rows whose Rating is null match no branch and get a null tier.
  df = df.withColumn(
    "satisfaction_tier",
    when(rating >= 7, "High")
    .when(rating >= 5, "Medium")
    .when(rating.isNotNull(), "Low"),
  )
  df = df.withColumn("SalesMonth", date_format(col("Date"), "MMM"))

  return df

  
  

In [0]:
# %sql
# SELECT * FROM saumyaj_catalog.

In [0]:
# creating silver view 

import dlt

@dlt.view(
    name="silver_view",
    comment="This is silver view with processed supermarket data"
)
def silver_view():
    """Expose the ``supermarket_silver`` table unchanged as a pipeline view; the gold tables read from it."""
    return dlt.read("supermarket_silver")

GOLD LAYER

GOLD TABLE 1: monthly sales totals

In [0]:
import dlt
import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType

@dlt.table(
    name = "monthly_sales",
    comment="This table contains sum of sales in each month"
)
def monthly_sales():
    """Total sales (sum of ``Total``) per calendar month, read from ``silver_view``.

    Rows are grouped by the year of ``Date`` as well as by the abbreviated month
    name in ``SalesMonth``. Grouping on the month name alone would merge the same
    month from different years (e.g. after more data is appended to the source)
    into one bucket.

    Returns:
        DataFrame with columns SalesYear, SalesMonth, MonthlySales.
    """
    sales = spark.read.table("silver_view")
    # Sum in double precision.
    sales = sales.withColumn("Total", sf.col("Total").cast(DoubleType()))

    return (
        sales.groupBy(sf.year("Date").alias("SalesYear"), "SalesMonth")
        .agg(sf.sum("Total").alias("MonthlySales"))
    )



GOLD TABLE 2: most common product line in each city

In [0]:
import pyspark.sql.functions as sf
from pyspark.sql.window import Window

@dlt.table(
    name = "common_product_lines",
    comment="This table contains the most common product lines purchased in each city"
)
def common_product_lines():
    """Most frequently purchased product line in each city, read from ``silver_view``.

    Counts rows per (City, Product_line) and keeps the top-ranked product line
    per city. Ties on the count are broken by ``Product_line`` in ascending
    order, so the result is the same on every pipeline run.

    Returns:
        DataFrame with columns City, Product_line, pl_count.
    """
    df = spark.read.table("silver_view")

    # 1. Count purchases of each product line within each city.
    product_line_counts = (
        df.groupBy("City", "Product_line")
        .agg(sf.count("Product_line").alias("pl_count"))
    )

    # 2. Rank product lines inside each city by count (descending). The secondary
    #    key makes row_number() deterministic when two lines have the same count.
    window_spec = Window.partitionBy("City").orderBy(
        sf.col("pl_count").desc(), sf.col("Product_line").asc()
    )

    # 3. Keep only the top-ranked product line per city.
    return (
        product_line_counts
        .withColumn("rn", sf.row_number().over(window_spec))
        .filter(sf.col("rn") == 1)
        .select("City", "Product_line", "pl_count")
    )


GOLD TABLE 3: total spending by customer type in each branch

In [0]:
import dlt
import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType

# How does customer type (Member vs. Normal) influence total spending across different branches?

@dlt.create_table(
    name="customer_type_influence",
    comment="This table contains customer type influence on total spending across different branches"
)
def customer_type_influence():
    """Total spending (sum of ``Total``) per branch and customer type, read from ``silver_view``.

    Returns:
        DataFrame with columns Branch, Customer_type, TotalSpending, sorted by
        Branch then Customer_type.
    """
    grouping_keys = ["Branch", "Customer_type"]
    spending = (
        spark.read.table("silver_view")
        .groupBy(*grouping_keys)
        .agg(sf.sum("Total").alias("TotalSpending"))
    )
    return spending.orderBy(*grouping_keys)

In [0]:

# # -- SELECT City,Product_line,MAX(count(pl_count))
# # -- FROM (
# # --   SELECT City,Product_line,COUNT(Product_line) AS pl_count FROM saumyaj_catalog.supermarket_schema.supermarket_silver 
# # -- GROUP BY City,Product_line
# # -- ORDER BY COUNT(Product_line)
# # -- )
# # -- GROUP BY City,Product_line;


# -- WITH RankedProductLines AS (
# --   SELECT 
# --     City, 
# --     Product_line, 
# --     COUNT(Product_line) AS pl_count,
# --     ROW_NUMBER() OVER (PARTITION BY City ORDER BY COUNT(Product_line) DESC) AS rn
# --   FROM saumyaj_catalog.supermarket_schema.supermarket_silver
# --   GROUP BY City, Product_line
# -- )
# -- SELECT City, Product_line, pl_count
# -- FROM RankedProductLines
# -- WHERE rn = 1;

In [0]:
# %sql
# SELECT City, Count(City)
# FROM saumyaj_catalog.supermarket_schema.supermarket_silver
# GROUP BY City;

In [0]:
# Load the dummy supermarket sales CSV from DBFS (first row is the header, column types are inferred)
# and show it.
dummy_sales_path = "dbfs:/FileStore/tables/dummy_supermarket_sales.csv"
dummy_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(dummy_sales_path)
)
display(dummy_df)

In [0]:
# Print the inferred schema of the dummy CSV so it can be compared with the S3 Parquet schema
# before the (commented-out) append to the bucket below.
dummy_df.printSchema()

In [0]:
# adding more data in s3 bucket 
# dummy_df.write.mode("append").parquet("s3a://intuz-trainee-saumya-jain/supermarket_sales_parquet")

In [0]:
#updated_s3 = spark.read.parquet("s3a://intuz-trainee-saumya-jain/supermarket_sales_parquet")