In [None]:
from pyspark.sql.types import StructType , StructField, StringType, IntegerType, DataType , FloatType , TimestampType
import pyspark.sql.functions as F

In [None]:
# Catalog that the bronze tables are written to; the later cells reuse it.
catalog_name = 'ecommerce'

# Glob matching the raw brand CSV files.
raw_data_source = '/Volumes/ecommerce/source_data/raw/brands/*.csv'

# Explicit schema for the brand files so column types are not inferred.
brand_schema = StructType([
    StructField('brand_code', StringType(), nullable=False),
    StructField('brand_name', StringType(), nullable=True),
    StructField('category_code', StringType(), nullable=True),
])

# Read the comma-delimited files (first line is a header) with that schema.
df = (
    spark.read
    .option("header", "True")
    .option("delimiter", ",")
    .schema(brand_schema)
    .csv(raw_data_source)
)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file", F.col("_metadata.file_path"))
df = df.withColumn("ingested_at", F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze brands Delta table with this load.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_brands")
)

In [None]:
# Glob matching the raw category CSV files.
raw_data_source = '/Volumes/ecommerce/source_data/raw/category/*.csv'

# Explicit schema for the category files so column types are not inferred.
category_schema = StructType([
    StructField('category_code', StringType(), nullable=False),
    StructField('category_name', StringType(), nullable=True),
])

# Read the comma-delimited files (first line is a header) with that schema.
df = (
    spark.read
    .option("header", "True")
    .option("delimiter", ",")
    .schema(category_schema)
    .csv(raw_data_source)
)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file", F.col("_metadata.file_path"))
df = df.withColumn("ingested_at", F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze category Delta table with this load.
# catalog_name is set in an earlier cell.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_category")
)

In [None]:
# Explicit schema for the raw customer files so column types are not inferred.
# phone is read as a string: FloatType is 32-bit single precision, which cannot
# hold a typical 10-digit number exactly, and a phone number is an identifier
# rather than a quantity. Keeping the raw text preserves the value as delivered.
customers_schema = StructType([
    StructField('customer_id', StringType(), False),
    StructField('phone', StringType(), True),
    StructField('country_code', StringType(), True),
    StructField('country', StringType(), True),
    StructField('state', StringType(), True),
    ])

# Glob matching the raw customer CSV files.
raw_data_source = '/Volumes/ecommerce/source_data/raw/customers/*.csv'

# Read the comma-delimited files (first line is a header) with that schema.
df = spark.read.option("header","True").option("delimiter",",").schema(customers_schema).csv(raw_data_source)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file",F.col("_metadata.file_path")).withColumn("ingested_at",F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze customers Delta table with this load.
# overwriteSchema lets the full overwrite also replace the table schema, so an
# existing brz_customers table whose phone column was created as float takes
# the new string type instead of failing on a schema mismatch.
# NOTE(review): downstream readers of brz_customers.phone will now see a
# string column; confirm nothing relies on it being numeric.
# catalog_name is set in an earlier cell.
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_customers")

In [None]:
# Glob matching the raw date CSV files.
raw_data_source = '/Volumes/ecommerce/source_data/raw/date/*.csv'

# Explicit schema for the date files; the date itself is kept as a string here.
date_schema = StructType([
    StructField('date', StringType(), nullable=False),
    StructField('year', IntegerType(), nullable=True),
    StructField('day_name', StringType(), nullable=True),
    StructField('quarter', IntegerType(), nullable=True),
    StructField('week_of_year', IntegerType(), nullable=True),
])

# Read the comma-delimited files (first line is a header) with that schema.
df = (
    spark.read
    .option("header", "True")
    .option("delimiter", ",")
    .schema(date_schema)
    .csv(raw_data_source)
)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file", F.col("_metadata.file_path"))
df = df.withColumn("ingested_at", F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze date Delta table with this load.
# catalog_name is set in an earlier cell.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_date")
)

In [None]:
# Glob matching the raw order-item CSV files in the landing folder.
raw_data_source = '/Volumes/ecommerce/source_data/raw/order_items/landing/*.csv'

# Explicit schema for the order-item files so column types are not inferred.
# NOTE(review): unit_price and tax_amount are read as FloatType (single
# precision) - confirm that precision is acceptable for monetary values.
order_schema = StructType([
    StructField('dt', StringType(), nullable=True),
    StructField('order_ts', TimestampType(), nullable=True),
    StructField('customer_id', StringType(), nullable=True),
    StructField('order_id', IntegerType(), nullable=False),
    StructField('item_seq', IntegerType(), nullable=True),
    StructField('product_id', StringType(), nullable=False),
    StructField('quantity', IntegerType(), nullable=True),
    StructField('unit_price_currency', StringType(), nullable=True),
    StructField('unit_price', FloatType(), nullable=True),
    StructField('discount_pct', StringType(), nullable=True),
    StructField('tax_amount', FloatType(), nullable=True),
    StructField('channel', StringType(), nullable=True),
    StructField('coupon_code', StringType(), nullable=True),
])

# Read the comma-delimited files (first line is a header) with that schema.
df = (
    spark.read
    .option("header", "True")
    .option("delimiter", ",")
    .schema(order_schema)
    .csv(raw_data_source)
)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file", F.col("_metadata.file_path"))
df = df.withColumn("ingested_at", F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze orders Delta table with this load.
# catalog_name is set in an earlier cell.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_orders")
)

In [None]:
# Glob matching the raw product CSV files.
raw_data_source = '/Volumes/ecommerce/source_data/raw/products/*.csv'

# Explicit schema for the product files so column types are not inferred.
# weight_grams and length_cm are read as strings, while width_cm and height_cm
# are read as floats.
product_schema = StructType([
    StructField('product_id', StringType(), nullable=False),
    StructField('sku', StringType(), nullable=True),
    StructField('category_code', StringType(), nullable=True),
    StructField('brand_code', StringType(), nullable=True),
    StructField('color', StringType(), nullable=True),
    StructField('size', StringType(), nullable=False),
    StructField('material', StringType(), nullable=True),
    StructField('weight_grams', StringType(), nullable=True),
    StructField('length_cm', StringType(), nullable=True),
    StructField('width_cm', FloatType(), nullable=True),
    StructField('height_cm', FloatType(), nullable=True),
    StructField('rating_count', IntegerType(), nullable=True),
])

# Read the comma-delimited files (first line is a header) with that schema.
df = (
    spark.read
    .option("header", "True")
    .option("delimiter", ",")
    .schema(product_schema)
    .csv(raw_data_source)
)

# Lineage columns: path of the file each row was read from, and the load time.
df = df.withColumn("_source_file", F.col("_metadata.file_path"))
df = df.withColumn("ingested_at", F.current_timestamp())

# Quick visual check of a few rows before persisting.
display(df.limit(5))

# Replace the contents of the bronze products Delta table with this load.
# catalog_name is set in an earlier cell.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_products")
)