# Test CSV file read from ADLS

In [0]:
# Smoke test: read one exported CSV from ADLS (header row, inferred column types)
# and render it, to confirm the storage account is reachable from this cluster.
df = spark.read.csv(
    'abfss://converted-table-to-csv-files@blobstorageforcsvfiles.dfs.core.windows.net/dbo_dim_brand_snowflake.csv',
    header=True,
    inferSchema=True,
)
df.display()



# Read only the required CSVs in a for loop

In [0]:
from pyspark.sql.functions import input_file_name  # NOTE(review): not used in this cell

# Only these CSV exports are inspected; every other file in the container is skipped.
allowed_files = {
    "dbo_dim_customer.csv",
    "dbo_dim_date.csv",
    "dbo_dim_product.csv",
    "dbo_dim_store.csv",
    "dbo_fact_sales.csv",
}
files = dbutils.fs.ls("abfss://converted-table-to-csv-files@blobstorageforcsvfiles.dfs.core.windows.net/")
final_df = None  # NOTE(review): never reassigned in the visible notebook

# Print the inferred schema of each allowed file so type inference can be eyeballed.
for file in files:
    if file.name not in allowed_files:
        continue
    print(f'Reading:{file.name}')
    df = spark.read.csv(file.path, header=True, inferSchema=True)
    df.printSchema()

  


# Read Raw CSVs from ADLS

In [0]:
file_path = "abfss://converted-table-to-csv-files@blobstorageforcsvfiles.dfs.core.windows.net/"


def read_csv(file_name):
    """Load ``<file_path><file_name>.csv`` from ADLS as a DataFrame.

    ``file_name`` is passed without the ``.csv`` extension. The first row is
    treated as the header and column types are inferred by Spark.
    """
    source = f"{file_path}{file_name}.csv"
    return spark.read.csv(source, header=True, inferSchema=True)


# Raw (uncleaned) source tables.
df_customer = read_csv("dbo_dim_customer")
df_product  = read_csv("dbo_dim_product")
df_store    = read_csv("dbo_dim_store")
df_date     = read_csv("dbo_dim_date")
df_fact     = read_csv("dbo_fact_sales")
df_staging  = read_csv("dbo_staging_fact_sales")

# Clean and validate the data by dropping rows with null keys and duplicate keys

In [0]:


from pyspark.sql.functions import col, trim, lower


def _drop_null_and_dup_keys(frame, key):
    """Drop rows whose *key* column is null/NaN, then keep one row per *key* value.

    NOTE(review): which row survives among duplicates is not guaranteed to be
    deterministic — confirm duplicates are exact copies.
    """
    return frame.dropna(subset=[key]).dropDuplicates([key])


def _trim_columns(frame, names, lowercase=False):
    """Trim surrounding whitespace in each column of *names*, optionally lower-casing too."""
    for name in names:
        cleaned = trim(col(name))
        if lowercase:
            cleaned = lower(cleaned)
        frame = frame.withColumn(name, cleaned)
    return frame


# Customers: names are trimmed and lower-cased; email is only trimmed.
df_customer_clean = _trim_columns(
    _trim_columns(
        _drop_null_and_dup_keys(df_customer, 'customer_id'),
        ['first_name', 'last_name'],
        lowercase=True,
    ),
    ['email'],
)

# Products: product name trimmed and lower-cased; brand only trimmed.
df_product_clean = _trim_columns(
    _trim_columns(
        _drop_null_and_dup_keys(df_product, 'product_id'),
        ['product_name'],
        lowercase=True,
    ),
    ['brand'],
)

# Dates: key validation only.
df_date_clean = _drop_null_and_dup_keys(df_date, 'date_id')

# Stores: all descriptive text columns trimmed.
df_store_clean = _trim_columns(
    _drop_null_and_dup_keys(df_store, 'store_id'),
    ['store_name', 'city', 'region', 'store_manager'],
)

# Sales facts: key validation only.
df_fact_sales_clean = _drop_null_and_dup_keys(df_fact, 'sales_id')

df_fact_sales_clean.display()




# Normalize to Snowflake Schema

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, dense_rank


def _rank_lookup(frame, value_col, id_col):
    """Build a lookup table of the distinct values of *value_col* with a surrogate id.

    The id is ``dense_rank()`` over the distinct values sorted ascending, so ids
    run 1..n. The window has no partitionBy, so Spark evaluates it in a single
    partition (fine for small attribute sets like these). A null value, if
    present, is kept as its own row; note that the equi-join below never
    matches null keys, so rows with a null attribute get a null id.
    """
    by_value = Window.orderBy(value_col)
    return (
        frame
        .select(value_col)
        .distinct()
        .withColumn(id_col, dense_rank().over(by_value))
    )


