# Transformer Class

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col, broadcast, collect_set, size, array_contains, row_number, min, datediff, sum as _sum, rank, create_map
from pyspark.sql import DataFrame


class Transformer:
    """Base class for the transformation steps defined in this file.

    Subclasses override ``transform``; the base implementation does no
    work and returns ``None``.
    """

    def __init__(self):
        """Create a transformer; the base class keeps no state."""
        return None

    def transform(self, input_DFs):
        """Placeholder hook meant to be overridden by subclasses.

        Args:
            input_DFs: Input supplied by the caller; ignored here.

        Returns:
            None.
        """
        return None


In [0]:
# -	Customers who have bought AirPods after buying an iPhone
class AirPods_After_Iphone_Transformer(Transformer):
    """
    Identifies customers who purchased AirPods immediately after purchasing an iPhone.

    "Immediately after" means that, within one customer's transactions ordered
    by transaction_date, the row following an 'iPhone' row is an 'AirPods' row.
    """

    def transform(self, input_DFs):
        """
        Build the list of customers with an iPhone -> AirPods purchase sequence.

        Args:
            input_DFs: Mapping that provides "transaction_input_DF" (read for
                customer_id, transaction_date, product_name) and
                "customer_input_DF" (joined on customer_id).

        Returns:
            DataFrame with customer_id, customer_name and location, with
            duplicate rows removed.
        """
        # Step 1: Retrieve transaction and customer DataFrames
        transaction_input_DF = input_DFs.get("transaction_input_DF")
        customer_input_DF = input_DFs.get("customer_input_DF")

        print("Transaction DataFrame:")
        display(transaction_input_DF)

        print("Customer DataFrame:")
        display(customer_input_DF)

        # Step 2: Define a window partitioned by customer_id and ordered by transaction_date
        # NOTE(review): rows of one customer sharing the same transaction_date have
        # no defined relative order under this window -- confirm dates are unique
        # per customer or add a tie-breaker column.
        window_spec = Window.partitionBy("customer_id").orderBy("transaction_date")

        # Step 3: Add a column to capture the next product purchased by each customer
        transformed_DF = transaction_input_DF.withColumn(
            "next_product_name", lead("product_name").over(window_spec)
        )
        print("DataFrame with next product per transaction:")
        display(transformed_DF.orderBy("customer_id", "transaction_date", "product_name"))

        # Step 4: Filter for customers who bought an iPhone immediately followed by AirPods
        filtered_DF = transformed_DF.filter(
            (col("product_name") == 'iPhone') & (col("next_product_name") == 'AirPods')
        )
        print("Filtered DataFrame (iPhone followed by AirPods):")
        display(filtered_DF.orderBy("customer_id", "transaction_date", "product_name"))

        # Step 5: Join with customer DataFrame to get additional customer details
        join_DF = customer_input_DF.join(
            broadcast(filtered_DF),
            on="customer_id"
        )
        print("Joined DataFrame with customer details:")
        display(join_DF)

        # Step 6: Return the relevant columns.
        # A customer with several iPhone -> AirPods sequences matches several
        # rows in filtered_DF, so the join repeats that customer; distinct()
        # collapses those repeats so each customer is reported once.
        return join_DF.select(
            "customer_id",
            "customer_name",
            "location"
        ).distinct()


In [0]:
# -	Customers who have bought only AirPods and iPhone (and no other product)
class Only_AirPods_and_Iphone_Transformer(Transformer):
    """
    Finds customers whose set of distinct purchased product names is exactly
    iPhone and AirPods, with nothing else.
    """

    def transform(self, input_DFs):
        """
        Args:
            input_DFs: Mapping that provides "transaction_input_DF" and
                "customer_input_DF".

        Returns:
            DataFrame with customer_id, customer_name and location for the
            matching customers.
        """
        # Pull both inputs out of the mapping and preview them.
        transactions = input_DFs.get("transaction_input_DF")
        customers = input_DFs.get("customer_input_DF")

        previews = (
            ("Transaction DataFrame:", transactions),
            ("Customer DataFrame:", customers),
        )
        for heading, frame in previews:
            print(heading)
            display(frame)

        # One row per customer carrying the set of distinct product names.
        distinct_products = collect_set("product_name").alias("products")
        products_per_customer = transactions.groupBy("customer_id").agg(distinct_products)
        print("Grouped DataFrame with products per customer:")
        display(products_per_customer)

        # Both products present and the set holds exactly two entries,
        # i.e. the customer bought nothing besides these two.
        products = col("products")
        has_iphone = array_contains(products, "iPhone")
        has_airpods = array_contains(products, "AirPods")
        exactly_two = size(products) == 2
        matching_customers = products_per_customer.filter(
            has_iphone & has_airpods & exactly_two
        )
        print("Filtered DataFrame (only iPhone and AirPods):")
        display(matching_customers)

        # Attach customer details; the filtered side is broadcast to the join.
        with_details = customers.join(broadcast(matching_customers), on="customer_id")
        print("Joined DataFrame with customer details:")
        display(with_details)

        output_columns = ("customer_id", "customer_name", "location")
        return with_details.select(*output_columns)


