In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import (
    DataType,
    DoubleType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [0]:
# Unity Catalog catalog that receives every bronze table written by the cells below.
catalog_name = "olist"

# Directory holding the raw Olist CSV files; every table cell reads one file from here.
# The volume currently lives in the same catalog as the bronze tables. If the raw
# volume ever moves to a different catalog, replace this with a literal path.
source_volume = f"/Volumes/{catalog_name}/source_data/raw/olist_data"

### Orders

In [0]:
# Bronze ingest of the orders extract: read the raw CSV with an explicit schema,
# stamp lineage columns, and fully replace the brz_orders Delta table.
# NOTE(review): Spark file sources generally treat a user-supplied schema as nullable,
# so the nullable=False flags below are likely not enforced on read — confirm before
# relying on them.
orders_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("order_status", StringType(), True),
    StructField("order_purchase_timestamp", TimestampType(), True),
    StructField("order_approved_at", TimestampType(), True),
    StructField("order_delivered_carrier_date", TimestampType(), True),
    StructField("order_delivered_customer_date", TimestampType(), True),
    StructField("order_estimated_delivery_date", TimestampType(), True)
])

# Build the path from the shared `source_volume` config, as every other table cell
# does, so repointing the config cell also repoints this read.
raw_data_path = f"{source_volume}/olist_orders_dataset.csv"

df_raw = (
    spark.read
    .option("header", "true")
    .schema(orders_schema)
    .csv(raw_data_path)
)

# Lineage columns: load time, plus the originating file taken from the
# file-source `_metadata` column.
df_raw = (
    df_raw
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Full refresh of the bronze table; mergeSchema admits newly added columns.
df_raw.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_orders")


### Customers

In [0]:
# Customers: every column is read as a string. Zip prefixes stay strings,
# presumably to preserve leading zeros.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader
# (file-source schemas are generally made nullable) — confirm before relying on it.
customers_schema = StructType([
    StructField(column_name, StringType(), is_nullable)
    for column_name, is_nullable in (
        ("customer_id", False),
        ("customer_unique_id", False),
        ("customer_zip_code_prefix", True),
        ("customer_city", True),
        ("customer_state", True),
    )
])

raw_data_path = f"{source_volume}/olist_customers_dataset.csv"

# The first CSV line is treated as a header. Column names and types come from the
# explicit schema. Two lineage columns are appended: load time and source file path.
df_raw = (
    spark.read
    .schema(customers_schema)
    .option("header", "true")
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Full refresh of the bronze Delta table; the extra keyword is passed through as a
# writer option (mergeSchema=true).
df_raw.write.saveAsTable(
    f"{catalog_name}.bronze.brz_customers",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)


### Order Items

In [0]:
# Bronze ingest of order line items (one row per item within an order).
# Monetary columns are read as DoubleType. 32-bit FloatType keeps only ~7 significant
# digits, so a value like 1234.56 comes back as ~1234.5600586.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader
# (file-source schemas are generally made nullable) — confirm before relying on it.
order_items_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("order_item_id", IntegerType(), False),
    StructField("product_id", StringType(), False),
    StructField("seller_id", StringType(), False),
    StructField("shipping_limit_date", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("freight_value", DoubleType(), True)
])

raw_data_path = f"{source_volume}/olist_order_items_dataset.csv"

df_raw = spark.read.option("header", "true") \
    .schema(order_items_schema) \
    .csv(raw_data_path)

# Lineage columns: load time and originating file.
df_raw = df_raw.withColumn("_ingested_at", F.current_timestamp()) \
               .withColumn("_source_file", F.col("_metadata.file_path"))

# Full refresh. overwriteSchema is needed because an existing table created with
# float columns cannot be retyped to double through mergeSchema alone.
# NOTE(review): confirm downstream (silver) readers accept DoubleType for
# price/freight_value.
df_raw.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_order_items")


### Payments

In [0]:
# Bronze ingest of order payments.
# payment_value is a currency amount and is read as DoubleType. 32-bit FloatType
# keeps only ~7 significant digits and would perturb the cents.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader
# (file-source schemas are generally made nullable) — confirm before relying on it.
payments_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("payment_sequential", IntegerType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_installments", IntegerType(), True),
    StructField("payment_value", DoubleType(), True)
])

raw_data_path = f"{source_volume}/olist_order_payments_dataset.csv"

df_raw = spark.read.option("header", "true") \
    .schema(payments_schema) \
    .csv(raw_data_path)

# Lineage columns: load time and originating file.
df_raw = df_raw.withColumn("_ingested_at", F.current_timestamp()) \
               .withColumn("_source_file", F.col("_metadata.file_path"))

# Full refresh. overwriteSchema lets an existing float-typed payment_value column be
# replaced by the double-typed one (mergeSchema alone rejects the type change).
df_raw.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_payments")


### Reviews