# --- Customer: move loyalty_status into its own lookup table ---
df_loyalty_snowflake = _rank_lookup(df_customer_clean, 'loyalty_status', 'loyaltyTier_id')
df_loyalty_snowflake.display()

df_customer_final_snowflake = (
    df_customer_clean
    .join(df_loyalty_snowflake, on='loyalty_status', how='left')
    .drop('loyalty_status')
)
df_customer_final_snowflake.display()

# --- Product: move brand and category into their own lookup tables ---
df_brand_snowflake = _rank_lookup(df_product_clean, 'brand', 'brand_id')
df_category_snowflake = _rank_lookup(df_product_clean, 'category', 'category_id')

df_brand_snowflake.display()
df_category_snowflake.display()

df_product_final_snowflake = (
    df_product_clean
    .join(df_brand_snowflake, on='brand', how='left')
    .drop('brand')
    .join(df_category_snowflake, on='category', how='left')
    .drop('category')
)
df_product_final_snowflake.display()






# Apply complex transformations

#### 1. Enrich Product tier based on price

In [0]:
from pyspark.sql.functions import when

# Price bands: price > 100 -> 'Premium'; 50 < price <= 100 -> 'Standard';
# anything else (price <= 50, or a null price) -> 'Basic'.
# NOTE(review): the output column name contains a hyphen ('price-tier'), which
# needs quoting in Snowflake SQL; kept as-is because downstream loads may map it.
df_price = df_product_final_snowflake.withColumn(
    'price-tier',
    when(col('price') > 100, 'Premium')
    .when(col('price') > 50, 'Standard')
    .otherwise('Basic'),
)

df_price.display()

#### 2. Filter future dates in facts table

In [0]:
from pyspark.sql.functions import current_date, to_date

# Drop future-dated staging rows: keep only rows whose last_updated falls on or
# before today. The previous version matched one hard-coded timestamp
# ('2025-07-21T00:00:00.000+00:00'), which kept a single load and would return
# no rows for any other load date. Rows with a null last_updated are dropped
# (a null comparison is not true).
# NOTE(review): current_date() uses the Spark session time zone — confirm it
# matches the time zone of last_updated.
df_staging_filtered = df_staging.filter(to_date(col("last_updated")) <= current_date())
df_staging_filtered.display()



In [0]:
# Only count is needed here. Importing pyspark's `sum` too would shadow Python's
# built-in sum() for every later cell in the notebook.
from pyspark.sql.functions import count

# NOTE(review): df_customer_clean is deduplicated on customer_id, and the loyalty
# lookup has one row per loyalty_status, so each customer_id group holds exactly
# one row. loyalty_score is therefore 1 (tier present) or 0 (tier null) —
# confirm this is the intended metric.
df_loyalty_score = (
    df_customer_final_snowflake
    .groupBy('customer_id')
    .agg(count('loyaltyTier_id').alias('loyalty_score'))
)
df_loyalty_score.display()

In [0]:
# Use the functions module under a namespace so pyspark's sum/count do not
# shadow Python's built-in sum() in the notebook's global scope.
from pyspark.sql import functions as F

df_fact_sales_clean.display()

# Per-store totals. product_sold counts sales rows with a non-null product_id
# (not distinct products).
df_agg_fact = df_fact_sales_clean.groupBy('store_id').agg(
    F.count('product_id').alias('product_sold'),
    F.sum('quantity_sold').alias('total_quantity_sold'),
    F.sum('total_amount').alias('total_amount_sold'),
)

df_agg_fact.display()


# Write to Snowflake Staging tables

# Snowflake Configuration

In [0]:
# Snowflake Config
# Credentials come from a Databricks secret scope instead of plain text in the
# notebook. NOTE(review): the password that was previously hard-coded here is
# exposed in source/history and should be rotated. The scope and key names below
# are placeholders — create them before running this cell.
# NOTE(review): "accountadmin" is Snowflake's most privileged role; a dedicated
# load role with only the needed grants is safer.
SNOWFLAKE_SECRET_SCOPE = "snowflake"

sfOptions = {
  "sfURL": "https://fuskshz-jo76028.snowflakecomputing.com",
  "sfUser": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="sf-user"),
  "sfPassword": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="sf-password"),
  "sfDatabase": "sql_data_migration_snowflake",
  "sfSchema": "data_migration_schema",
  "sfWarehouse": "COMPUTE_WH",
  "sfRole": "accountadmin"
}

In [0]:
# Data-source name of the Snowflake Spark connector, i.e. the value passed to
# `.format(...)` on a reader/writer (presumably together with sfOptions).
# NOTE(review): not referenced in the visible cells — data reaches Snowflake via
# ADLS + ADF instead, per the markdown notes that follow.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"


