In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### **Data Reading**

### Read Calendar Data

In [0]:
# Load the calendar CSV from the bronze container; the header row names the
# columns and column types are inferred from the data.
df_cal = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Calendar')
)

In [0]:
# Render the raw calendar frame in the notebook.
display(df_cal)

### Transformation

In [0]:
# Add 'Month' and 'Year' columns extracted from the 'Date' column.
df_cal = (
    df_cal
    .withColumn('Month', month('Date'))
    .withColumn('Year', year('Date'))
)

display(df_cal)

In [0]:
# Write the calendar frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_cal.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Calendar')
    .save()
)

### Customers

In [0]:
# Load the customers CSV from the bronze container (header row, inferred types).
df_cus = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Customers')
)

In [0]:
# Render the raw customers frame in the notebook.
display(df_cus)

### Transformation

In [0]:
# Derive customer attributes:
#   FullName  - Prefix + '.' + FirstName + ' ' + LastName
#   BirthDate - parsed into a date
#   *_Count   - 1/0 indicators for Gender == 'M', MaritalStatus == 'S', HomeOwner == 'Y'
df_cus = (
    df_cus
    # NOTE(review): concat() returns NULL when any input is NULL, so a missing
    # Prefix blanks the whole FullName; confirm this is intended.
    .withColumn('FullName', concat(col('Prefix'), lit('.'), col('FirstName'), lit(' '), col('LastName')))
    # 'MM' is month-of-year. Lowercase 'mm' means minute-of-hour, which never
    # populates the month of the parsed date.
    # NOTE(review): assumes BirthDate arrives as day/month/year text; confirm
    # against the bronze file.
    .withColumn('BirthDate', to_date(col('BirthDate'), 'dd/MM/yyyy'))
    .withColumn('Gender_Count', when(col('Gender') == 'M', 1).otherwise(0))
    .withColumn('MaritalStatus_Count', when(col('MaritalStatus') == 'S', 1).otherwise(0))
    .withColumn('HomeOwner_Count', when(col('HomeOwner') == 'Y', 1).otherwise(0))
)
df_cus.display()

In [0]:
# Write the customers frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_cus.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Customers')
    .save()
)

### Product_Categories

In [0]:
# Load the product-categories CSV from the bronze container (header row, inferred types).
df_pcat = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Product_Categories')
)

In [0]:
# Render the product-categories frame in the notebook.
display(df_pcat)

In [0]:
# Write the product-categories frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_pcat.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Product_Categories')
    .save()
)

### Subcategories

In [0]:
# Load the product-subcategories CSV from the bronze container (header row, inferred types).
df_sub = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/Product_Subcategories')
)

In [0]:
# Render the product-subcategories frame in the notebook.
display(df_sub)

In [0]:
# Write the product-subcategories frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_sub.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/Product_Subcategories')
    .save()
)

### Products

In [0]:
# Load the products CSV from the bronze container (header row, inferred types).
df_prod = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Products')
)

In [0]:
# Render the raw products frame in the notebook.
display(df_prod)


In [0]:
# Replace ProductSKU with the text before its first '-' and ProductName with
# the text before its first space.
df_prod = (
    df_prod
    .withColumn('ProductSKU', split(col('ProductSKU'), '-').getItem(0))
    .withColumn('ProductName', split(col('ProductName'), ' ').getItem(0))
)
display(df_prod)

In [0]:
# Write the products frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_prod.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Products')
    .save()
)

### Returns

In [0]:
# Load the returns CSV from the bronze container (header row, inferred types).
df_ret = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Returns')
)

In [0]:
# Render the returns frame in the notebook.
display(df_ret)

In [0]:
# Write the returns frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_ret.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Returns')
    .save()
)

### Territories

In [0]:
# Load the territories CSV from the bronze container (header row, inferred types).
df_ter = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Territories')
)

In [0]:
# Render the territories frame in the notebook.
display(df_ter)

In [0]:
# Write the territories frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_ter.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Territories')
    .save()
)

### Sales

In [0]:
# Load the sales CSVs from the bronze container (header row, inferred types).
# The trailing '*' is a glob, so every path starting with 'AdventureWorks_Sales'
# is read into one frame.
df_sales = (
    spark.read
    .format("csv")
    .option('header', True)
    .option('inferSchema', True)
    .load('abfss://bronze@advsalesworks.dfs.core.windows.net/AdventureWorks_Sales*')
)

