In [1]:
import os

# Path where files are uploaded
data_path = "/content/sample_data/data"

# Extract all .gz files
for file in os.listdir(data_path):
    if file.endswith(".gz"):
        os.system(f"gunzip {os.path.join(data_path, file)}")

print("Extraction complete!")


Extraction complete!


In [2]:
!pip install pyspark




In [3]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("ETL_Pipeline").getOrCreate()

print("Spark session initialized!")


Spark session initialized!


In [4]:
# Define data path
data_path = "/content/sample_data/data"

# Load transactions data
df_transactions = spark.read.option("delimiter", "|").csv(f"{data_path}/fact.transactions.dlm", header=True, inferSchema=True)

# Load product hierarchy data
df_products = spark.read.option("delimiter", "|").csv(f"{data_path}/hier.prod.dlm", header=True, inferSchema=True)

# Show first few rows
df_transactions.show(5)
df_products.show(5)


+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
| order_id|line_id|type|                 dt|pos_site_id|    sku_id|fscldt_id|price_substate_id|sales_units|sales_dollars|discount_dollars|original_order_id|original_line_id|
+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
|164087401|      2|Sale|2016-01-31 06:17:01|    CATMAIN|2668940801| 20160131|               FP|          1|        58.95|             0.0|             NULL|            NULL|
|164087409|      4|Sale|2016-01-31 06:17:25|    CATMAIN|2920920601| 20160131|               FP|          1|        49.95|             0.0|             NULL|            NULL|
|164087440|      2|Sale|2016-01-31 06:19:28|   INETMAIN|0695690000| 20160131|               FP|          2|         37.9|         

In [6]:
from pyspark.sql.functions import sum

# Join transactions with product hierarchy on 'sku_id' (ensure this column exists in both tables)
df_joined = df_transactions.join(df_products, "sku_id", "left")

# Use fscldt_id (fiscal date ID) instead of fsclwk_id
df_summary = df_joined.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

df_summary.show(10)



+-----------+----------+---------+-----------------+-------------------+----------------------+
|pos_site_id|    sku_id|fscldt_id|total_sales_units|total_sales_dollars|total_discount_dollars|
+-----------+----------+---------+-----------------+-------------------+----------------------+
|    CATMAIN|2598481801| 20160131|                1|              59.95|                   0.0|
|    CATMAIN|0695890000| 20160201|                5|             374.75|                   0.0|
|    CATMAIN|6831941800| 20160201|               10|              349.5|                   0.0|
|    CATMAIN|0174410000| 20160203|               14|             247.11|                 57.39|
|    CATMAIN|0487612000| 20160204|               23|             563.75|                 11.25|
|    CATMAIN|2666820701| 20160204|                1|              49.95|                   0.0|
|    CATMAIN|2870720701| 20160205|                7|             454.65|                   0.0|
|    CATMAIN|5844840913| 20160206|      

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, countDistinct

# Initialize Spark session
spark = SparkSession.builder.appName("RetailSalesProcessing").getOrCreate()

data_path = "/content/sample_data/data"

# Step 1: Extract .gz files
for file in os.listdir(data_path):
    if file.endswith(".gz"):
        os.system(f"gunzip {os.path.join(data_path, file)}")

print("Extraction complete!")

# Step 2: Load data
transactions_path = f"{data_path}/fact.transactions.dlm"
products_path = f"{data_path}/hier.prod.dlm"

df_transactions = spark.read.option("delimiter", "|").csv(transactions_path, header=True, inferSchema=True)
df_products = spark.read.option("delimiter", "|").csv(products_path, header=True, inferSchema=True)

# Step 3: Data validation (basic checks)
df_transactions = df_transactions.dropna(subset=["transaction_id", "sku_id", "pos_site_id", "fscldt_id"])
df_products = df_products.dropna(subset=["sku_id"])

