In [0]:
from pyspark.sql.functions import col, from_unixtime

# Silver layer: enrich each event with the 'available' / 'categoryid'
# property rows recorded for the same item, and persist the result as a
# Delta table.
#
# `timestamp` is divided by 1000 before from_unixtime, i.e. it is treated as
# epoch milliseconds (from_unixtime expects seconds). from_unixtime returns a
# 'yyyy-MM-dd HH:mm:ss' *string* (not a TimestampType column) and drops
# sub-second precision; downstream SQL consumes that string format.
df_events = spark.table("e2e.default.events")

events_columns = df_events.select(
    "timestamp", "visitorid", "event", "transactionid", "itemid"
)
final_df_events = events_columns.withColumn(
    "event_datetime", from_unixtime(col("timestamp") / 1000)
)

df_items = spark.table("e2e.default.item_properties_part1")
# Keep only the two item properties used downstream.
df_items_clean = df_items.where(col("property").isin("available", "categoryid"))
items_columns = df_items_clean.select("timestamp", "itemid", "property", "value")
final_df_items = items_columns.withColumn(
    "item_datetime", from_unixtime(col("timestamp") / 1000)
)

# Joining on the column *name* yields a single `itemid` column, so the select
# below needs no DataFrame-qualified reference to disambiguate it.
# Inner join: events whose item has no matching property row are dropped, and
# each event is repeated once per matching property row of its item.
joined_df = final_df_events.join(final_df_items, on="itemid", how="inner")

joined_df = joined_df.select(
    "visitorid",
    "event",
    "transactionid",
    "event_datetime",
    "itemid",
    "property",
    "value",
    "item_datetime",
).dropDuplicates()

# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails as soon as the table already exists. This matches
# the overwrite mode used for the gold table.
joined_df.write.format("delta").mode("overwrite").saveAsTable(
    "e2e.default.events_items_silver"
)




In [0]:
# Gold layer: per-transaction feature rows derived from the silver table,
# written to e2e.default.visitor_transactions_gold (overwritten on each run).
#
# The silver table is referenced by its fully-qualified name (the same name the
# silver cell writes), so the query does not depend on the session's current
# catalog/schema.
#
# NOTE(review) — semantics worth confirming with the feature owner:
# * The silver table is an events x item-property join, so each event appears
#   once per property row of its item. COUNT(*) / SUM(...) below therefore
#   count joined rows, not distinct events, and the conversion rate is
#   weighted by how many property rows each item has.
# * session_length is the span (minutes) between the visitor's first and last
#   event over their whole history, not a per-session length.
# * visitor_days_since_last_purchase = DATEDIFF(end, start) with
#   end = visitor's latest purchase and start = this transaction's latest
#   event, i.e. days from this transaction to the visitor's most recent one.
# * view_labels: `v` rows are transactions and `p` rows are views, so the label
#   is 1 when the same visitor views the same item within 60 days *after*
#   purchasing it, and `view_time` actually holds the transaction time. If the
#   intent was "view followed by a purchase", the two event filters are swapped.
# * The final DISTINCT can still emit more than one row per transaction when
#   the items in that transaction received different labels.
#
# SUM(... ELSE 0) keeps purchase counts / conversion rate at 0 (not NULL) for
# visitors with no transaction rows.
# No ORDER BY: a table write does not preserve row order for readers, so a
# global sort here only adds cost. Sort when querying the gold table instead.
df = spark.sql("""
WITH visitor_stats AS (
    SELECT
        visitorid,
        COUNT(*) AS visitor_total_events,
        SUM(CASE WHEN event = 'transaction' THEN 1 ELSE 0 END) AS visitor_purchase_count,
        ROUND(SUM(CASE WHEN event = 'transaction' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS visitor_conversion_rate,
        MAX(CASE WHEN event = 'transaction' THEN event_datetime END) AS last_purchase_time,
        ROUND((UNIX_TIMESTAMP(MAX(event_datetime)) - UNIX_TIMESTAMP(MIN(event_datetime))) / 60, 2) AS session_length
    FROM e2e.default.events_items_silver
    GROUP BY visitorid
),
base_features AS (
    SELECT
        e.transactionid,
        e.visitorid,
        vs.visitor_total_events,
        vs.visitor_purchase_count,
        vs.visitor_conversion_rate,
        vs.last_purchase_time,
        DATEDIFF(vs.last_purchase_time, MAX(e.event_datetime)) AS visitor_days_since_last_purchase,
        vs.session_length,
        COUNT(DISTINCT e.itemid) AS unique_items_basket_size
    FROM e2e.default.events_items_silver e
    JOIN visitor_stats vs ON e.visitorid = vs.visitorid
    WHERE e.transactionid IS NOT NULL
    GROUP BY
        e.transactionid,
        e.visitorid,
        vs.visitor_total_events,
        vs.visitor_purchase_count,
        vs.visitor_conversion_rate,
        vs.last_purchase_time,
        vs.session_length
),
view_labels AS (
    SELECT
        v.visitorid,
        v.transactionid,
        v.itemid,
        v.event_datetime AS view_time,
        CASE WHEN COUNT(p.event_datetime) > 0 THEN 1 ELSE 0 END AS visitor_item_purchase_label
    FROM e2e.default.events_items_silver v
    LEFT JOIN e2e.default.events_items_silver p
        ON v.visitorid = p.visitorid
       AND v.itemid = p.itemid
       AND p.event = 'view'
       AND p.event_datetime > v.event_datetime
       AND p.event_datetime <= v.event_datetime + INTERVAL 60 DAY
    WHERE v.event = 'transaction'
    GROUP BY v.visitorid, v.transactionid, v.itemid, v.event_datetime
)
SELECT
    DISTINCT
    bf.*,
    vl.visitor_item_purchase_label
FROM base_features AS bf
JOIN view_labels vl
    ON vl.visitorid = bf.visitorid
   AND vl.transactionid = bf.transactionid
""")


df.write.format("delta").mode("overwrite").saveAsTable("e2e.default.visitor_transactions_gold")