In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType, FloatType
import pyspark.sql.functions as F

In [0]:
catalog_name = "ecommerce"

# Explicit schema for the brands CSV files.
# Every column is read as a string; the second tuple element is the
# `nullable` flag passed to StructField.
brand_schema = StructType(
    [
        StructField(column_name, StringType(), is_nullable)
        for column_name, is_nullable in (
            ("brand_code", False),
            ("brand_name", True),
            ("category_code", True),
        )
    ]
)

In [0]:
raw_data_path = "/Volumes/ecommerce/source_data/raw/brands/*.csv"

# Read all brand CSV files matching the glob, using the explicit schema
# instead of schema inference, then attach ingestion metadata.
df = (
    spark.read
    .option("header", "true")
    # Fixed option key: it was misspelled "delimeter", which the CSV reader
    # silently ignores, so the separator setting never took effect.
    .option("delimiter", ",")
    .schema(brand_schema)
    .csv(raw_data_path)
    # add metadata columns
    .withColumn("_source_file", F.col("_metadata.file_path"))  # path of the file each row came from
    .withColumn("ingested_at", F.current_timestamp())  # timestamp taken when the query runs
)

# Preview a handful of rows only; never display the full DataFrame.
display(df.limit(5))


In [0]:
# Persist the DataFrame as a Delta table in the bronze schema of the catalog.
# The write replaces the table contents ("overwrite") with mergeSchema enabled.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_brands")
)