In [0]:
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, MapType, DoubleType, LongType

class BronzeLayer:
    """Bronze stage: stream raw JSON rate files into a Delta table.

    The source directory, checkpoint location and target table are held as
    instance attributes and are actually used by `process_stream` (they were
    previously assigned but ignored in favour of hard-coded literals).

    Args:
        raw_data_dir: Directory the JSON file source reads from.
        checkpoint_dir: Checkpoint location for the bronze writer. The default
            is the path `process_stream` previously hard-coded, so existing
            checkpoint state keeps being used.
        bronze_table: Name of the Delta table the stream appends to.
    """

    def __init__(self,
                 raw_data_dir="/mnt/raw_data",
                 checkpoint_dir="/mnt/raw_data/checkpoint/bronze_checkpoint",
                 bronze_table="fixer_bronze"):
        self.raw_data_dir = raw_data_dir
        # NOTE(review): the default checkpoint sits underneath the default
        # source directory -- confirm the JSON source never picks up
        # checkpoint files from there.
        self.checkpoint_dir = checkpoint_dir
        self.bronze_table = bronze_table

    def get_schema(self):
        """Return the explicit schema applied to the incoming JSON files.

        All fields are nullable. `rates` is a map from string keys to double
        values; `timestamp` is a long.
        """
        return StructType([
            StructField("base", StringType(), True),
            StructField("date", StringType(), True),
            StructField("historical", BooleanType(), True),
            StructField("rates", MapType(StringType(), DoubleType()), True),
            StructField("success", BooleanType(), True),
            StructField("timestamp", LongType(), True),
        ])

    def process_stream(self):
        """Start the bronze stream and return its StreamingQuery handle.

        Reads JSON from `self.raw_data_dir` with the schema from
        `get_schema()` and appends it to `self.bronze_table` using an
        `availableNow` trigger. The query is only started here; callers that
        need the data to be fully written must wait on the returned handle.
        """
        bronzeDF = (spark.readStream
                        .format("json")
                        .schema(self.get_schema())
                        .load(self.raw_data_dir))

        query = (bronzeDF.writeStream
                  .format("delta")
                  .outputMode("append")
                  .option("checkpointLocation", self.checkpoint_dir)
                  .trigger(availableNow=True)
                  .toTable(self.bronze_table))

        return query

# Start the Bronze stream and block until it has finished.
# `process_stream` only starts the query; without waiting here, the next cell
# could read `fixer_bronze` before this run has written (or even created) it.
bronze_layer = BronzeLayer()
bronze_query = bronze_layer.process_stream()
bronze_query.awaitTermination()


In [0]:
from pyspark.sql.functions import expr

class SilverLayer:
    """Silver stage: flatten the bronze `rates` map into one row per entry."""

    def process_stream(self):
        """Start the silver stream and return its StreamingQuery handle.

        Streams from the `fixer_bronze` table, keeps `date`, carries
        `timestamp` forward as `modify_time`, and explodes `rates` into
        `currency` / `rate` columns. The result is appended to the
        `fixer_silver` Delta table using an `availableNow` trigger.
        """
        source_stream = spark.readStream.table("fixer_bronze")

        # Projection applied to every bronze record.
        projection = [
            "date",
            "timestamp as modify_time",
            "explode(rates) as (currency, rate)",
        ]
        flattened = source_stream.selectExpr(*projection)

        # Configure the writer step by step, then start it.
        writer = flattened.writeStream.format("delta")
        writer = writer.option("checkpointLocation", "/mnt/raw_data/checkpoint/silver_checkpoint")
        writer = writer.outputMode("append")
        writer = writer.trigger(availableNow=True)
        return writer.toTable("fixer_silver")

# Start the Silver stream and block until it has finished.
# The gold cell batch-reads `fixer_silver` straight away, so this run's rows
# must be fully written before moving on.
silver_layer = SilverLayer()
silver_query = silver_layer.process_stream()
silver_query.awaitTermination()


In [0]:
from pyspark.sql.functions import from_unixtime, col

