In [0]:
# Hard-coded sample orders (the name suggests an API payload), one dict per
# order with keys order_id, customer, amount, status.
api_data = [
    dict(order_id=order_id, customer=customer, amount=amount, status=status)
    for order_id, customer, amount, status in (
        (1, "A", 500, "NEW"),
        (2, "B", 1200, "NEW"),
        (3, "A", 700, "CANCELLED"),
        (4, "C", 2000, "NEW"),
    )
]


In [0]:
from pyspark.sql import functions as F

class OrderProcessor:
    """Wrap a list of order records in a Spark DataFrame and expose simple
    operations on it.

    Args:
        spark: Spark session; only its ``createDataFrame`` is used here.
        data: Records passed unchanged to ``spark.createDataFrame``
            (in this notebook, a list of order dicts).
        layer: Free-form layer label (e.g. "BRONZE", "SILVER") shown in the
            ``show_data`` banner.
    """

    def __init__(self, spark, data, layer):
        self.spark = spark
        self.data = data
        self.layer = layer

        self.df = spark.createDataFrame(data)

    def show_data(self):
        """Print a banner naming the layer, then display the DataFrame."""
        print(f"Showing {self.layer} data")
        self.df.show()

    def filter_active_orders(self, status="NEW"):
        """Return only the rows whose ``status`` column equals ``status``.

        Args:
            status: Status value treated as "active". Defaults to "NEW", so
                rows with any other status (e.g. "CANCELLED") are excluded.
        """
        return self.df.filter(F.col("status") == status)

    def total_amount(self, group_col="customer", amount_col="amount"):
        """Return ``amount_col`` summed per distinct ``group_col`` value.

        Aggregates over every row of ``self.df``; no status filter is applied,
        so CANCELLED orders are included in the totals.

        Args:
            group_col: Column to group by (default "customer").
            amount_col: Numeric column to sum (default "amount").
        """
        return self.df.groupBy(group_col).sum(amount_col)


In [0]:
# One processor object per layer label.
# NOTE(review): both are built from the same raw `api_data`, so the SILVER
# processor's DataFrame still contains the CANCELLED order until a filter is
# applied.
bronze_processor = OrderProcessor(spark=spark, data=api_data, layer="BRONZE")
silver_processor = OrderProcessor(spark=spark, data=api_data, layer="SILVER")


In [0]:
# Print the "Showing BRONZE data" banner, then the raw order rows.
bronze_processor.show_data()


In [0]:
# Keep only orders whose status is "NEW" (drops the CANCELLED order) and show
# them; `silver_df` is persisted in the next cell.
silver_df = silver_processor.filter_active_orders()
silver_df.show()


In [0]:
# Persist the filtered orders to the `orders_silver` table in overwrite mode,
# then read the table back to confirm what was written.
(
    silver_df.write
    .mode("overwrite")
    .saveAsTable("orders_silver")
)
spark.table("orders_silver").show()


In [0]:
def total_amount(self):
    """Sum ``amount`` per ``customer`` over ``self.df``.

    NOTE(review): this is a module-level function, not a method. Defining it
    here does not add to or change ``OrderProcessor.total_amount``; it only
    works when called explicitly with an object that exposes a ``df``
    attribute.
    """
    per_customer = self.df.groupBy("customer")
    return per_customer.sum("amount")


In [0]:
# Per-customer sum of `amount` over the silver processor's DataFrame.
# NOTE(review): that DataFrame was built from the unfiltered `api_data`, so the
# CANCELLED order is included in these totals; aggregate `silver_df` instead
# if only active orders should count.
# NOTE(review): this cell relies on the first OrderProcessor definition; it
# fails if run after the later cell rebinds `silver_processor` to an instance
# of a class that lacks `total_amount`.
silver_processor.total_amount().show()


In [0]:
from pyspark.sql import functions as F
from datetime import datetime

class OrderProcessor:
    """Order processor that can also stamp audit metadata onto its rows.

    This redefines the earlier ``OrderProcessor`` in the notebook. It keeps the
    same constructor, attributes and methods (``show_data``,
    ``filter_active_orders``, ``total_amount``), so objects built from this
    definition still work in the earlier cells. It also adds ``run_id`` and
    ``add_audit_columns``.

    Args:
        spark: Spark session; only its ``createDataFrame`` is used here.
        data: Records passed unchanged to ``spark.createDataFrame``.
        layer: Free-form layer label written into the ``layer`` audit column.
    """

    def __init__(self, spark, data, layer):
        self.spark = spark
        self.data = data
        self.layer = layer
        # Local wall-clock time at second resolution: processors created in
        # the same second share a run_id.
        self.run_id = f"RUN_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        self.df = spark.createDataFrame(data)

    def show_data(self):
        """Print a banner naming the layer, then display the DataFrame."""
        print(f"Showing {self.layer} data")
        self.df.show()

    def filter_active_orders(self, status="NEW"):
        """Return only the rows whose ``status`` column equals ``status``
        (default "NEW")."""
        return self.df.filter(F.col("status") == status)

    def total_amount(self, group_col="customer", amount_col="amount"):
        """Return ``amount_col`` summed per ``group_col`` over all rows
        (no status filter is applied)."""
        return self.df.groupBy(group_col).sum(amount_col)

    def add_audit_columns(self):
        """Return ``self.df`` with three audit columns appended.

        - ``layer``: this processor's layer label, as a literal.
        - ``run_id``: this processor's run id, as a literal.
        - ``processed_ts``: ``F.current_timestamp()``.
        """
        return (
            self.df
            .withColumn("layer", F.lit(self.layer))
            .withColumn("run_id", F.lit(self.run_id))
            .withColumn("processed_ts", F.current_timestamp())
        )


In [0]:
# Rebuild the SILVER processor from the redefined class and show its rows
# with the layer / run_id / processed_ts audit columns appended.
# NOTE(review): this rebinds `silver_processor`, replacing the instance that
# the earlier cells used.
silver_processor = OrderProcessor(spark=spark, data=api_data, layer="SILVER")
silver_processor.add_audit_columns().show()
