In [0]:
%sql
-- Show the catalog and database the session is currently using.
SELECT current_catalog(),current_database()

current_catalog(),current_database()
spark_catalog,default


In [0]:
%sql
-- One schema per medallion layer; IF NOT EXISTS keeps this cell safe to re-run.
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

In [0]:

from pyspark.sql.functions import col

# List the upload folder and keep only the entries whose name ends in ".parquet".
dbfsList = dbutils.fs.ls("dbfs:/FileStore/tables/")

listing_columns = ("path", "name", "size", "modificationTime")
df = spark.createDataFrame(dbfsList).toDF(*listing_columns)

is_parquet = col("name").endswith(".parquet")
filtered_df = df.where(is_parquet)

display(filtered_df)

path,name,size,modificationTime
dbfs:/FileStore/tables/media_customer_reviews.parquet,media_customer_reviews.parquet,46004,1747220838000
dbfs:/FileStore/tables/media_gold_reviews_chunked.parquet,media_gold_reviews_chunked.parquet,23557,1747220838000
dbfs:/FileStore/tables/sales_customers.parquet,sales_customers.parquet,28493,1747220838000
dbfs:/FileStore/tables/sales_franchises.parquet,sales_franchises.parquet,5905,1747220839000
dbfs:/FileStore/tables/sales_suppliers.parquet,sales_suppliers.parquet,4591,1747220839000
dbfs:/FileStore/tables/sales_transactions.parquet,sales_transactions.parquet,86578,1747220839000


In [0]:
from delta.tables import DeltaTable

def fetch_data(filename,file_type="parquet"):
    """Read one uploaded file from ``dbfs:/FileStore/tables/`` into a DataFrame.

    Parameters
    ----------
    filename : str
        File name without its extension, e.g. ``"sales_customers"``.
    file_type : str, default "parquet"
        Used both as the Spark reader format and as the file extension, so the
        reader format and the path always agree.

    Returns
    -------
    DataFrame
        The result of a batch ``spark.read`` of the file.
    """
    # Derive the extension from file_type; a hard-coded ".parquet" would make
    # any non-default file_type read the wrong file.
    file_location = f"dbfs:/FileStore/tables/{filename}.{file_type}"

    # This is a plain batch read, not Auto Loader: cloudFiles.* options only
    # apply to the "cloudFiles" streaming source, so none are set here.
    return spark.read.format(file_type).load(file_location)

def _build_merge_condition(PKcols):
    """Return the MERGE join predicate ``target.<pk> = source.<pk> AND ...``."""
    if PKcols is None:
        names = []
    elif isinstance(PKcols, str):
        names = PKcols.split(",")
    else:
        # Also accept an iterable of column names.
        names = list(PKcols)
    names = [name.strip() for name in names if name and name.strip()]
    if not names:
        # An empty key would otherwise produce the invalid predicate
        # "target. = source." and fail deep inside the merge.
        raise ValueError("PKcols must name at least one column for an INC upsert")
    return " AND ".join(f"target.{name} = source.{name}" for name in names)


