In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
import pandas as pd

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
cohort = spark.read.parquet(r"data/cohort_data")

In [6]:
cohort.count()

26436

In [7]:
cohort.show(5)

+--------------------+------------------+--------------------+-----------+-------------+
|             user_id|         user_name|           cohort_id|cohort_name| marketing_id|
+--------------------+------------------+--------------------+-----------+-------------+
|f8dc8576-6af9-44e...|        Rose White|677d3a5f-e782-455...|    ANDROID|android promo|
|16ca67e7-1515-420...|      Mark Nichols|677d3a5f-e782-455...|    ANDROID|android promo|
|53bb794b-58b1-4f5...|     David Garrett|57cdf7dc-177a-4cf...|      APPLE|  apple promo|
|28196996-599a-45f...|Laura Mitchell DVM|57cdf7dc-177a-4cf...|      APPLE|  apple promo|
|99b5f502-2517-43f...|    Sabrina Oneill|677d3a5f-e782-455...|    ANDROID|android promo|
+--------------------+------------------+--------------------+-----------+-------------+
only showing top 5 rows



In [8]:
# Parition is not part of data , so unable to read directly in spark
clickstream = spark.createDataFrame(pd.read_parquet(r"data/clickstream_data"))

In [10]:
clickstream.show(5)

+--------------------+------------------+----------+-----------+--------------------+--------------------+-----------------+---------------------+--------------------+--------------------+
|             user_id|         user_name|event_type|facility_id|       facility_name|           market_id|      market_name|event_start_timestamp| event_end_timestamp|processing_timestamp|
+--------------------+------------------+----------+-----------+--------------------+--------------------+-----------------+---------------------+--------------------+--------------------+
|f8dc8576-6af9-44e...|        Rose White|  view_map|       5555|Manhattan Parking...|0a5991de-29c2-473...|New York City, NY|  2021-03-29 00:00:00|2021-03-29 00:00:...|2021-03-29 00:01:...|
|16ca67e7-1515-420...|      Mark Nichols|  view_map|       1111| Parking on the Blvd|f08ec618-2498-4e5...|        Miami, FL| 2021-03-29 00:00:...|2021-03-29 00:00:...|2021-03-29 00:00:...|
|53bb794b-58b1-4f5...|     David Garrett| check_out|   

In [11]:
clickstream.count()

27000

In [12]:
# Partition is not part of data, so unable to read directly in spark

account = spark.createDataFrame(pd.read_parquet(r"data/accounting_data"))

In [13]:
account.show(5)

+--------------+-----------+--------------------+-----------+------------------+-----------------+----------------+---------------------+--------------------+
|transaction_id|facility_id|           market_id|market_name|transaction_amount|   revenue_amount|transaction_type|transaction_timestamp|processing_timestamp|
+--------------+-----------+--------------------+-----------+------------------+-----------------+----------------+---------------------+--------------------+
|         11631|        111|f08ec618-2498-4e5...|      Miami|               5.0|4.911647666700684|        PURCHASE| 2021-03-29 00:00:...|2021-03-29 00:00:...|
|         11633|       1111|f08ec618-2498-4e5...|      Miami|               9.0|8.649414147906768|        PURCHASE| 2021-03-29 00:00:...|2021-03-29 00:00:...|
|         11639|          7|7d38bdf9-0509-411...|    Toronto|               9.0| 8.62579726106193|        PURCHASE| 2021-03-29 00:01:...|2021-03-29 00:01:...|
|         11640|         11|f08ec618-2498-4e5.

In [14]:
account.count()

7967

In [16]:
# Join all the three tables based on the conditions
# Selecting only required columns
data = (
    cohort.select("user_id", "cohort_id", "cohort_name")
    .join(
        clickstream.select(
            "user_id",
            "facility_id",
            "market_id",
            "market_name",
            "event_start_timestamp",
            "event_end_timestamp",
        ),
        ["user_id"],
        how="inner",
    )
    .join(
        account.select(
            "facility_id",
            "revenue_amount",
            "transaction_timestamp",
            "processing_timestamp",
        ),
        (
            (clickstream["facility_id"] == account["facility_id"])
            & (
                account["transaction_timestamp"].between(
                    clickstream["event_start_timestamp"],
                    clickstream["event_end_timestamp"],
                )
            )
        ),
        how="inner",
    )
)


In [17]:
data.count()

7774

In [20]:
# Expected count is 7967, so let check for missing counts

(cohort.select("user_id", "cohort_id", "cohort_name")
    .join(
        clickstream.select(
            "user_id",
            "facility_id",
            "market_id",
            "market_name",
            "event_start_timestamp",
            "event_end_timestamp",
        ),
        ["user_id"],
        how="left_anti",
    )).count()

# All records in cohort have matching records in clickstream, lets the check the reverse

0

In [21]:
(clickstream.select(
            "user_id",
            "facility_id",
            "market_id",
            "market_name",
            "event_start_timestamp",
            "event_end_timestamp",
        )
    .join(cohort.select("user_id", "cohort_id", "cohort_name"),
        ["user_id"],
        how="left_anti",
    )).count()

# Some records in clickstream doesnt have any matching records in cohort

564