class GoldLayer:
    """Gold stage: pivot silver rows into one row per (date, event_time).

    The pivoted stream is merged into the gold Delta table one micro-batch at
    a time via `foreachBatch`.
    """

    def __init__(self):
        self.silver_table = "fixer_silver"
        self.gold_table = "fixer_gold"
        self.checkpoint_location = "/mnt/raw_data/checkpoint/gold_checkpoint"

    def pivotDF(self, silverDF):
        """Pivot silver rows so USD, JPY and CNY become columns.

        Args:
            silverDF: DataFrame with `date`, `modify_time`, `currency` and
                `rate` columns.

        Returns:
            A DataFrame grouped by `date` and `event_time` with one column
            per pivoted currency, each holding the first `rate` in the group.
        """
        # `modify_time` is converted with from_unixtime and cast to a timestamp
        # so it can carry the watermark.
        silverDF_with_event_time = silverDF.withColumn(
            "event_time", from_unixtime(col("modify_time")).cast("timestamp")
        )

        # NOTE(review): with append output mode Spark emits an aggregate only
        # after the watermark has moved past its event_time, so rows from the
        # most recent 7 days reach the MERGE late -- confirm this is intended.
        pivotedDF = (
            silverDF_with_event_time
                .withWatermark("event_time", "7 days")
                .groupBy("date", "event_time")
                .pivot("currency", ["USD", "JPY", "CNY"])
                .agg({"rate": "first"})
        )

        return pivotedDF

    def upsert_to_delta(self, batchDF, batchId):
        """Merge one micro-batch into the gold table (foreachBatch callback).

        Rows matching on `date` and `event_time` are updated; all others are
        inserted.

        Args:
            batchDF: The micro-batch DataFrame handed over by foreachBatch.
            batchId: The micro-batch id, used in log messages only.

        Raises:
            Exception: Any failure is logged and re-raised. Swallowing it
                would let the stream checkpoint the batch as done even though
                nothing was merged, silently dropping its rows.
        """
        try:
            batchDF.createOrReplaceTempView("gold_updates")

            # Run the MERGE on the session that owns batchDF: the temp view is
            # registered there, which is not guaranteed to be the notebook's
            # global `spark` session.
            batchDF.sparkSession.sql(f"""
                MERGE INTO {self.gold_table} tgt
                USING gold_updates src
                ON tgt.date = src.date AND tgt.event_time = src.event_time
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """)
        except Exception as e:
            print(f"❌ Batch {batchId} error: {str(e)}")
            raise
        else:
            print(f"✅ Batch {batchId} upsert completed successfully.")

    def process_stream(self):
        """Start the gold stream and return its StreamingQuery handle.

        Streams from `self.silver_table`, pivots it with `pivotDF`, and passes
        every micro-batch to `upsert_to_delta`. The query uses an
        `availableNow` trigger and is only started here; callers that need
        the merge to be complete must wait on the returned handle.
        """
        silverDF = spark.readStream.table(self.silver_table)
        pivotedDF = self.pivotDF(silverDF)

        query = (
            pivotedDF.writeStream
                .queryName("gold-processing")
                .foreachBatch(self.upsert_to_delta)
                .option("checkpointLocation", self.checkpoint_location)
                .outputMode("append")
                .trigger(availableNow=True)
                .start()
        )

        return query

# Rebuild fixer_gold from the current contents of fixer_silver so that the
# MERGE target exists before the stream starts.
# Note: this runs on every execution of the cell (not just once) and
# overwrites both the table's data and its schema.
silverDF = spark.table("fixer_silver")
pivotedDF = (
    silverDF.withColumn("event_time", from_unixtime(col("modify_time")).cast("timestamp"))
            .groupBy("date", "event_time")
            .pivot("currency", ["USD", "JPY", "CNY"])
            .agg({"rate": "first"})
)

pivotedDF.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("fixer_gold")


# Start the streaming job and block until it has finished, so the query in the
# next cell sees the merged result and any stream failure surfaces here.
gold_layer = GoldLayer()
gold_query = gold_layer.process_stream()
gold_query.awaitTermination()


In [0]:
%sql
-- Gold rows dated 2025-02-19 or later.
SELECT *
FROM fixer_gold
WHERE date >= "2025-02-19"

date,event_time,USD,JPY,CNY
2025-02-19,2025-02-19T23:59:59Z,0.634009,95.877343,4.617962
2025-02-20,2025-02-20T23:59:59Z,0.6402,95.653931,4.645307
2025-02-21,2025-02-21T23:59:59Z,0.6355,94.832503,4.608036
2025-02-24,2025-02-24T23:59:59Z,0.63376,94.981535,4.593554
2025-02-22,2025-02-22T23:59:59Z,0.6357,94.868705,4.609486
2025-02-26,2025-02-26T23:59:59Z,0.631028,93.923827,4.591453
2025-02-23,2025-02-23T23:59:59Z,0.636981,95.140164,4.618766
2025-02-25,2025-02-25T23:59:59Z,0.635169,94.685545,4.603767