# Check uniqueness of primary key
transaction_count = df_transactions.count()
unique_transaction_count = df_transactions.select(countDistinct("transaction_id")).collect()[0][0]
if transaction_count != unique_transaction_count:
    print("Warning: Duplicate transaction_id values detected!")

# Step 4: Foreign key constraint check (Ensure all transactions have matching product SKUs)
unmatched_skus = df_transactions.join(df_products, "sku_id", "left_anti")
if unmatched_skus.count() > 0:
    print("Warning: Some transactions have invalid SKU IDs!")

# Step 5: Join tables
print("Joining transactions with product hierarchy...")
df_joined = df_transactions.join(df_products, "sku_id", "left")

# Step 6: Aggregate sales data (Incremental Calculation)
existing_summary_path = f"{data_path}/mview_weekly_sales.csv"
try:
    df_existing = spark.read.csv(existing_summary_path, header=True, inferSchema=True)
    df_summary = df_joined.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("sales_units").alias("total_sales_units"),
        sum("sales_dollars").alias("total_sales_dollars"),
        sum("discount_dollars").alias("total_discount_dollars")
    )
    df_final = df_existing.unionByName(df_summary).groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("total_sales_units").alias("total_sales_units"),
        sum("total_sales_dollars").alias("total_sales_dollars"),
        sum("total_discount_dollars").alias("total_discount_dollars")
    )
except Exception:
    print("No existing summary found. Creating new summary...")
    df_final = df_joined.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("sales_units").alias("total_sales_units"),
        sum("sales_dollars").alias("total_sales_dollars"),
        sum("discount_dollars").alias("total_discount_dollars")
    )

# Step 7: Save refined table
df_final.write.mode("overwrite").csv(existing_summary_path, header=True)
print("mview_weekly_sales updated successfully!")


Extraction complete!


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `transaction_id` cannot be resolved. Did you mean one of the following? [`order_id`, `line_id`, `type`, `dt`, `pos_site_id`, `sku_id`, `fscldt_id`, `price_substate_id`, `sales_units`, `sales_dollars`, `discount_dollars`, `original_order_id`, `original_line_id`].

In [3]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("RetailDataPipeline").getOrCreate()

# Define input and output paths
input_path = "/content/sample_data/data"
output_path = "/content/sample_data/output/mview_weekly_sales"

# Create output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)

# Step 2: Extract .gz files if present
for file in os.listdir(input_path):
    if file.endswith(".gz"):
        os.system(f"gunzip {os.path.join(input_path, file)}")

print("🔹 Extraction complete!")

# Step 3: Load Fact Transactions Data
df_transactions = spark.read.option("delimiter", "|").csv(
    f"{input_path}/fact.transactions.dlm", header=True, inferSchema=True
)

# Step 4: Load Product Hierarchy (Dimension) Data
df_products = spark.read.option("delimiter", "|").csv(
    f"{input_path}/hier.prod.dlm", header=True, inferSchema=True
)

# Step 5: Data Cleaning & Validation
df_transactions = df_transactions.dropna(
    subset=["order_id", "line_id", "sku_id", "pos_site_id", "fscldt_id"]
)
df_products = df_products.dropna(subset=["sku_id"])

# Check for valid Foreign Key Constraint (sku_id in both tables)
invalid_fk_count = df_transactions.join(df_products, "sku_id", "left_anti").count()
if invalid_fk_count > 0:
    print(f"⚠️ Warning: {invalid_fk_count} transactions have invalid sku_id!")

# Step 6: Join transactions with product hierarchy on 'sku_id'
df_joined = df_transactions.join(df_products, "sku_id", "left")