In [22]:
(clickstream.select(
            "user_id",
            "facility_id",
            "market_id",
            "market_name",
            "event_start_timestamp",
            "event_end_timestamp",
        )
.join(
account.select(
            "facility_id",
            "revenue_amount",
            "transaction_timestamp",
            "processing_timestamp",
        ),
    (
            (clickstream["facility_id"] == account["facility_id"])
            & (
                account["transaction_timestamp"].between(
                    clickstream["event_start_timestamp"],
                    clickstream["event_end_timestamp"],
                )
            )
        ),how="left_anti"
).count()
)

# Records in clickstream , doesnt have any accouting which is fine

19033

In [24]:
(account.select(
            "facility_id",
            "revenue_amount",
            "transaction_timestamp",
            "processing_timestamp",
        )
.join(
clickstream.select(
            "user_id",
            "facility_id",
            "market_id",
            "market_name",
            "event_start_timestamp",
            "event_end_timestamp",
        ),
    (
            (clickstream["facility_id"] == account["facility_id"])
            & (
                account["transaction_timestamp"].between(
                    clickstream["event_start_timestamp"],
                    clickstream["event_end_timestamp"],
                )
            )
        ),how="left_anti"
).count()
)

# All accounting data have corresponding data in clickstream

0

In [25]:
data.show(3)

+--------------------+--------------------+-----------+-----------+--------------------+-----------+---------------------+--------------------+-----------+-----------------+---------------------+--------------------+
|             user_id|           cohort_id|cohort_name|facility_id|           market_id|market_name|event_start_timestamp| event_end_timestamp|facility_id|   revenue_amount|transaction_timestamp|processing_timestamp|
+--------------------+--------------------+-----------+-----------+--------------------+-----------+---------------------+--------------------+-----------+-----------------+---------------------+--------------------+
|9c9c7bb3-8d34-46a...|677d3a5f-e782-455...|    ANDROID|          7|7d38bdf9-0509-411...|Toronto, ON| 2021-03-29 00:00:...|2021-03-29 00:01:...|          7| 8.62579726106193| 2021-03-29 00:01:...|2021-03-29 00:01:...|
|b2c83096-16bc-40a...|aaed4e58-9964-438...|     CHROME|          7|7d38bdf9-0509-411...|Toronto, ON| 2021-03-29 00:02:...|2021-03-29

In [30]:
# create week from accouting data processing timestamp
from datetime import datetime

def getWeek(date:datetime)->int:
    return int(date.strftime("%V"))

dateUDF = udf(getWeek,IntegerType())

In [31]:
processed = data.withColumn("week",dateUDF(col("processing_timestamp")))

In [32]:
processed.show(3)

+--------------------+--------------------+-----------+-----------+--------------------+-----------+---------------------+--------------------+-----------+-----------------+---------------------+--------------------+----+
|             user_id|           cohort_id|cohort_name|facility_id|           market_id|market_name|event_start_timestamp| event_end_timestamp|facility_id|   revenue_amount|transaction_timestamp|processing_timestamp|week|
+--------------------+--------------------+-----------+-----------+--------------------+-----------+---------------------+--------------------+-----------+-----------------+---------------------+--------------------+----+
|9c9c7bb3-8d34-46a...|677d3a5f-e782-455...|    ANDROID|          7|7d38bdf9-0509-411...|Toronto, ON| 2021-03-29 00:00:...|2021-03-29 00:01:...|          7| 8.62579726106193| 2021-03-29 00:01:...|2021-03-29 00:01:...|  13|
|b2c83096-16bc-40a...|aaed4e58-9964-438...|     CHROME|          7|7d38bdf9-0509-411...|Toronto, ON| 2021-03-29 

In [35]:
# Create Window functions to Sum of revenue grouping market_id,cohort_id and week

SumFunc = Window.partitionBy(col('market_id'),col('cohort_id'),col('week'))

In [39]:
# create Window function to calculate the rank 

RankFunc = Window.partitionBy(col('market_id'),col('week')).orderBy(col('revenue').desc())

In [46]:
final = (
    processed.withColumn("revenue", sum(col("revenue_amount")).over(SumFunc))
    .withColumn("rank", row_number().over(RankFunc))
    .filter(col("rank") == 1)
    .select("market_name", "cohort_name", "revenue", "week")
)


In [48]:
final.orderBy(col('market_name').asc(),col('week').asc()).show()

+-----------------+-----------+------------------+----+
|      market_name|cohort_name|           revenue|week|
+-----------------+-----------+------------------+----+
|      Chicago, IL|     CHROME| 2192.663475869587|  13|
|      Chicago, IL|     CHROME|1937.4617431300476|  14|
|      Chicago, IL|     CHROME|1805.3632825242073|  15|
|      Chicago, IL|      APPLE|1742.1146116615416|  16|
|        Miami, FL|      APPLE|1074.4541441923989|  13|
|        Miami, FL|      APPLE|1034.3349953133907|  14|
|        Miami, FL|     CHROME|1227.8943091940025|  15|
|        Miami, FL|     CHROME| 1120.021240817607|  16|
|New York City, NY|      APPLE| 788.2124150628206|  13|
|New York City, NY|     CHROME| 759.8671458701211|  14|
|New York City, NY|      APPLE| 827.4691275372936|  15|
|New York City, NY|      APPLE| 611.1671053338239|  16|
|      Seattle, WA|      APPLE| 791.2356891378341|  13|
|      Seattle, WA|      APPLE| 862.4576849245573|  14|
|      Seattle, WA|      APPLE| 939.322202843292