In [0]:
# -	List all the products bought by customers after their initial purchase
class Products_After_Initial_Purchase_Transformer(Transformer):
    """
    For every customer, collects the distinct products bought in any
    transaction other than the first one (ordered by transaction_date).
    """

    def transform(self, input_DFs):
        """
        Args:
            input_DFs: Mapping that provides "transaction_input_DF" and
                "customer_input_DF".

        Returns:
            DataFrame with customer_id, customer_name, location and
            products_after_initial. Customers are left-joined, so those
            without a later transaction are kept.
        """
        # Inputs
        txns = input_DFs.get("transaction_input_DF")
        custs = input_DFs.get("customer_input_DF")

        print("Transaction DataFrame:")
        display(txns)

        print("Customer DataFrame:")
        display(custs)

        # Number each customer's transactions 1, 2, 3, ... in date order.
        per_customer_by_date = Window.partitionBy("customer_id").orderBy("transaction_date")
        sequence_no = row_number().over(per_customer_by_date)
        numbered = txns.withColumn("purchase_order", sequence_no)
        print("Transaction DataFrame with purchase order per customer:")
        display(numbered)

        # Drop the transaction numbered 1; everything left is a follow-up purchase.
        follow_ups = numbered.where(col("purchase_order") > 1)
        print("Transactions after the first purchase:")
        display(follow_ups.orderBy("customer_id", "transaction_date", "product_name"))

        # Collapse follow-up purchases into one set of product names per customer.
        later_products = collect_set("product_name").alias("products_after_initial")
        per_customer = follow_ups.groupBy("customer_id").agg(later_products)
        print("Grouped DataFrame with products purchased after initial transaction:")
        display(per_customer)

        # Left join from the customer side so every customer stays in the result.
        enriched = custs.join(broadcast(per_customer), on="customer_id", how="left")
        print("Joined DataFrame with customer details and follow-up products:")
        display(enriched)

        wanted = ["customer_id", "customer_name", "location", "products_after_initial"]
        return enriched.select(*wanted)


In [0]:
# - Determine the average time delay between buying an iPhone and buying an AirPods for each customer.
class Avg_Time_Delay_iPhone_AirPods_Transformer(Transformer):
    """
    For each customer who bought both an iPhone and AirPods, reports the number
    of days between the earliest iPhone purchase and the earliest AirPods
    purchase.
    """

    def transform(self, input_DFs: dict) -> DataFrame:
        """
        Args:
            input_DFs: Mapping that provides "transaction_input_DF" and
                "customer_input_DF".

        Returns:
            DataFrame with customer_id, customer_name, location, iphone_date,
            airpods_date and time_delay_days. time_delay_days is
            datediff(airpods_date, iphone_date), so it is negative when the
            earliest AirPods purchase precedes the earliest iPhone purchase.
        """
        transactions = input_DFs.get("transaction_input_DF")
        customers = input_DFs.get("customer_input_DF")

        print("Transaction DataFrame:")
        display(transactions)

        print("Customer DataFrame:")
        display(customers)

        # Keep only the two products of interest.
        is_relevant = col("product_name").isin(["iPhone", "AirPods"])
        relevant = transactions.filter(is_relevant)
        print("Filtered transactions (iPhone and AirPods only):")
        display(relevant)

        def earliest_purchase(product, date_column):
            """Earliest transaction_date per customer for one product."""
            only_product = relevant.filter(col("product_name") == product)
            first_date = min("transaction_date").alias(date_column)
            return only_product.groupBy("customer_id").agg(first_date)

        first_iphone = earliest_purchase("iPhone", "iphone_date")
        print("Earliest iPhone purchase date per customer:")
        display(first_iphone)

        first_airpods = earliest_purchase("AirPods", "airpods_date")
        print("Earliest AirPods purchase date per customer:")
        display(first_airpods)

        # Inner join: only customers that have both dates survive.
        both_dates = first_iphone.join(first_airpods, on="customer_id", how="inner")
        print("Customers with both iPhone and AirPods purchase dates:")
        display(both_dates)

        # Days from the iPhone date to the AirPods date.
        delay = datediff(col("airpods_date"), col("iphone_date"))
        with_delay = both_dates.withColumn("time_delay_days", delay)
        print("Time delay between iPhone and AirPods purchases (in days):")
        display(with_delay)

        # Enrich with customer attributes.
        customer_details = customers.select("customer_id", "customer_name", "location")
        enriched = with_delay.join(customer_details, on="customer_id", how="left")
        print("Final DataFrame with customer details and time delay:")
        print("Note: Negative time delay value indiacate AirPods were bought first then Iphone.")
        display(enriched)

        result_columns = [
            "customer_id",
            "customer_name",
            "location",
            "iphone_date",
            "airpods_date",
            "time_delay_days",
        ]
        return enriched.select(*result_columns)