#### The tables cannot be written to Snowflake directly, because this notebook runs on a serverless cluster, which does not allow writing to Snowflake.

#### As an alternative, write the transformed and normalized data to ADLS and load it into Snowflake through an ADF pipeline.

# Write the normalized and transformed data to ADLS

# Normalized Data

In [0]:
from datetime import datetime

base_path = "abfss://normalized-tables@blobstorageforcsvfiles.dfs.core.windows.net/"



def save_to_adls(df, table_name):
    
# Generate timestamp string
 timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
#  filename = f"sales_{timestamp}.csv"

# Define ADLS path
 output_dir = f"{base_path}tables"
 final_path = f"{base_path}snowflake-tables/{table_name}_{timestamp}.csv"

#Delete older files for the same table_name from target directory
 target_dir = f"{base_path}snowflake-tables"
 existing_files =dbutils.fs.ls(target_dir)
 for f in existing_files:
    if f.name.startswith(f"{table_name}_") and f.name.endswith('.csv'):
        dbutils.fs.rm(f.path, True)
        print(f"Deleted old file: {f.name}")


# Write to temp directory
 df.coalesce(2).write.option("header", True).mode("overwrite").csv(output_dir)

# List files in temp directory to find the actual part file name
 files = dbutils.fs.ls(output_dir)
 part_file = [f.path for f in files if f.name.startswith("part-")][0]

# Copy to final path with parameterized filename
 dbutils.fs.cp(part_file, final_path)

# Delete the temporary folder
 dbutils.fs.rm(output_dir, True)
 print(f"✅ Saved new version: {table_name} → {final_path}")


    
    # df.coalesce(1).write.mode("overwrite").format("csv").options(header="true", inferSchema="true").save(path)
# print(f"✅ Saved: {table_name} → {path}")

In [0]:
# Normalized tables to export, in export order: (DataFrame, output file base name).
# NOTE(review): the fact and staging exports also use the "dbo_dim_" prefix, and
# df_staging is exported as read (uncleaned). The downstream ADF pipeline
# presumably expects these exact names, so they are unchanged.
normalized_exports = [
    (df_customer_final_snowflake, "dbo_dim_customer"),
    (df_loyalty_snowflake, "dbo_dim_loyalty"),
    (df_brand_snowflake, "dbo_dim_brand"),
    (df_category_snowflake, "dbo_dim_category"),
    (df_product_final_snowflake, "dbo_dim_product"),
    (df_date_clean, "dbo_dim_date"),
    (df_store_clean, "dbo_dim_store"),
    (df_fact_sales_clean, "dbo_dim_fact_sales"),
    (df_staging, "dbo_dim_staging_fact_sales_incremental_loads"),
]
for export_df, export_name in normalized_exports:
    save_to_adls(export_df, export_name)

# Transformed Data

In [0]:
from datetime import datetime

base_path = "abfss://normalized-tables@blobstorageforcsvfiles.dfs.core.windows.net/"



def save_transformed_data_to_adls(df, table_name):
    
# Generate timestamp string
 timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
#  filename = f"sales_{timestamp}.csv"

# Define ADLS path
 output_dir = f"{base_path}tables"
 final_path = f"{base_path}transformed-snowflake-tables/{table_name}_{timestamp}.csv"

 #Delete older files for the same table_name from target directory
 target_dir = f"{base_path}transformed-snowflake-tables"
 existing_files =dbutils.fs.ls(target_dir)
 for f in existing_files:
    if f.name.startswith(f"{table_name}_") and f.name.endswith('.csv'):
        dbutils.fs.rm(f.path, True)
        print(f"Deleted old file: {f.name}")

# Write to temp directory
 df.coalesce(2).write.option("header", True).mode("overwrite").csv(output_dir)

# List files in temp directory to find the actual part file name
 files = dbutils.fs.ls(output_dir)

 part_file = [f.path for f in files if f.name.startswith("part-")][0]
 

# Copy to final path with parameterized filename
 dbutils.fs.cp(part_file, final_path)

# Delete the temporary folder
 dbutils.fs.rm(output_dir, True)
 print(f"✅ Saved new version: {table_name} → {final_path}")
    
    # df.coalesce(1).write.mode("overwrite").format("csv").options(header="true", inferSchema="true").save(path)
# print(f"✅ Saved: {table_name} → {path}")

In [0]:
# Transformed outputs to export, in export order: (DataFrame, output file base name).
transformed_exports = [
    (df_price, "dbo_dim_price_tier"),
    (df_staging_filtered, "dbo_dim_filtered_staging"),
    (df_loyalty_score, "dbo_dim_loyalty_score"),
    (df_agg_fact, "dbo_dim_agg_fact"),
]
for export_df, export_name in transformed_exports:
    save_transformed_data_to_adls(export_df, export_name)
