# Ingestion of dimension tables into Bronze 

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType, TimestampType, FloatType
import pyspark.sql.functions as F

## Brands


In [0]:
# Catalog used to qualify the bronze table names in the write cells of this notebook.
catalog_name = 'ecommerce'

# Column layout applied when reading the brand CSV files; all columns are raw strings.
# NOTE(review): Spark file sources generally relax nullable=False when reading,
# so the flag on brand_code likely documents intent rather than enforcing it - confirm.
brand_schema = (
    StructType()
    .add('brand_code', StringType(), False)
    .add('brand_name', StringType(), True)
    .add('category_code', StringType(), True)
)

In [0]:
# Glob that matches every brand CSV file in the raw source volume.
raw_data_path = "/Volumes/ecommerce/source_data/raw/brands/*.csv"

# Read the comma-delimited files (first line is a header) using the explicit brand schema.
brands_reader = spark.read.options(header="true", delimiter=",").schema(brand_schema)
df = brands_reader.csv(raw_data_path)

# Append lineage columns: the path of the originating file and the load timestamp.
# NOTE(review): the category and date cells name the timestamp column "_ingested_at";
# here it is "ingested_at" (no underscore). Kept as-is because renaming changes the
# bronze table contract - confirm which name downstream consumers expect.
df = df.select(
    "*",
    F.col("_metadata.file_path").alias("_source_file"),
    F.current_timestamp().alias("ingested_at"),
)

# Preview the first few rows in the notebook output.
display(df.limit(5))

In [0]:
# Persist the raw brand rows to the bronze layer as a Delta table
# (<catalog_name>.bronze.brz_brands). Mode "overwrite" replaces the existing table
# contents on each run; the mergeSchema option is passed through to the Delta writer.
brands_table = f"{catalog_name}.bronze.brz_brands"

(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "True")
    .saveAsTable(brands_table)
)

## Category

In [0]:
# Column layout applied when reading the category CSV files (both columns are strings).
category_schema = StructType(
    [
        StructField("category_code", StringType(), nullable=False),
        StructField("category_name", StringType(), nullable=True),
    ]
)

# Glob that matches every category CSV file in the raw source volume.
raw_data_path = "/Volumes/ecommerce/source_data/raw/category/*.csv"

# Read the comma-delimited files (first line is a header) with the explicit schema.
category_raw = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(category_schema)
    .csv(raw_data_path)
)

# Append lineage columns: load timestamp first, then the originating file path.
df = category_raw.select(
    "*",
    F.current_timestamp().alias("_ingested_at"),
    F.col("_metadata.file_path").alias("_source_file"),
)

# Persist to the bronze layer as Delta table <catalog_name>.bronze.brz_category,
# replacing the existing contents on each run.
category_table = f"{catalog_name}.bronze.brz_category"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(category_table)
)

## Products


In [0]:
# Column layout applied when reading the product CSV files.
# Only columns expected in the source files are declared here. The lineage columns
# (file_name, ingested_timestamp) are derived after the read, so they must not be
# part of the read schema: declaring them would make the schema wider than the file
# and the load would depend on the CSV reader's permissive handling of short rows.
products_schema = StructType([
    StructField("product_id", StringType(), False),
    StructField("sku", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand_code", StringType(), True),
    StructField("color", StringType(), True),
    StructField("size", StringType(), True),
    StructField("material", StringType(), True),
    # Measurements are kept as strings because the incoming data may contain
    # anomalies; they are stored raw in bronze rather than parsed here.
    StructField("weight_grams", StringType(), True),
    StructField("length_cm", StringType(), True),
    StructField("width_cm", StringType(), True),
    StructField("height_cm", StringType(), True),
    StructField("rating_count", IntegerType(), True),
])

# Glob that matches every product CSV file in the raw source volume.
raw_data_path = "/Volumes/ecommerce/source_data/raw/products/*.csv"

# Read the comma-delimited files (first line is a header) with the explicit schema,
# then append the lineage columns: originating file path and load timestamp.
df = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .schema(products_schema)
    .csv(raw_data_path)
    .withColumn("file_name", F.col("_metadata.file_path"))
    .withColumn("ingested_timestamp", F.current_timestamp())
)

# Persist to the bronze layer as Delta table <catalog_name>.bronze.brz_products,
# replacing the existing contents on each run.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_products")
)

## Customers

In [0]:
# Column layout applied when reading the customer CSV files (all columns are strings).
customers_schema = (
    StructType()
    .add("customer_id", StringType(), False)
    .add("phone", StringType(), True)
    .add("country_code", StringType(), True)
    .add("country", StringType(), True)
    .add("state", StringType(), True)
)

# Glob that matches every customer CSV file in the raw source volume.
raw_data_path = "/Volumes/ecommerce/source_data/raw/customers/*.csv"

# Read the comma-delimited files (first line is a header) with the explicit schema.
customers_raw = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(customers_schema)
    .csv(raw_data_path)
)

# Append lineage columns one at a time: originating file path, then load timestamp.
customers_with_source = customers_raw.withColumn("file_name", F.col("_metadata.file_path"))
df = customers_with_source.withColumn("ingested_timestamp", F.current_timestamp())

# Persist to the bronze layer as Delta table <catalog_name>.bronze.brz_customers,
# replacing the existing contents on each run.
customers_table = f"{catalog_name}.bronze.brz_customers"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(customers_table)
)

## Date


In [0]:
# Column layout applied when reading the date-dimension CSV files.
# The "date" column itself is read as a plain string; the numeric parts are integers.
date_schema = (
    StructType()
    .add("date", StringType(), True)
    .add("year", IntegerType(), True)
    .add("day_name", StringType(), True)
    .add("quarter", IntegerType(), True)
    .add("week_of_year", IntegerType(), True)
)

# Glob that matches every date CSV file in the raw source volume.
raw_data_path = "/Volumes/ecommerce/source_data/raw/date/*.csv"

# Read the comma-delimited files (first line is a header) with the explicit schema.
date_raw = (
    spark.read
    .options(header="true", delimiter=",")
    .schema(date_schema)
    .csv(raw_data_path)
)

# Append lineage columns: load timestamp first, then the originating file path.
df = date_raw.select(
    "*",
    F.current_timestamp().alias("_ingested_at"),
    F.col("_metadata.file_path").alias("_source_file"),
)

# Persist to the bronze layer as Delta table <catalog_name>.bronze.brz_date,
# replacing the existing contents on each run.
date_table = f"{catalog_name}.bronze.brz_date"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(date_table)
)