In [0]:
class Top_Products_By_Revenue_Transformer(Transformer):
    """
    Ranks all products across all categories by total revenue (global ranking).
    """

    def transform(self, input_DFs: dict) -> DataFrame:
        """
        Rank catalog products by the summed price of their matched transactions.

        Args:
            input_DFs: Mapping that provides "transaction_input_DF" and
                "product_input_DF". The merged frame is read for the columns
                product_name, category and price.

        Returns:
            DataFrame with product_name (the catalog name), category,
            total_revenue and rank, where rank 1 is the highest total_revenue.
        """
        # `lit` is needed to build the name mapping below; import it here so
        # this method does not depend on it being imported elsewhere.
        from pyspark.sql.functions import lit

        # Step 1: Retrieve input DataFrames
        transaction_input_DF = input_DFs.get("transaction_input_DF")
        product_input_DF = input_DFs.get("product_input_DF")

        print("Transaction DataFrame:")
        display(transaction_input_DF)

        print("Product DataFrame:")
        display(product_input_DF)

        # Step 2: Create product name mapping expression
        # (alternating key, value literals: transaction name -> catalog name)
        name_mapping_expr = create_map(
            [lit("iPhone"), lit("iPhone SE"),
             lit("AirPods"), lit("AirPods Pro"),
             lit("MacBook"), lit("MacBook Air")]
        )

        # Step 3: Apply mapping to standardize product names in transactions
        mapped_transaction_df = transaction_input_DF.withColumn(
            "mapped_product_name", name_mapping_expr[col("product_name")]
        )
        print("Mapped transaction DataFrame:")
        display(mapped_transaction_df)

        # Step 4: Join on mapped product name with product catalog.
        # The catalog's product_name is renamed first so it does not clash
        # with the transaction's product_name column after the join.
        merged_df = mapped_transaction_df.join(
            product_input_DF.withColumnRenamed("product_name", "full_product_name"),
            mapped_transaction_df["mapped_product_name"] == col("full_product_name"),
            how="inner"
        )
        print("Merged DataFrame with resolved product names:")
        display(merged_df)

        # Step 5: Compute revenue per transaction
        # (each matched transaction row contributes its price once)
        merged_df = merged_df.withColumn("revenue", col("price"))

        # Step 6: Aggregate total revenue by product
        aggregated_df = merged_df.groupBy("full_product_name", "category").agg(
            _sum("revenue").alias("total_revenue")
        )
        print("Total revenue per product:")
        display(aggregated_df)

        # Step 7: Apply global rank by total revenue
        # (no partitionBy: one window over all products)
        window_spec = Window.orderBy(col("total_revenue").desc())
        ranked_df = aggregated_df.withColumn("rank", rank().over(window_spec))
        print("Globally ranked products by total revenue:")
        display(ranked_df)

        # Step 8: Return final result
        return ranked_df.select(
            col("full_product_name").alias("product_name"),
            "category",
            "total_revenue",
            "rank"
        )
