In [10]:
# Move data from Bronze to Silver
from datetime import date
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, date_format, expr, concat, lit, year, month, dayofmonth, when
from pyspark.sql import SparkSession

# Set the current database to the Silver Lakehouse
spark.catalog.setCurrentDatabase("Silver")

# Load the Delta table directly by its name
df_CarPrices = spark.read.format("delta").table("CarPrices")

#now setup current lakehouse to gold
spark.catalog.setCurrentDatabase("Gold")

StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 12, Finished, Available, Finished)

In [11]:
#create model dimension
df_Model = df_CarPrices.select('Make', 'Model', 'Trim', 'Body').dropDuplicates()
window = Window.orderBy("Make")
df_Model = df_Model.withColumn("ModelId", row_number().over(window))
df_Model.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("Model")


StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 13, Finished, Available, Finished)

In [12]:
#create Seller dimension
df_Seller = ( 
    df_CarPrices.select('Seller')
    .dropDuplicates()
    .withColumn("SellerId", row_number().over(Window.orderBy("Seller")))
)

df_Seller.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("Seller")

StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 14, Finished, Available, Finished)

In [13]:
#create Color
df_Color = (
    df_CarPrices.select('BodyColor', 'InteriorColor') 
    .dropDuplicates() 
    .withColumn("ColorId", row_number().over(Window.orderBy('BodyColor')))
)

df_Color.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("Color")

StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 15, Finished, Available, Finished)

In [14]:
#create TransmissionType
df_TransmissionType = (
    df_CarPrices.select('Transmission')
    .dropDuplicates()
    .withColumn('TransmissionTypeId', row_number().over(Window.orderBy('Transmission')))
)
df_TransmissionType.write.format('delta').mode("overwrite").option("overwriteSchema", "true").saveAsTable("TransmissionType")

StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 16, Finished, Available, Finished)

In [15]:
# Define start and end dates
start_date = date(2010, 1, 1)
end_date = date(2023, 12, 31)

# Calculate the number of days between start and end dates
num_days = (end_date - start_date).days

# Create a DataFrame with the date range
df_date_dim = (
    spark.range(num_days)
    .withColumn("Id", F.col("Id").cast(IntegerType()))  # 'id' is lowercase in newer versions
    .withColumn("FullDate", F.expr(f"date_add('{start_date}', Id)"))
    .withColumn("YYYYMMDD", F.date_format(F.col("FullDate"), "yyyyMMdd").cast("int"))
    .withColumn("Year", F.year(F.col("FullDate")))
    .withColumn("Month", F.month(F.col("FullDate")))
    .withColumn("Day", F.dayofmonth(F.col("FullDate")))
    .withColumn("MonthName", F.date_format(F.col("FullDate"), "MMMM"))
    .withColumn("DayOfWeek", F.dayofweek(F.col("FullDate")))
    .withColumn("DayOfWeekName", F.date_format(F.col("FullDate"), "EEEE"))
    .withColumn("WeekOfYear", F.weekofyear(F.col("FullDate")))
    .withColumn("Quarter", F.quarter(F.col("FullDate")))
    .withColumn("YearQuarter", F.concat(F.col("Year"), F.lit('Q'), F.col("Quarter")))
)

#save table
df_date_dim.write.mode("overwrite").saveAsTable("Gold.DateDimension")





StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 17, Finished, Available, Finished)

In [23]:
#create Fact table
# Perform all joins in a single chain

df_Car_Fact = (
    df_CarPrices
    .join(df_Model, (df_CarPrices['Make'] == df_Model['Make']) & (df_CarPrices['Model'] == df_Model['Model'])  & 
            (df_CarPrices['Trim'] == df_Model['Trim'])  & (df_CarPrices['Body'] == df_Model['Body']), 'inner')
    .join(df_TransmissionType, (df_CarPrices['Transmission'] == df_TransmissionType['Transmission']), 'inner')
    .join(df_Color, (df_CarPrices['BodyColor'] == df_Color['BodyColor']) & (df_CarPrices['InteriorColor'] == df_Color['InteriorColor']), 'inner')
    .join(df_Seller, (df_CarPrices['Seller'] == df_Seller['Seller']), 'inner')
    .join(df_date_dim, (df_CarPrices['SalesDate'] == df_date_dim['YYYYMMDD']), 'inner')
    .withColumn('SoldOverMarketValue', when( F.col('SellingPrice') - F.col('ManheirMarketReportValue') > 0, 1) .otherwise(0) ) 
    .withColumn('AbsoluteDifference', F.col('SellingPrice') - F.col('ManheirMarketReportValue'))
    .withColumn('PercentDifference', 
        F.round( (F.col('SellingPrice') - F.col('ManheirMarketReportValue')) / F.col('ManheirMarketReportValue') * 100, 1)
    )
    .select('ModelYear', 'ModelId', 'TransmissionTypeId', 'ColorId', 'SellerID', 'Vin', 'SalesState', 'Condition', 
        'Miles', 'ManheirMarketReportValue', 'SellingPrice', 'SoldOverMarketValue', 'AbsoluteDifference', 'PercentDifference', 'SalesDate'
    )
)

# Display the resulting DataFrame
df_Car_Fact.write.mode("overwrite").saveAsTable("Gold.CarSales")

StatementMeta(, a6bdc6e9-cf96-450b-aa53-ff918327e341, 32, Finished, Available, Finished)