In [0]:
# Render the raw sales frame in the notebook.
display(df_sales)

### Transformation

In [0]:
# Sales clean-up:
#   StockDate   - converted to a timestamp
#   OrderNumber - every 'S' replaced by 'T'
#   multiply    - OrderLineItem * OrderQuantity
df_sales = (
    df_sales
    .withColumn('StockDate', to_timestamp(col('StockDate')))
    .withColumn('OrderNumber', regexp_replace(col('OrderNumber'), 'S', 'T'))
    .withColumn('multiply', col('OrderLineItem') * col('OrderQuantity'))
)
display(df_sales)

In [0]:
# Write the sales frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds these rows again each time the cell runs;
# confirm that duplicates on re-run are acceptable.
(
    df_sales.write
    .format('parquet')
    .mode('append')
    .option('path', 'abfss://silver@advsalesworks.dfs.core.windows.net/AdventureWorks_Sales')
    .save()
)

### Sales Analysis

In [0]:
# Orders per OrderDate. The alias sits on the aggregate column (inside agg) so
# the output column is named 'total_orders'; aliasing the DataFrame instead
# would leave the column named 'count(OrderNumber)'.
# NOTE(review): count() counts rows with a non-null OrderNumber, so an order
# spanning several rows is counted once per row; confirm whether distinct
# orders are wanted.
df_sales.groupBy('OrderDate').agg(count('OrderNumber').alias('total_orders')).display()

Databricks visualization. Run in Databricks to view.

In [0]:
# Show product categories (rendered as a Databricks visualization).
display(df_pcat)

Databricks visualization. Run in Databricks to view.

In [0]:
# Show product subcategories (rendered as a Databricks visualization).
display(df_sub)

Databricks visualization. Run in Databricks to view.

In [0]:
# Show territories (rendered as a Databricks visualization).
display(df_ter)

Databricks visualization. Run in Databricks to view.

In [0]:
# Show returns (rendered as a Databricks visualization).
display(df_ret)

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import col, lag, when, lit, min as spark_min, max as spark_max, countDistinct
from pyspark.sql.window import Window

# Inputs are the DataFrames built in the cells above:
#   df_sales: needs 'CustomerKey' and 'OrderDate' (used by the window here and
#             by the aggregation later in this cell)
#   df_cus:   needs 'CustomerKey' (the join key used later in this cell)

# Step 1: Prepare sales SCD2 table
# Each row is valid from its own OrderDate (__START_AT) until the OrderDate of
# the next row for the same customer (__END_AT).
# Partition on 'CustomerKey', the customer column the aggregation and join in
# this cell use, rather than 'CustomerID'.
window_spec = Window.partitionBy('CustomerKey').orderBy('OrderDate')

# lag(..., -1) reads the following row in the window. The last row per customer
# has no following row, so its __END_AT is NULL (an open-ended record).
df_sales_scd2 = df_sales.withColumn('__START_AT', col('OrderDate')) \
    .withColumn('__END_AT', lag('OrderDate', -1).over(window_spec))

# Step 2: Per-customer order statistics plus a repeat-customer flag.
#   order_count           - number of distinct OrderDate values
#   first_order/last_order - earliest and latest OrderDate
#   is_repeating_customer - 1 when order_count > 1, else 0
df_customer_orders = (
    df_sales
    .groupBy('CustomerKey')
    .agg(
        countDistinct('OrderDate').alias('order_count'),
        spark_min('OrderDate').alias('first_order'),
        spark_max('OrderDate').alias('last_order'),
    )
    .withColumn(
        'is_repeating_customer',
        when(col('order_count') > 1, 1).otherwise(0),
    )
)

# Step 3: Attach the order statistics to every customer.
# The left join keeps customers without sales; for them order_count and
# is_repeating_customer are filled with 0, while first_order/last_order stay NULL.
df_customers_flagged = (
    df_cus
    .join(df_customer_orders, on='CustomerKey', how='left')
    .na.fill({'order_count': 0, 'is_repeating_customer': 0})
)

df_customers_flagged.display()

Databricks visualization. Run in Databricks to view.