In [1]:
import os
import json
import random
import string
import shutil
import unittest
from time import time
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType
from pyspark.sql.functions import udf, col, array, lit

In [2]:
spark = SparkSession.builder.master("local[2]").getOrCreate()

## TASK 1: Build Purchases Attribution Projection

### Loading data

In [3]:
spark

In [5]:
clickstream_df = spark.read.csv(sep=r'\t', path='input_csv_datasets/mobile-app-clickstream_sample.tsv', header=True)
clickstream_df.createOrReplaceTempView('clickstream')
clickstream_df = spark.sql("SELECT * FROM clickstream "
                           "WHERE eventType = 'purchase' or eventType = 'app_open' or eventType = 'app_close' "
                           "ORDER BY userId, eventTime")
clickstream_df.createOrReplaceTempView('clickstream')

purchases_df = spark.read.csv(path='input_csv_datasets/purchases_sample.csv', header=True)

In [6]:
clickstream_df.createOrReplaceTempView('clickstream')
purchases_df.createOrReplaceTempView('purchases')

### Task 3.1 Load from parquet

In [35]:
if os.path.exists('input_dataset/'):
    shutil.rmtree('input_dataset/')
    
clickstream_df.write.partitionBy('eventTime').parquet('input_dataset/clickstream.parquet')
purchases_df.write.partitionBy('purchaseTime').parquet('input_dataset/purchases.parquet')

clickstream_df = spark.read.parquet('input_dataset/clickstream.parquet')
purchases_df = spark.read.parquet('input_dataset/purchases.parquet')

### Defining UDFs for parsing the attributes column.

In [10]:
def get_campaign_id(row):
    try:
        row_json = json.loads(row.replace("{{", '{').replace("}}", '}'))
        if 'campaign_id' in row_json:
            return row_json['campaign_id']
        else:
            return None
    except:
        return None


def get_channel_id(row):
    try:
        row_json = json.loads(row.replace("{{", '{').replace("}}", '}'))
        if 'channel_id' in row_json:
            return row_json['channel_id']
        else:
            return None
    except:
        return None
    
def get_purchase_id(row):
    try:
        row_json = json.loads(row.replace("{{", '{').replace("}}", '}'))
        if 'purchase_id' in row_json:
            return row_json['purchase_id']
        else:
            return None
    except:
        return None

### Parsing the attributes column to 3 separate columns named `campaignId`, `channelId` and `purchaseId`

In [8]:
def parse_attributes(df):
    get_purchase_id_udf = udf(lambda x: get_purchase_id(x))
    get_channel_id_udf = udf(lambda x: get_channel_id(x))
    get_campaign_id_udf = udf(lambda x: get_campaign_id(x))
    
    df = (df.withColumn('campaignId', get_campaign_id_udf(col('attributes')))
                      .withColumn('channelId', get_channel_id_udf(col('attributes')))
                      .withColumn('purchaseId', get_purchase_id_udf(col('attributes')))
                      .drop(col('attributes')))
    df.createOrReplaceTempView('clickstream')
    return df

In [9]:
clickstream_df = parse_attributes(clickstream_df)
clickstream_df.show()

+------+-------+-------------------+---------+----------+------------+----------+
|userId|eventId|          eventTime|eventType|campaignId|   channelId|purchaseId|
+------+-------+-------------------+---------+----------+------------+----------+
|    u1|  u1_e1| 2019-01-01 0:00:00| app_open|      cmp1|  Google Ads|      null|
|    u1|  u1_e6| 2019-01-01 0:01:00| purchase|      null|        null|        p1|
|    u1|  u1_e7| 2019-01-01 0:02:00|app_close|      null|        null|      null|
|    u2|  u2_e1| 2019-01-01 0:00:00| app_open|      cmp1|  Yandex Ads|      null|
|    u2|  u2_e4| 2019-01-01 0:03:00| purchase|      null|        null|        p2|
|    u2|  u2_e5| 2019-01-01 0:04:00|app_close|      null|        null|      null|
|    u2|  u2_e6| 2019-01-02 0:00:00| app_open|      cmp2|  Yandex Ads|      null|
|    u2| u2_e10| 2019-01-02 0:04:00|app_close|      null|        null|      null|
|    u3|  u3_e1| 2019-01-01 0:00:00| app_open|      cmp2|Facebook Ads|      null|
|    u3|  u3_e4|

### Prepare session dataframe

