In [0]:
# Print a fixed banner marking the start of the notebook run.
print("Enterprise ETL project started")

In [0]:
# Path of the raw sales CSV (sales_day1.csv) that is read into df_raw further down.
raw_path = "/Volumes/workspace/default/etl_sales/raw/sales_day1.csv"

In [0]:
#df_raw = spark.read.option("header", True).option("inferSchema", True).csv(raw_path)

In [0]:
from pyspark.sql.types import *

# Explicit schema for the raw sales CSV. Every column is declared nullable,
# and order_date is kept as a plain string at this stage.
bronze_schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("order_date", StringType(), True)
    .add("customer_id", IntegerType(), True)
    .add("product_id", StringType(), True)
    .add("product_name", StringType(), True)
    .add("category", StringType(), True)
    .add("quantity", IntegerType(), True)
    .add("unit_price", DoubleType(), True)
    .add("city", StringType(), True)
)

# Load the raw CSV using the explicit bronze schema; the file's first line is
# consumed as a header row.
df_raw = spark.read.csv(raw_path, schema=bronze_schema, header=True)


In [0]:
# Preview the raw frame: at most 20 rows, long cell values truncated.
df_raw.show(n=20, truncate=True)

In [0]:
# Print the column names and types of df_raw to confirm the explicit schema was applied.
df_raw.printSchema()

Add ingestion metadata

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Tag every row with ingestion metadata:
#   ingestion_timestamp - value of current_timestamp()
#   source_file         - file name of the raw input
# The file name is taken from the last segment of raw_path instead of being
# retyped as a literal, so the tag cannot drift from the file that was read.
df_bronze = (
    df_raw
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", lit(raw_path.rsplit("/", 1)[-1]))
)


Write Bronze data (Delta, production-style)

In [0]:
bronze_path = "/Volumes/workspace/default/etl_sales/bronze/sales"

# A plain append adds the same rows again every time this cell is re-run.
# A selective overwrite replaces only the rows already tagged with this source
# file, so the load is repeatable and rows from other files are left untouched.
# Delta also checks that every written row satisfies the predicate, so a
# mismatch between the predicate and the source_file column fails loudly.
day1_source_file = raw_path.rsplit("/", 1)[-1]

(df_bronze
 .write
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", f"source_file = '{day1_source_file}'")
 .save(bronze_path))

Validate Bronze output

In [0]:
# Read the bronze Delta table back from bronze_path and preview its rows.
spark.read.load(bronze_path, format="delta").show()

# Day 2 ingestion

In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, DoubleType, DateType
)

# Bronze schema for the sales CSV files: each (column, type) pair below becomes
# a nullable field, in the order listed.
bronze_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("order_id", IntegerType()),
        ("order_date", StringType()),
        ("customer_id", IntegerType()),
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("quantity", IntegerType()),
        ("unit_price", DoubleType()),
        ("city", StringType()),
    )
])

In [0]:
from pyspark.sql.functions import current_timestamp, lit
raw_day2 = "/Volumes/workspace/default/etl_sales/raw/sales_day2.csv"

# Read the Day 2 CSV with the bronze schema (first line is a header) and tag
# each row with ingestion metadata. source_file is the last segment of
# raw_day2, so it always names the file that was actually read.
df_day2 = (
    spark.read
    .schema(bronze_schema)
    .option("header", True)
    .csv(raw_day2)
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", lit(raw_day2.rsplit("/", 1)[-1]))
)


In [0]:
bronze_path = "/Volumes/workspace/default/etl_sales/bronze/sales"

# A plain append adds the Day 2 rows again every time this cell is re-run.
# A selective overwrite replaces only the rows already tagged with this source
# file, so the load is repeatable and rows from other files are left untouched.
# Delta also checks that every written row satisfies the predicate, so a
# mismatch between the predicate and the source_file column fails loudly.
day2_source_file = raw_day2.rsplit("/", 1)[-1]

(df_day2
 .write
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", f"source_file = '{day2_source_file}'")
 .save(bronze_path)
 )

In [0]:
bronze_path = "/Volumes/workspace/default/etl_sales/bronze/sales"

# Print the schema of the Delta table stored at bronze_path.
spark.read.load(bronze_path, format="delta").printSchema()


In [0]:
# Show every order_id that appears more than once in the bronze table,
# together with its number of occurrences.
(
    spark.read.format("delta").load(bronze_path)
    .groupBy("order_id")
    .count()
    .where("count > 1")
    .show()
)