# Step 7: Aggregate Data by pos_site_id, sku_id, fscldt_id
df_summary = df_joined.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# ✅ **Step 8: Handle Incremental Updates**
try:
    # Load existing mview_weekly_sales if it exists
    df_existing = spark.read.option("header", "true").csv(output_path)

    # Convert columns to correct types
    df_existing = df_existing.withColumn("total_sales_units", col("total_sales_units").cast("double"))
    df_existing = df_existing.withColumn("total_sales_dollars", col("total_sales_dollars").cast("double"))
    df_existing = df_existing.withColumn("total_discount_dollars", col("total_discount_dollars").cast("double"))

    # Merge new summary with existing data (incremental logic)
    df_final = df_existing.union(df_summary).groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("total_sales_units").alias("total_sales_units"),
        sum("total_sales_dollars").alias("total_sales_dollars"),
        sum("total_discount_dollars").alias("total_discount_dollars")
    )

    print("✅ Incremental update applied!")

except Exception as e:
    # If no previous data exists, use the new summary directly
    print("⚠️ No existing mview_weekly_sales found. Creating new table.")
    df_final = df_summary

# Step 9: Save Final Data
df_final.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)

print(f"✅ Data successfully written to {output_path}")


🔹 Extraction complete!
✅ Incremental update applied!
✅ Data successfully written to /content/sample_data/output/mview_weekly_sales


In [5]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("RetailDataPipeline").getOrCreate()

# Enable checkpointing (Partial Checkpoint)
checkpoint_dir = "/content/sample_data/checkpoints"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

# Define input and output paths
input_path = "/content/sample_data/data"
output_path = "/content/sample_data/output/mview_weekly_sales"

# Create output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)

# Step 2: Extract .gz files if present
for file in os.listdir(input_path):
    if file.endswith(".gz"):
        os.system(f"gunzip {os.path.join(input_path, file)}")

print("🔹 Extraction complete!")

# Step 3: Load Fact Transactions Data
df_transactions = spark.read.option("delimiter", "|").csv(
    f"{input_path}/fact.transactions.dlm", header=True, inferSchema=True
)

# Step 4: Load Product Hierarchy (Dimension) Data
df_products = spark.read.option("delimiter", "|").csv(
    f"{input_path}/hier.prod.dlm", header=True, inferSchema=True
)

# Step 5: Data Cleaning & Validation
df_transactions = df_transactions.dropna(
    subset=["order_id", "line_id", "sku_id", "pos_site_id", "fscldt_id"]
)
df_products = df_products.dropna(subset=["sku_id"])

# Check for valid Foreign Key Constraint (sku_id in both tables)
invalid_fk_count = df_transactions.join(df_products, "sku_id", "left_anti").count()
if invalid_fk_count > 0:
    print(f"⚠️ Warning: {invalid_fk_count} transactions have invalid sku_id!")

# Step 6: Join transactions with product hierarchy on 'sku_id'
df_joined = df_transactions.join(df_products, "sku_id", "left")

# ✅ **Apply Partial Checkpoint after Join (to optimize performance)**
df_joined = df_joined.checkpoint(eager=True)

# Step 7: Aggregate Data by pos_site_id, sku_id, fscldt_id
df_summary = df_joined.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# ✅ **Apply Partial Checkpoint after Aggregation (to avoid recomputation)**
df_summary = df_summary.checkpoint(eager=True)

# ✅ **Step 8: Handle Incremental Updates with Deduplication**
try:
    # Load existing data if it exists
    df_existing = spark.read.option("header", "true").csv(output_path)

    # Convert columns to correct types
    df_existing = df_existing.withColumn("total_sales_units", col("total_sales_units").cast("double"))
    df_existing = df_existing.withColumn("total_sales_dollars", col("total_sales_dollars").cast("double"))
    df_existing = df_existing.withColumn("total_discount_dollars", col("total_discount_dollars").cast("double"))

    # Merge new summary with existing data, ensuring no duplicates
    df_final = df_existing.union(df_summary).groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("total_sales_units").alias("total_sales_units"),
        sum("total_sales_dollars").alias("total_sales_dollars"),
        sum("total_discount_dollars").alias("total_discount_dollars")
    )

    # Remove exact duplicates
    df_final = df_final.dropDuplicates(["pos_site_id", "sku_id", "fscldt_id"])

    print("✅ Incremental update applied!")