In [11]:
def get_session_df():
    sessions_df = spark.sql("SELECT userId, eventId, eventType, eventTime, campaignId, channelId "
                            "FROM clickstream "
                            "WHERE eventType = 'app_open' or eventType = 'app_close'")
    sessions_df.createOrReplaceTempView('sessions')
    
    sessions_df = spark.sql("""SELECT * FROM 
                        (SELECT userId, eventTime as start_session, 
                        LEAD (eventTime, 1) OVER (PARTITION BY userId ORDER BY eventTime) as end_session, 
                        eventType, campaignId, channelId, eventId as session_id
                        FROM sessions ORDER BY userId, start_session) 
                        WHERE eventType = 'app_open';""")
    sessions_df.createOrReplaceTempView('sessions')
    return sessions_df

In [12]:
get_session_df()
spark.sql("SELECT * FROM sessions").show()

+------+-------------------+-------------------+---------+----------+------------+----------+
|userId|      start_session|        end_session|eventType|campaignId|   channelId|session_id|
+------+-------------------+-------------------+---------+----------+------------+----------+
|    u1| 2019-01-01 0:00:00| 2019-01-01 0:02:00| app_open|      cmp1|  Google Ads|     u1_e1|
|    u2| 2019-01-01 0:00:00| 2019-01-01 0:04:00| app_open|      cmp1|  Yandex Ads|     u2_e1|
|    u2| 2019-01-02 0:00:00| 2019-01-02 0:04:00| app_open|      cmp2|  Yandex Ads|     u2_e6|
|    u3| 2019-01-01 0:00:00| 2019-01-01 0:02:00| app_open|      cmp2|Facebook Ads|     u3_e1|
|    u3| 2019-01-01 1:11:11| 2019-01-01 1:12:30| app_open|      cmp1|  Google Ads|     u3_e5|
|    u3|2019-01-02 13:00:10|2019-01-02 13:06:00| app_open|      cmp2|  Yandex Ads|    u3_e19|
|    u3| 2019-01-02 2:00:00| 2019-01-02 2:15:40| app_open|      cmp2|  Yandex Ads|    u3_e10|
+------+-------------------+-------------------+---------+--

### Grep only purchases from clickstream.

In [13]:
def provide_sessions_for_purchases():
    df = spark.sql("SELECT * FROM clickstream WHERE eventType = 'purchase'")
    df.createOrReplaceTempView('clickstream_purchases')
    clicks_sessions_df = spark.sql("SELECT clicks.userId, clicks.eventId, sessions.campaignId, sessions.channelId, clicks.purchaseId, sessions.session_id "
                                   "FROM clickstream_purchases clicks JOIN sessions "
                                   "ON (clicks.userId = sessions.userId AND sessions.start_session < clicks.eventTime AND sessions.end_session > clicks.eventTime) "
                                   "ORDER BY userId")
    clicks_sessions_df.createOrReplaceTempView('clicks_sessions')
    return clicks_sessions_df

### Provide session for purchases

In [14]:
provide_sessions_for_purchases()
spark.sql("SELECT * FROM clicks_sessions").show()

+------+-------+----------+----------+----------+----------+
|userId|eventId|campaignId| channelId|purchaseId|session_id|
+------+-------+----------+----------+----------+----------+
|    u1|  u1_e6|      cmp1|Google Ads|        p1|     u1_e1|
|    u2|  u2_e4|      cmp1|Yandex Ads|        p2|     u2_e1|
|    u3|  u3_e8|      cmp1|Google Ads|        p3|     u3_e5|
|    u3| u3_e17|      cmp2|Yandex Ads|        p5|    u3_e10|
|    u3| u3_e22|      cmp2|Yandex Ads|        p6|    u3_e19|
|    u3| u3_e14|      cmp2|Yandex Ads|        p4|    u3_e10|
+------+-------+----------+----------+----------+----------+



### Build projection

In [15]:
projection = spark.sql("SELECT purchases.*, clicks_sessions.campaignId, clicks_sessions.channelId, clicks_sessions.session_id "
                       "FROM clicks_sessions JOIN purchases ON (clicks_sessions.purchaseId = purchases.purchaseId)")

projection.createOrReplaceTempView('projection')
projection.show()

