#### Let's practice more: ETL

In [0]:
# Product master data: one (prod_id, prod_name, price) tuple per product.
products_data = [
    ("P001", "MacBook Pro", 2500),
    ("P002", "iPhone 15", 1200),
    ("P003", "AirPods", 250),
    ("P004", "iPad Air", 800),
]

# Only column names are supplied here, so Spark infers the column types
# from the Python values in the tuples.
products_df = spark.createDataFrame(
    products_data,
    schema=["prod_id", "prod_name", "price"],
)

# Persist the master table as Delta; "overwrite" replaces any data already
# stored at this path.
products_df.write.save(
    "s3a://wj-spark-bucket/apple/master_data/products",
    format="delta",
    mode="overwrite",
)

In [0]:
# Directory that incoming order CSV files are read from.
input_path = "s3a://wj-spark-bucket/apple/orders_input"

# Explicit DDL-style schema for the order files (no schema inference).
schema = "order_id STRING, prod_id STRING, country STRING, quantity INT"

# Streaming reader over the "cloudFiles" source; the files are CSV and
# their first line is a header row.
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**{"cloudFiles.format": "csv", "header": "true"})
    .schema(schema)
    .load(input_path)
)

In [0]:
from pyspark.sql.functions import col

# Product master table, read as a batch (non-streaming) Delta DataFrame.
prod_master = spark.read.load(
    "s3a://wj-spark-bucket/apple/master_data/products",
    format="delta",
)

# Enrich each streamed order with its product attributes, then compute the
# order value. No join type is given, so this is Spark's default inner join:
# orders whose prod_id has no match in the master table are not emitted.
transformed_stream = (
    orders_stream
    .join(prod_master, on="prod_id")
    .withColumn("total_sales", col("quantity") * col("price"))
)

In [0]:
# Where the stream stores its progress, and where the result table is written.
checkpoint_path = "s3a://wj-spark-bucket/apple/checkpoint"
output_path = "s3a://wj-spark-bucket/apple/sales_report"

# availableNow=True: process the data that is currently available, then stop
# the query instead of running continuously.
query = transformed_stream.writeStream.trigger(availableNow=True).start(
    output_path,
    format="delta",
    outputMode="append",
    checkpointLocation=checkpoint_path,
)

# Block until the run has stopped before reporting completion.
query.awaitTermination()
print("ETL pipeline finished!")

In [0]:
# Read the Delta table written by the streaming query back as a batch
# DataFrame and render it in the notebook.
display(spark.read.load(output_path, format="delta"))