In [None]:
import findspark 
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import delta as D

from pathlib import Path

# Spark session with the Delta Lake SQL extension and Delta catalog configured
delta_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
builder = SparkSession.builder.appName("pyspark-notebook")
for setting_name, setting_value in delta_settings.items():
    builder = builder.config(setting_name, setting_value)
spark = D.configure_spark_with_delta_pip(builder).getOrCreate()

# Staging folders, one per pipeline stage (relative to the working directory)
landingStage, bronzeStage, silverStage, goldStage = (
    './data/00-landing',
    './data/01-bronze',
    './data/02-silver',
    './data/03-gold',
)


In [None]:
%%bash
# Create the staging folders and the dummy input files (existing files are overwritten).
python3 ./init/create_dummy_files.py

In [None]:
#landing to bronze

# Target type per column; the keys are also the only columns kept in bronze.
column_types = {
        'ProductId':'integer'
        ,'ProductNumber':'string'
        ,'ProductName':'string'
        ,'ModelName':'string'
        ,'StandardCost':'integer'
        ,'ListPrice':'integer'
        ,'Timestamp':'timestamp'
        }
# Cast expressions applied to every landing file. Built from a separately named
# dict so that data_schema is only ever bound to one kind of object.
data_schema = [F.col(column_name).cast(data_type) for column_name,data_type in column_types.items()]

# sorted(): Path.glob yields files in arbitrary order; sorting makes re-runs reproducible.
for csv_file in sorted(Path(landingStage).glob('*products.csv')):
    df = spark.read.format("csv").load(path=str(csv_file),header=True)

    # Cast to the bronze schema and keep a single row per product.
    # The key is spelled exactly like the selected column so the dedupe does not
    # depend on Spark's case-insensitive column resolution.
    # NOTE(review): dropDuplicates keeps an arbitrary row per ProductId; if the row
    # with the newest Timestamp should win, an explicit ordering is needed - confirm intent.
    df = df.select(data_schema).dropDuplicates(["ProductId"])

    # Export as parquet, one output per landing file (same stem, .parquet suffix)
    bronze_file = Path(bronzeStage) / f'{csv_file.stem}.parquet'
    df.write.format("parquet").mode('overwrite').save(str(bronze_file))

    # Count what was actually written instead of calling df.count(), which would
    # re-run the whole CSV read and dedupe a second time.
    written_count = spark.read.format("parquet").load(str(bronze_file)).count()
    print(f"Filename: {bronze_file.name} nr. unique products: {written_count}")


In [None]:
# Register the silver Delta table at an explicit location (created only if it does not exist yet)
table_name = 'Products'
silver_table = str(Path(silverStage).joinpath('products-delta').absolute())

# Column name / SQL type pairs, in table order
silver_columns = [
    ('ProductID', 'INT'),
    ('ProductNumber', 'STRING'),
    ('ProductName', 'STRING'),
    ('ModelName', 'STRING'),
    ('StandardCost', 'INT'),
    ('ListPrice', 'INT'),
    ('Timestamp', 'TIMESTAMP'),
    ('ProfitMargin', 'DOUBLE'),
    ('ModelRank', 'INTEGER'),
]

table_builder = D.DeltaTable.createIfNotExists(spark).tableName(table_name)
for silver_column, silver_type in silver_columns:
    table_builder = table_builder.addColumn(silver_column, silver_type)
table_builder.location(silver_table).partitionedBy('ModelName').execute()

# Handle on the silver table, loaded by path
dt = D.DeltaTable.forPath(spark, silver_table)


In [None]:
#bronze to silver

def _models_matching(products, margin_condition):
    """Return the distinct ModelName values of rows that satisfy
    `margin_condition` and have StandardCost > 100."""
    rows = (products
            .filter(margin_condition & (F.col('StandardCost') > 100))
            .select('ModelName').distinct().collect())
    return [row['ModelName'] for row in rows]

# sorted(): Path.glob yields files in arbitrary order. Each file is upserted on top
# of the previous ones, so a fixed order keeps the final table state reproducible.
for bronze_file in sorted(Path(bronzeStage).glob('*.parquet')):
    df = spark.read.format("parquet").load(path=str(bronze_file))

    # Profit margin in percent, rounded to 2 decimals: 100 - StandardCost/ListPrice*100
    margin = F.round(100-((F.col('StandardCost')/F.col('ListPrice'))*100),2)
    df = df.withColumn('ProfitMargin',margin)

    # Rank models by profitability. Tiers are computed per bronze file and only
    # consider products with StandardCost > 100.
    first_tier_models = _models_matching(df, F.col('ProfitMargin') > 50)
    second_tier_models = _models_matching(df, F.col('ProfitMargin').between(40,50))
    third_tier_models = _models_matching(df, F.col('ProfitMargin') < 40)

    # A model that appears in several tiers gets the best one, because the
    # when-chain stops at the first match; models in no tier get rank 4.
    all_tier_models = (F.when(F.col('ModelName').isin(first_tier_models),1)
                        .when(F.col('ModelName').isin(second_tier_models),2)
                        .when(F.col('ModelName').isin(third_tier_models),3)
                        .otherwise(4))

    df = df.withColumn('ModelRank',all_tier_models)

    # Upsert into the silver Delta table, keyed on ProductID.
    # NOTE(review): matched rows are always overwritten, whatever their Timestamp -
    # confirm that a later file is always meant to win.
    (dt.alias('target')
        .merge(source=df.alias('source'),condition='target.ProductID = source.ProductID')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

print(f"Tablename: {silver_table} total nr. unique products: {dt.toDF().count()}")
    

In [None]:
#silver to golden

# Read the silver table into its own name. `dt` is the DeltaTable handle used by the
# bronze-to-silver upsert; rebinding it to a DataFrame here would make that cell
# fail when it is re-run after this one.
silver_df = spark.read.format("delta").load(silver_table)

# Split silver into a sales view (cost/price figures) and a product view (descriptive columns)
dt_sale = silver_df.select('ProductID','StandardCost','ListPrice','ProfitMargin')
dt_products = silver_df.select('ProductID','ProductNumber','ProductName','ModelName','ModelRank')

# Write each view as a Delta table at an explicit path in the gold folder,
# overwriting any previous contents
golden_sale_table = str((Path(goldStage) / 'sale-delta').absolute())
golden_products_table = str((Path(goldStage) / 'products-delta').absolute())
dt_sale.write.format('delta').mode('overwrite').option('path',golden_sale_table).saveAsTable('Sales_gold')
dt_products.write.format('delta').mode('overwrite').option('path',golden_products_table).saveAsTable('Products_gold')