In [0]:
# Bronze ingest of order reviews, including the free-text title/message columns.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader
# (file-source schemas are generally made nullable) — confirm before relying on it.
reviews_schema = StructType([
    StructField("review_id", StringType(), False),
    StructField("order_id", StringType(), False),
    StructField("review_score", IntegerType(), True),
    StructField("review_comment_title", StringType(), True),
    StructField("review_comment_message", StringType(), True),
    StructField("review_creation_date", TimestampType(), True),
    StructField("review_answer_timestamp", TimestampType(), True)
])

raw_data_path = f"{source_volume}/olist_order_reviews_dataset.csv"

# Review comments are free text and may contain line breaks and double quotes inside
# quoted CSV fields (verify against the source file).
# - multiLine: keeps a quoted field that spans lines as one record. Without it, the
#   record is split into fragment rows and their typed columns parse to null.
# - escape='"': reads RFC 4180 doubled quotes ("") as a literal quote character.
df_raw = spark.read.option("header", "true") \
    .option("multiLine", "true") \
    .option("escape", "\"") \
    .schema(reviews_schema) \
    .csv(raw_data_path)

# Lineage columns: load time and originating file.
df_raw = df_raw.withColumn("_ingested_at", F.current_timestamp()) \
               .withColumn("_source_file", F.col("_metadata.file_path"))

# Full refresh of the bronze table; mergeSchema admits newly added columns.
df_raw.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{catalog_name}.bronze.brz_reviews")


### Products

In [0]:
# Products catalogue: id, category, text-length/photo counts, and physical dimensions.
# NOTE(review): the "lenght" spelling presumably mirrors the upstream Olist column
# names. These StructField names become the bronze table's column names, so
# renaming them would change the table for downstream readers.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader — confirm.
products_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ("product_id", StringType(), False),
        ("product_category_name", StringType(), True),
        ("product_name_lenght", IntegerType(), True),
        ("product_description_lenght", IntegerType(), True),
        ("product_photos_qty", IntegerType(), True),
        ("product_weight_g", FloatType(), True),
        ("product_length_cm", FloatType(), True),
        ("product_height_cm", FloatType(), True),
        ("product_width_cm", FloatType(), True),
    )
])

raw_data_path = f"{source_volume}/olist_products_dataset.csv"

# Read with the explicit schema, then append load-time and source-file lineage columns.
df_raw = (
    spark.read
    .schema(products_schema)
    .option("header", "true")
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Full refresh of the bronze Delta table with schema merging enabled.
(
    df_raw.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .format("delta")
    .saveAsTable(f"{catalog_name}.bronze.brz_products")
)


### Sellers

In [0]:
# Sellers: id plus location columns, all read as strings. The zip prefix stays a
# string, presumably to preserve leading zeros.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader — confirm.
sellers_schema = StructType([
    StructField(name="seller_id", dataType=StringType(), nullable=False),
    StructField(name="seller_zip_code_prefix", dataType=StringType(), nullable=True),
    StructField(name="seller_city", dataType=StringType(), nullable=True),
    StructField(name="seller_state", dataType=StringType(), nullable=True),
])

raw_data_path = f"{source_volume}/olist_sellers_dataset.csv"

# Header line is skipped and names/types come from sellers_schema. Lineage columns
# are appended afterwards.
df_raw = (
    spark.read
    .schema(sellers_schema)
    .option("header", "true")
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Replace the bronze table wholesale; mergeSchema is forwarded as a writer option.
df_raw.write.saveAsTable(
    f"{catalog_name}.bronze.brz_sellers",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

### Geolocation

In [0]:
# Geolocation: zip prefix → lat/lng plus city/state; every column is nullable.
# NOTE(review): FloatType (32-bit) keeps ~7 significant digits. Consider DoubleType
# if sub-metre coordinate precision ever matters downstream.
geolocation_schema = (
    StructType()
    .add("geolocation_zip_code_prefix", StringType(), True)
    .add("geolocation_lat", FloatType(), True)
    .add("geolocation_lng", FloatType(), True)
    .add("geolocation_city", StringType(), True)
    .add("geolocation_state", StringType(), True)
)

raw_data_path = f"{source_volume}/olist_geolocation_dataset.csv"

# Typed read plus the two lineage columns (load time, originating file).
df_raw = (
    spark.read
    .schema(geolocation_schema)
    .option("header", "true")
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Full refresh of the bronze Delta table with schema merging enabled.
(
    df_raw.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.bronze.brz_geolocation")
)

### Product Category Name Translation

In [0]:
# Lookup from the category name used in the products data to its English translation.
# NOTE(review): nullable=False is likely not enforced by Spark's CSV reader — confirm.
category_translation_schema = (
    StructType()
    .add("product_category_name", StringType(), False)
    .add("product_category_name_english", StringType(), True)
)

raw_data_path = f"{source_volume}/product_category_name_translation.csv"

# Typed read plus lineage columns; `df_raw` is left holding the final frame.
df_raw = (
    spark.read
    .schema(category_translation_schema)
    .option("header", "true")
    .csv(raw_data_path)
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))
)

# Replace the bronze table wholesale; mergeSchema is forwarded as a writer option.
df_raw.write.saveAsTable(
    f"{catalog_name}.bronze.brz_product_category_translation",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)