except Exception as e:
    # If no previous data exists, use the new summary directly
    print("⚠️ No existing mview_weekly_sales found. Creating new table.")
    df_final = df_summary

# ✅ **Apply Partial Checkpoint before Writing (Final Optimization)**
df_final = df_final.checkpoint(eager=True)

# Step 9: Save Final Data in Append Mode
df_final.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)

print(f"✅ Data successfully written to {output_path}")


🔹 Extraction complete!
✅ Incremental update applied!
✅ Data successfully written to /content/sample_data/output/mview_weekly_sales


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
import os

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("RetailDataProcessor").getOrCreate()

# Define input and output paths
input_dir = "/content/sample_data/data"
output_dir = "/content/sample_data/output/mview_weekly_sales"

# Step 2: Extract .gz files
for filename in os.listdir(input_dir):
    if filename.endswith(".gz"):
        os.system(f"gunzip {os.path.join(input_dir, filename)}")

# Step 3: Load data into Spark DataFrames
transactions_df = spark.read.option("delimiter", "|").csv(
    f"{input_dir}/fact.transactions.dlm", header=True, inferSchema=True
)

products_df = spark.read.option("delimiter", "|").csv(
    f"{input_dir}/hier.prod.dlm", header=True, inferSchema=True
)

# Step 4: Clean Data - Remove Null Values
transactions_df = transactions_df.dropna(subset=["order_id", "line_id", "sku_id", "pos_site_id", "fscldt_id"])
products_df = products_df.dropna(subset=["sku_id"])

# Step 5: Foreign Key Validation (Ensuring sku_id exists in both tables)
missing_sku_count = transactions_df.join(products_df, "sku_id", "left_anti").count()
if missing_sku_count > 0:
    print(f"⚠️ Warning: {missing_sku_count} records have invalid SKU IDs!")

# Step 6: Perform Join to enrich transaction data
merged_df = transactions_df.join(products_df, "sku_id", "left")

# Apply Partial Checkpointing (To Optimize Spark Processing)
merged_df = merged_df.checkpoint(eager=True)

# Step 7: Aggregate Sales Data
summary_df = merged_df.groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
    sum("sales_units").alias("total_units"),
    sum("sales_dollars").alias("total_revenue"),
    sum("discount_dollars").alias("total_discounts")
)

# Apply Checkpointing for Intermediate Data
summary_df = summary_df.checkpoint(eager=True)

# Step 8: Handle Incremental Updates
try:
    existing_df = spark.read.option("header", "true").csv(output_dir)

    # Convert columns to appropriate types for aggregation
    existing_df = existing_df.withColumn("total_units", col("total_units").cast("double"))
    existing_df = existing_df.withColumn("total_revenue", col("total_revenue").cast("double"))
    existing_df = existing_df.withColumn("total_discounts", col("total_discounts").cast("double"))

    # Merge new and existing data
    final_df = existing_df.union(summary_df).groupBy("pos_site_id", "sku_id", "fscldt_id").agg(
        sum("total_units").alias("total_units"),
        sum("total_revenue").alias("total_revenue"),
        sum("total_discounts").alias("total_discounts")
    )

    # Remove duplicate records
    final_df = final_df.dropDuplicates(["pos_site_id", "sku_id", "fscldt_id"])
    print("✅ Incremental Update Applied Successfully!")

except:
    print("⚠️ No Existing Data Found. Creating Fresh Dataset.")
    final_df = summary_df

# Step 9: Save Final Output
final_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_dir)

print(f"✅ Data processing complete! Output saved at: {output_dir}")


⚠️ No Existing Data Found. Creating Fresh Dataset.
✅ Data processing complete! Output saved at: /content/sample_data/output/mview_weekly_sales