def create_tables(df,schema,tbl_name,loadtype,PKcols = ""):
    """Write ``df`` to the table ``schema.tbl_name`` using the given load type.

    Parameters
    ----------
    df : DataFrame
        Data to write.
    schema : str
        Target schema (database) name.
    tbl_name : str
        Target table name.
    loadtype : str
        Case-insensitive load type:
        ``"append"``    - create the table if it is missing, otherwise append;
        ``"inc"``       - create the table if it is missing, otherwise upsert
                          with a Delta MERGE keyed on ``PKcols``;
        ``"overwrite"`` - replace the table contents and its schema.
    PKcols : str or iterable of str, default ""
        Primary-key column(s) for the ``"inc"`` upsert, either a
        comma-separated string or an iterable of names. Only needed when the
        target table already exists.

    Raises
    ------
    ValueError
        If ``loadtype`` is not one of the supported values, or if an upsert is
        required and ``PKcols`` names no column.
    """
    full_name = f"{schema}.{tbl_name}"
    mode = loadtype.upper()

    # Fail loudly on a mistyped load type instead of silently writing nothing.
    if mode not in ("APPEND", "INC", "OVERWRITE"):
        raise ValueError(
            f"Unsupported loadtype {loadtype!r}; expected 'append', 'inc' or 'overwrite'"
        )

    if mode == "OVERWRITE":
        # No existence check needed: overwrite replaces or creates the table.
        df.write.mode("overwrite").option("overwriteSchema", True).saveAsTable(full_name)
        return

    # Public catalog API rather than the private spark._jsparkSession handle.
    table_exists = spark.catalog.tableExists(tbl_name, schema)

    if mode == "APPEND":
        df.write.mode("append" if table_exists else "overwrite").saveAsTable(full_name)
        print("successfully loaded data into tables(APPEND mode) -- ", full_name)
        return

    # mode == "INC"
    if not table_exists:
        print("table not exists")
        df.write.mode("overwrite").saveAsTable(full_name)
        print("successfully loaded data into tables(INC mode) -- ", full_name)
        return

    print("upsert")
    cond = _build_merge_condition(PKcols)
    print(cond)
    delta_table = DeltaTable.forName(spark, full_name)
    (
        delta_table.alias("target")
        .merge(df.alias("source"), cond)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
# Load every uploaded Parquet file into a bronze table of the same name,
# in the order listed.
bronze_sources = (
    "sales_customers",
    "sales_franchises",
    "sales_suppliers",
    "sales_transactions",
    "media_customer_reviews",
    "media_gold_reviews_chunked",
)

bronze_frames = {}
for source_name in bronze_sources:
    bronze_frames[source_name] = fetch_data(source_name)
    create_tables(bronze_frames[source_name], "bronze", source_name, "append")

# Keep the per-table DataFrame names bound for any later cell that uses them.
(
    sales_customers,
    sales_franchises,
    sales_suppliers,
    sales_transactions,
    media_customer_reviews,
    media_gold_reviews_chunked,
) = (bronze_frames[source_name] for source_name in bronze_sources)

successfully loaded data into tables(APPEND mode) --  bronze.sales_customers
successfully loaded data into tables(APPEND mode) --  bronze.sales_franchises
successfully loaded data into tables(APPEND mode) --  bronze.sales_suppliers
successfully loaded data into tables(APPEND mode) --  bronze.sales_transactions
successfully loaded data into tables(APPEND mode) --  bronze.media_customer_reviews
successfully loaded data into tables(APPEND mode) --  bronze.media_gold_reviews_chunked


Create Silver Tables

In [0]:
loadtype = "INC"
schema = "silver"

# (table name, comma-separated primary-key columns) for each silver table,
# in load order.
silver_tables = [
    ("media_customer_reviews", "new_id"),
    ("media_gold_reviews_chunked", "franchiseID,chunk_id,review_date"),
    ("sales_customers", "customerID"),
    ("sales_franchises", "franchiseID"),
    ("sales_suppliers", "supplierID"),
    ("sales_transactions", "transactionID"),
]

for tbl_name, primarykey_cols in silver_tables:
    # dropDuplicates() with no arguments removes only fully identical rows.
    # NOTE(review): rows that share a key but differ elsewhere would still
    # reach the merge - confirm bronze keys are unique.
    df = spark.read.table(f"bronze.{tbl_name}").dropDuplicates()
    create_tables(df, schema, tbl_name, loadtype, primarykey_cols)

upsert
 target.new_id = source.new_id
upsert
 target.franchiseID = source.franchiseID AND  target.chunk_id = source.chunk_id AND  target.review_date = source.review_date
upsert
 target.customerID = source.customerID
upsert
 target.franchiseID = source.franchiseID
upsert
 target.supplierID = source.supplierID
upsert
 target.transactionID = source.transactionID


Create Gold Layer Tables

In [0]:
# Gold: the single product with the most transactions in silver.sales_transactions.
# NOTE(review): "limit 1" keeps one row only; if two products tie on the count,
# which one is kept is not determined by this query - confirm that is acceptable.
sql_query = """
select product, count(transactionID) 
as no_products_sold 
from silver.sales_transactions group by product order by no_products_sold desc limit 1"""

df_most_sold_products = spark.sql(sql_query)
display(df_most_sold_products)

# Replace the gold table with the freshly computed result on every run.
(
    df_most_sold_products.write
    .mode("overwrite")
    .saveAsTable("gold.most_sold_products")
)

product,no_products_sold
Golden Gate Ginger,586


In [0]:
# Gold: number of franchises per supplier, highest first. The left join keeps
# suppliers that have no matching franchise (they get a count of 0).
sql_query_most_supplied_supplierID = """select s.supplierID,s.name, count(f.franchiseID) as count_franchises
from silver.sales_suppliers s 
left join silver.sales_franchises f 
on s.supplierID = f.supplierID
group by s.supplierID,s.name
order by count_franchises desc"""

df_most_supplied_supplierID = spark.sql(sql_query_most_supplied_supplierID)
display(df_most_supplied_supplierID)

# Replace the gold table with the freshly computed result on every run.
(
    df_most_supplied_supplierID.write
    .mode("overwrite")
    .saveAsTable("gold.most_supplied_supplierID")
)

supplierID,name,count_franchises
4000009,Maple Monarch,1
4000026,Mace Meadows,1
4000004,Vanilla Valley,1
4000024,Anise Acres,1
4000023,Fennel Fields,1
4000021,Cocoa Crops,1
4000014,Molasses Mills,1
4000025,Nutmeg Nirvana,1
4000022,Poppy Peaks,1
4000001,Coconut Grove,1


In [0]:
# Gold: total quantity and total sales amount per month.
# NOTE(review): the "MM" pattern keeps only the month number, so the same month
# from different years lands in one group - confirm the data covers one year.
sql_query_total_sales = """
select DATE_FORMAT(dateTime, "MM") as Month_data, sum(quantity) as total_quantity, sum(totalPrice) as total_sales_amount
from silver.sales_transactions
group by Month_data"""

df_total_sales = spark.sql(sql_query_total_sales)
display(df_total_sales)

# Replace the gold table with the freshly computed result on every run.
(
    df_total_sales.write
    .mode("overwrite")
    .saveAsTable("gold.total_sales")
)

Month_data,total_quantity,total_sales_amount
5,22157,66471