+----------+-------------------+-----------+-----------+----------+----------+----------+
|purchaseId|       purchaseTime|billingCost|isConfirmed|campaignId| channelId|session_id|
+----------+-------------------+-----------+-----------+----------+----------+----------+
|        p3| 2019-01-01 1:12:15|        300|      FALSE|      cmp1|Google Ads|     u3_e5|
|        p6|2019-01-02 13:03:00|         99|      FALSE|      cmp2|Yandex Ads|    u3_e19|
|        p5| 2019-01-01 2:15:05|         75|       TRUE|      cmp2|Yandex Ads|    u3_e10|
|        p4| 2019-01-01 2:13:05|       50.2|       TRUE|      cmp2|Yandex Ads|    u3_e10|
|        p1| 2019-01-01 0:01:05|      100.5|       TRUE|      cmp1|Google Ads|     u1_e1|
|        p2| 2019-01-01 0:03:10|        200|       TRUE|      cmp1|Yandex Ads|     u2_e1|
+----------+-------------------+-----------+-----------+----------+----------+----------+



### Task 3.1: . Save output for Task #1 as parquet as well.

In [16]:
def save_as_parquet(df, task, name):
    if os.path.exists(task):
        shutil.rmtree(task)
    
    projection.write.parquet(task + name)

In [17]:
save_as_parquet(projection, 'task1/', 'projection.parquet')

## Task 2: Calculate Marketing Campaigns And Channels Statistics 

### 2.1 What are the Top 10 marketing campaigns that bring the biggest revenue (based on billingCost of confirmed purchases)?

In [18]:
task2_1_df = spark.sql("SELECT campaignId, sum(billingCost) as revenue "
          "FROM projection "
          "WHERE isConfirmed = 'TRUE' "
          "GROUP BY campaignId "
          "ORDER BY revenue DESC LIMIT 10;")

save_as_parquet(task2_1_df, 'task2_1/', 'top_campaign.parquet')
task2_1_df.show()

+----------+-------+
|campaignId|revenue|
+----------+-------+
|      cmp1|  300.5|
|      cmp2|  125.2|
+----------+-------+



### 2.2 What is the most popular (i.e. Top) channel that drives the highest amount of unique sessions (engagements)  with the App in each campaign?

In [19]:
task2_2_df = spark.sql("SELECT channelId, campaignId, count(distinct session_id) as amount_of_unique_session "
          "FROM projection "
          "GROUP BY channelId, campaignId "
          "ORDER BY amount_of_unique_session DESC")
save_as_parquet(task2_2_df, 'task2_2/', 'top_channel.parquet')
task2_2_df.show()

+----------+----------+------------------------+
| channelId|campaignId|amount_of_unique_session|
+----------+----------+------------------------+
|Yandex Ads|      cmp2|                       2|
|Google Ads|      cmp1|                       2|
|Yandex Ads|      cmp1|                       1|
+----------+----------+------------------------+



### Task 3.2: Calculate metrics from Task #2 for different time periods: September 2020, 2020-11-11
 

In [20]:
date = '2019-01'
spark.sql("SELECT campaignId, sum(billingCost) as revenue "
          "FROM projection "
          f"WHERE isConfirmed = 'TRUE' AND purchaseTime like '%{date}%' "
          "GROUP BY campaignId ORDER BY revenue DESC;").show()

+----------+-------+
|campaignId|revenue|
+----------+-------+
|      cmp1|  300.5|
|      cmp2|  125.2|
+----------+-------+



### Saving query plan

In [21]:
date = '2019-01'
task3_plan = spark.sql("SELECT campaignId, sum(billingCost) as revenue "
                       "FROM projection "
                       f"WHERE isConfirmed = 'TRUE' AND purchaseTime like '%{date}%' "
                       "GROUP BY campaignId ORDER BY revenue DESC LIMIT 10;")._jdf.queryExecution().toString()

In [22]:
with open('task3_plan.MD', 'w+') as f:
    f.write(task3_plan)

In [25]:
def compare_parquet_and_csv_performance():
    start = time()
    clickstream_df = spark.read.csv(sep=r'\t', path='input_csv_datasets/mobile-app-clickstream_sample.tsv', header=True)
    print(f"Reading from CSV:\t{time() - start}")
    
    if os.path.exists('input_dataset/'):
        shutil.rmtree('input_dataset/')
    clickstream_df.write.partitionBy('eventTime').parquet('input_dataset/clickstream.parquet')
    
    start = time()
    spark.read.parquet('input_dataset/clickstream.parquet')
    print(f"Reading from Parquet:\t{time() - start}")

In [26]:
compare_parquet_and_csv_performance()

Reading from CSV:	0.1139211654663086
Reading from Parquet:	0.49912595748901367
