# Capstone Project - Solution

### 1. Create Spark Session

In [1]:
import pyspark
from pyspark.sql import SparkSession
# NOTE(review): later cells use TimestampType, DoubleType, MapType and StringType from this
# star import; importing those names explicitly would make the dependencies visible.
from pyspark.sql.types import *
# NOTE: `sum` below is pyspark.sql.functions.sum and shadows Python's builtin sum() for the
# rest of the notebook.
from pyspark.sql.functions import col, udf, sum, desc, count, countDistinct, isnan
# Display the installed PySpark version as the cell output.
f"pyspark version={pyspark.__version__}"

'pyspark version=3.2.1'

In [3]:
import os

# Spark master URL. Defaults to the standalone cluster this notebook was written against;
# set the SPARK_MASTER_URL environment variable (e.g. "local[*]") to run it on another machine.
SPARK_MASTER_URL = os.environ.get('SPARK_MASTER_URL', 'spark://ThinkPad-X1:7077')

spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName('spark-local-cluster')
    .getOrCreate()
)
sc = spark.sparkContext

In [4]:
import os

# Working directory of the kernel; used below to build absolute file:// URIs for the
# generated data. Pure Python instead of the IPython `!pwd` shell escape.
pwd = os.getcwd()

In [5]:
print(f"{pwd}")  # show the base directory the data paths are built from

/home/wojtek/Workspace/max-ml/big-data


### 2. Load sample of generated data and verify the content

In [6]:
# Load only the first user_purchases file to inspect its content and inferred schema
p0 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(f'file://{pwd}/generated-data/user_purchases/user_purchases_0.csv.gz')
)

                                                                                

In [7]:
p0.show(n=5)

+--------------------+-------------------+-----------+-----------+
|          purchaseId|       purchaseTime|billingCost|isConfirmed|
+--------------------+-------------------+-----------+-----------+
|e3e70682-c209-4ca...|2020-12-13 16:00:20|     845.98|      false|
|82e2e662-f728-44f...|2020-12-26 08:35:13|     760.37|      false|
|d4713d60-c8a7-463...|2020-11-20 21:32:23|     426.37|       true|
|9558867f-5ba9-4fa...|2020-11-04 10:01:00|     266.33|      false|
|23c6612f-4826-467...|2020-10-11 09:56:36|     516.16|      false|
+--------------------+-------------------+-----------+-----------+
only showing top 5 rows



In [8]:
# Schema inferred from the first user_purchases file alone
p0.printSchema()

root
 |-- purchaseId: string (nullable = true)
 |-- purchaseTime: string (nullable = true)
 |-- billingCost: double (nullable = true)
 |-- isConfirmed: boolean (nullable = true)



In [9]:
# Number of rows in the first user_purchases file
p0.count()

                                                                                

10000

In [10]:
# Similarly, load only the first 'mobile_app_clickstream' file
c0 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(f'file://{pwd}/generated-data/mobile_app_clickstream/mobile_app_clickstream_0.csv.gz')
)

                                                                                

In [11]:
c0.show(n=5)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|              userId|             eventId|           eventType|          eventTime|          attributes|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|f6e8252f-b5cc-48a...|3fce7a72-6aa5-4a8...|            app_open|2020-12-13 15:37:31|{'campaign_id': 3...|
|f6e8252f-b5cc-48a...|b49802f8-62a6-424...|      search_product|2020-12-13 15:45:34|                null|
|f6e8252f-b5cc-48a...|08b51fee-d62d-4b2...|view_product_details|2020-12-13 15:48:07|                null|
|f6e8252f-b5cc-48a...|bdde7a28-c32d-477...|            purchase|2020-12-13 16:00:20|{'purchase_id': '...|
|f6e8252f-b5cc-48a...|be7741d8-56c9-47c...|           app_close|2020-12-13 16:04:20|                null|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [12]:
# Schema inferred from the first clickstream file; attributes arrives as a plain string
c0.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- attributes: string (nullable = true)



In [13]:
# Number of rows in the first clickstream file
c0.count()

50000

### 3. Load all data

In [14]:
from pyspark.sql.dataframe import DataFrame

In [15]:
# Read all generated user_purchases files in a single call instead of 50 separate reads
# chained with union():
#  - the schema is inferred once over all files, so every file gets the same column types
#    (union() matches columns by position and silently widens mismatched types);
#  - one scan is planned instead of 50 schema-inferring reads plus 49 unions.
num_purchase_files = 50
user_purchases_files = [
    f'file://{pwd}/generated-data/user_purchases/user_purchases_{i}.csv.gz'
    for i in range(num_purchase_files)
]
user_purchases_df: DataFrame = spark.read.csv(user_purchases_files, header=True, inferSchema=True)
        

In [16]:
# Total number of purchase rows across all loaded files
user_purchases_df.count()

500000

In [17]:
# Read all generated clickstream files in a single call instead of 50 separate reads
# chained with union():
#  - the schema is inferred once over all files, so every file gets the same column types
#    (union() matches columns by position and silently widens mismatched types);
#  - one scan is planned instead of 50 schema-inferring reads plus 49 unions.
num_clickstream_files = 50
clickstream_files = [
    f'file://{pwd}/generated-data/mobile_app_clickstream/mobile_app_clickstream_{i}.csv.gz'
    for i in range(num_clickstream_files)
]
clickstream_df: DataFrame = spark.read.csv(clickstream_files, header=True, inferSchema=True)
        

In [18]:
# Total number of clickstream events across all loaded files
clickstream_df.count()

                                                                                

2500000

### 4. Schema improvements

In [19]:
# Parse purchaseTime from its string form into a proper timestamp column
user_purchases_df = user_purchases_df.withColumn(
    'purchaseTime',
    col('purchaseTime').cast(TimestampType()),
)
user_purchases_df.printSchema()

root
 |-- purchaseId: string (nullable = true)
 |-- purchaseTime: timestamp (nullable = true)
 |-- billingCost: string (nullable = true)
 |-- isConfirmed: boolean (nullable = true)



In [20]:
# Count billingCost values that are missing or unusable *before* casting the column to double.
# billingCost may still be a string column at this point, and isnan() on a string does not
# flag values that fail to parse (they are cast to null first, and isnan(null) is false).
# Casting explicitly catches all three cases: null, unparseable text (-> null) and NaN.
user_purchases_df.where(
    col('billingCost').cast(DoubleType()).isNull()
    | isnan(col('billingCost').cast(DoubleType()))
).count()

                                                                                

0

In [21]:
# Show the rows whose billingCost is null, not parseable as a number, or NaN
# (the cast makes the check work whether the column is currently a string or a double).
user_purchases_df.where(
    col('billingCost').cast(DoubleType()).isNull()
    | isnan(col('billingCost').cast(DoubleType()))
).show()

+----------+------------+-----------+-----------+
|purchaseId|purchaseTime|billingCost|isConfirmed|
+----------+------------+-----------+-----------+
+----------+------------+-----------+-----------+



In [22]:
# NOTE(review): the null/NaN check above sometimes returns a single record but usually returns
# nothing. billingCost was still a string column when that check ran (see the schema printed
# after the purchaseTime cast), and isnan() does not flag strings that fail to parse as numbers,
# so the check may miss values that only become null after the cast to double below.
# TODO: re-check for nulls after the cast.

In [23]:
# Make sure billingCost is a double so it can be summed in the aggregations
user_purchases_df = user_purchases_df.withColumn(
    'billingCost',
    col('billingCost').cast(DoubleType()),
)
user_purchases_df.printSchema()

root
 |-- purchaseId: string (nullable = true)
 |-- purchaseTime: timestamp (nullable = true)
 |-- billingCost: double (nullable = true)
 |-- isConfirmed: boolean (nullable = true)



In [24]:
user_purchases_df.show(n=5)

+--------------------+-------------------+-----------+-----------+
|          purchaseId|       purchaseTime|billingCost|isConfirmed|
+--------------------+-------------------+-----------+-----------+
|e3e70682-c209-4ca...|2020-12-13 16:00:20|     845.98|      false|
|82e2e662-f728-44f...|2020-12-26 08:35:13|     760.37|      false|
|d4713d60-c8a7-463...|2020-11-20 21:32:23|     426.37|       true|
|9558867f-5ba9-4fa...|2020-11-04 10:01:00|     266.33|      false|
|23c6612f-4826-467...|2020-10-11 09:56:36|     516.16|      false|
+--------------------+-------------------+-----------+-----------+
only showing top 5 rows



In [25]:
# Parse eventTime from its string form into a proper timestamp column
clickstream_df = clickstream_df.withColumn(
    'eventTime',
    col('eventTime').cast(TimestampType()),
)
clickstream_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- attributes: string (nullable = true)



### 5. Convert 'attributes' column from String into Map

In [26]:
import json

In [27]:
def convert_to_map(input: str) -> dict:
    """Parse one serialized ``attributes`` value into a dict.

    The clickstream ``attributes`` column holds Python-dict-style text with
    single-quoted strings, e.g. ``{'purchase_id': '...'}``.

    Args:
        input: the raw attributes text, or ``None`` for a null cell.

    Returns:
        The parsed dict, or ``None`` when ``input`` is ``None`` or blank.

    Raises:
        ValueError, SyntaxError: if the text is not a valid Python literal.
    """
    import ast  # local import keeps this cell self-contained

    if input is None or not input.strip():
        return None
    # ast.literal_eval only evaluates literals (never arbitrary code), so it is safe on data
    # read from files. Unlike swapping ' for " and calling json.loads, it copes with quotes
    # inside values (e.g. "O'Brien") and with Python literals such as True/False/None.
    return ast.literal_eval(input)
 
# Wrap the parser as a Spark UDF returning map<string,string>. Passing the function directly
# (instead of an extra lambda) also gives the UDF a readable name in query plans.
convert_to_map_udf = udf(convert_to_map, MapType(StringType(), StringType()))

In [28]:
c0.show(n=5)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|              userId|             eventId|           eventType|          eventTime|          attributes|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|f6e8252f-b5cc-48a...|3fce7a72-6aa5-4a8...|            app_open|2020-12-13 15:37:31|{'campaign_id': 3...|
|f6e8252f-b5cc-48a...|b49802f8-62a6-424...|      search_product|2020-12-13 15:45:34|                null|
|f6e8252f-b5cc-48a...|08b51fee-d62d-4b2...|view_product_details|2020-12-13 15:48:07|                null|
|f6e8252f-b5cc-48a...|bdde7a28-c32d-477...|            purchase|2020-12-13 16:00:20|{'purchase_id': '...|
|f6e8252f-b5cc-48a...|be7741d8-56c9-47c...|           app_close|2020-12-13 16:04:20|                null|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [29]:
# Try the string -> map conversion on the single-file sample first
c0_map = c0.withColumn('attributes', convert_to_map_udf('attributes'))
c0_map.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [30]:
c0_map.show(n=5)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|              userId|             eventId|           eventType|          eventTime|          attributes|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|f6e8252f-b5cc-48a...|3fce7a72-6aa5-4a8...|            app_open|2020-12-13 15:37:31|{channel_id -> Fa...|
|f6e8252f-b5cc-48a...|b49802f8-62a6-424...|      search_product|2020-12-13 15:45:34|                null|
|f6e8252f-b5cc-48a...|08b51fee-d62d-4b2...|view_product_details|2020-12-13 15:48:07|                null|
|f6e8252f-b5cc-48a...|bdde7a28-c32d-477...|            purchase|2020-12-13 16:00:20|{purchase_id -> e...|
|f6e8252f-b5cc-48a...|be7741d8-56c9-47c...|           app_close|2020-12-13 16:04:20|                null|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [31]:
# Apply the same string -> map conversion to the full clickstream
clickstream_map_df = clickstream_df.withColumn('attributes', convert_to_map_udf('attributes'))
clickstream_map_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [32]:
clickstream_map_df.show(n=5)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|              userId|             eventId|           eventType|          eventTime|          attributes|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|f6e8252f-b5cc-48a...|3fce7a72-6aa5-4a8...|            app_open|2020-12-13 15:37:31|{channel_id -> Fa...|
|f6e8252f-b5cc-48a...|b49802f8-62a6-424...|      search_product|2020-12-13 15:45:34|                null|
|f6e8252f-b5cc-48a...|08b51fee-d62d-4b2...|view_product_details|2020-12-13 15:48:07|                null|
|f6e8252f-b5cc-48a...|bdde7a28-c32d-477...|            purchase|2020-12-13 16:00:20|{purchase_id -> e...|
|f6e8252f-b5cc-48a...|be7741d8-56c9-47c...|           app_close|2020-12-13 16:04:20|                null|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 5 rows



#### Optimizing the dataframes for further processing

In [33]:
# Number of partitions the clickstream has before repartitioning
clickstream_map_df.rdd.getNumPartitions()

50

In [34]:
# Hash-partition by userId into 64 partitions so all events of a user land in the same partition
clickstream_repartitioned = clickstream_map_df.repartition(64, 'userId')

In [35]:
# Should now match the 64 partitions requested in repartition()
clickstream_repartitioned.rdd.getNumPartitions()



64

### 6. Identify events that belong to the same session

In [36]:
# Inside each partition, order rows by user and then by time, so every user's events are
# contiguous and chronological (the session numbering below depends on this order)
clickstream_repartitioned = clickstream_repartitioned.sortWithinPartitions('userId', 'eventTime')
clickstream_repartitioned.show(10)



+--------------------+--------------------+--------------------+-------------------+--------------------+
|              userId|             eventId|           eventType|          eventTime|          attributes|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|000ea2e3-ae19-4f8...|34e0b429-b0d7-42a...|            app_open|2020-12-18 17:51:59|{channel_id -> Go...|
|000ea2e3-ae19-4f8...|a27ee032-d1bd-453...|      search_product|2020-12-18 18:00:33|                null|
|000ea2e3-ae19-4f8...|14dc3cd7-bc5c-404...|view_product_details|2020-12-18 18:03:28|                null|
|000ea2e3-ae19-4f8...|7339a8e5-392c-486...|            purchase|2020-12-18 18:08:52|{purchase_id -> 3...|
|000ea2e3-ae19-4f8...|2a1cb948-1965-4b9...|           app_close|2020-12-18 18:10:52|                null|
|000ee35f-5776-401...|7713dc7b-3e6a-453...|            app_open|2020-11-17 23:42:32|{channel_id -> Go...|
|000ee35f-5776-401...|f668579f-e2b5-4fc...|   

                                                                                

In [37]:
from pyspark.sql.functions import spark_partition_id

# Tag every row with the id of the partition it lives in (used to build the session ids)
clickstream_repartitioned = clickstream_repartitioned.select('*', spark_partition_id().alias('partitionId'))
clickstream_repartitioned.show(10)



+--------------------+--------------------+--------------------+-------------------+--------------------+-----------+
|              userId|             eventId|           eventType|          eventTime|          attributes|partitionId|
+--------------------+--------------------+--------------------+-------------------+--------------------+-----------+
|000ea2e3-ae19-4f8...|34e0b429-b0d7-42a...|            app_open|2020-12-18 17:51:59|{channel_id -> Go...|          0|
|000ea2e3-ae19-4f8...|a27ee032-d1bd-453...|      search_product|2020-12-18 18:00:33|                null|          0|
|000ea2e3-ae19-4f8...|14dc3cd7-bc5c-404...|view_product_details|2020-12-18 18:03:28|                null|          0|
|000ea2e3-ae19-4f8...|7339a8e5-392c-486...|            purchase|2020-12-18 18:08:52|{purchase_id -> 3...|          0|
|000ea2e3-ae19-4f8...|2a1cb948-1965-4b9...|           app_close|2020-12-18 18:10:52|                null|          0|
|000ee35f-5776-401...|7713dc7b-3e6a-453...|            a

                                                                                

In [38]:
# Number of rows per partition, to check the hash partitioning is balanced
partition_summary = (
    clickstream_repartitioned
    .groupBy('partitionId')
    .agg(count('*').alias('records'))
    .orderBy('partitionId')
)
partition_summary.show()



+-----------+-------+
|partitionId|records|
+-----------+-------+
|          0|  38835|
|          1|  38925|
|          2|  38950|
|          3|  39260|
|          4|  39065|
|          5|  39525|
|          6|  38455|
|          7|  40080|
|          8|  38195|
|          9|  39820|
|         10|  38175|
|         11|  39530|
|         12|  39040|
|         13|  39625|
|         14|  38295|
|         15|  39090|
|         16|  38425|
|         17|  39500|
|         18|  39400|
|         19|  38985|
+-----------+-------+
only showing top 20 rows



                                                                                

In [39]:
# Number of events per event type
event_summary = (
    clickstream_repartitioned
    .groupBy('eventType')
    .agg(count('*').alias('events'))
    .orderBy('events')
)
event_summary.show()



+--------------------+------+
|           eventType|events|
+--------------------+------+
|view_product_details|500000|
|            app_open|500000|
|            purchase|500000|
|           app_close|500000|
|      search_product|500000|
+--------------------+------+



                                                                                

In [41]:
# Session-numbering state carried between successive get_session() calls (module globals).
last_session = 0      # running session counter
last_user_id = ""     # userId of the previously processed row


def get_session(user_id: str, event_type: str, partition_id: int) -> str:
    """Return the session id for one clickstream event.

    Rows must be fed in ``[userId, eventTime]`` order. A new session starts on every
    ``app_open`` event, and also whenever the user changes. That way, events of a user
    whose history does not begin with ``app_open`` are never attributed to the previous
    user's session.

    Args:
        user_id: id of the user that produced the event.
        event_type: the event type, e.g. ``'app_open'`` or ``'purchase'``.
        partition_id: Spark partition of the row; prefixed to the counter to keep ids
            from different partitions apart.

    Returns:
        ``"<partition_id>_<session counter>"``.

    NOTE(review): the counters are mutable global state inside a Python UDF. On executors,
    each task works on whatever copy of these globals arrives with the serialized function.
    A Window-based sessionization (running count of app_open per user) would avoid relying
    on that.
    """
    global last_session
    global last_user_id

    # A session can never span two users, so a user change also opens a new session.
    # For well-formed data (each user's first event is app_open) this increments exactly once.
    if event_type == 'app_open' or user_id != last_user_id:
        last_session += 1
    last_user_id = user_id

    return f"{partition_id}_{last_session}"

In [42]:
# get_session keeps state between calls, so its result is not a pure function of its inputs.
# Declaring the UDF non-deterministic stops Spark's optimizer from treating it as a pure
# expression (e.g. pushing eventType filters below it, or re-evaluating it). Either would
# change which rows advance the session counter.
get_session_udf = udf(get_session, StringType()).asNondeterministic()

In [44]:
# Attach a sessionId to every event (relies on the per-partition [userId, eventTime] order)
session_id_col = get_session_udf('userId', 'eventType', 'partitionId')
clickstream_session_df = clickstream_repartitioned.withColumn('sessionId', session_id_col)
clickstream_session_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventType: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- partitionId: integer (nullable = false)
 |-- sessionId: string (nullable = true)



In [45]:
clickstream_session_df.show(n=10)



+--------------------+--------------------+--------------------+-------------------+--------------------+-----------+---------+
|              userId|             eventId|           eventType|          eventTime|          attributes|partitionId|sessionId|
+--------------------+--------------------+--------------------+-------------------+--------------------+-----------+---------+
|000ea2e3-ae19-4f8...|34e0b429-b0d7-42a...|            app_open|2020-12-18 17:51:59|{channel_id -> Go...|          0|      0_1|
|000ea2e3-ae19-4f8...|a27ee032-d1bd-453...|      search_product|2020-12-18 18:00:33|                null|          0|      0_1|
|000ea2e3-ae19-4f8...|14dc3cd7-bc5c-404...|view_product_details|2020-12-18 18:03:28|                null|          0|      0_1|
|000ea2e3-ae19-4f8...|7339a8e5-392c-486...|            purchase|2020-12-18 18:08:52|{purchase_id -> 3...|          0|      0_1|
|000ea2e3-ae19-4f8...|2a1cb948-1965-4b9...|           app_close|2020-12-18 18:10:52|                null

                                                                                

In [46]:
# Cache the sessionized events. The session ids come from a stateful UDF, so recomputing them
# is not guaranteed to reproduce the same values, and the self-join below reads this frame twice.
# NOTE: cache() is lazy - the data is materialized by the first action that uses it.
clickstream_session_df = clickstream_session_df.cache()

### 7. Join 'app_open' events with 'purchase' events

In [47]:
# Pair every purchase event with the app_open event of the same user session, so each
# purchase can be attributed to the channel/campaign stored in the app_open attributes.
purchase_events = (
    clickstream_session_df
    .where(col('eventType') == 'purchase')
    .withColumnRenamed('eventType', 'purchaseEventType')
    .withColumnRenamed('attributes', 'purchaseAttributes')
)
open_events = (
    clickstream_session_df
    .where(col('eventType') == 'app_open')
    .select('userId', 'sessionId', col('attributes').alias('openAttributes'))
)

# Explicit equi-join on (userId, sessionId) instead of a condition-less join() followed by
# where(). The intent is visible in the code, and the plan no longer depends on the optimizer
# turning a cross join back into an inner join. The final select keeps the original column order.
purchase_session_df = (
    purchase_events
    .join(open_events, on=['userId', 'sessionId'], how='inner')
    .select('userId', 'eventId', 'eventTime', 'partitionId', 'sessionId',
            'purchaseEventType', 'purchaseAttributes', 'openAttributes')
)

purchase_session_df.show()



+--------------------+--------------------+-------------------+-----------+---------+-----------------+--------------------+--------------------+
|              userId|             eventId|          eventTime|partitionId|sessionId|purchaseEventType|  purchaseAttributes|      openAttributes|
+--------------------+--------------------+-------------------+-----------+---------+-----------------+--------------------+--------------------+
|0000f82d-9bbb-46d...|41c3d6fb-ebb8-449...|2020-11-11 08:49:36|         43|     43_1|         purchase|{purchase_id -> 4...|{channel_id -> Fa...|
|0001fca6-a926-468...|2dc6c0d8-0293-489...|2020-09-28 04:04:57|         13|     13_1|         purchase|{purchase_id -> a...|{channel_id -> Tw...|
|00047728-5b35-46d...|1bb6ddd1-c50c-4dd...|2020-11-03 07:46:25|         42|     42_1|         purchase|{purchase_id -> a...|{channel_id -> Go...|
|00058134-5c08-4e4...|94f72f17-19c2-473...|2020-09-30 21:12:22|         39|     39_1|         purchase|{purchase_id -> 5...|

                                                                                

In [48]:
# Column layout of the joined purchase / app_open frame
purchase_session_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- partitionId: integer (nullable = false)
 |-- sessionId: string (nullable = true)
 |-- purchaseEventType: string (nullable = true)
 |-- purchaseAttributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- openAttributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



### 8. Extract attributes from the maps into separate columns

In [49]:
# Pull the needed keys out of the map columns into plain columns:
# purchase_id from the purchase event, channel/campaign from the session's app_open event
purchase_session_df = (
    purchase_session_df
    .withColumn('purchase_id', col('purchaseAttributes').getItem('purchase_id'))
    .withColumn('channelId', col('openAttributes').getItem('channel_id'))
    .withColumn('campaignId', col('openAttributes').getItem('campaign_id'))
)
purchase_session_df.show(10)



+--------------------+--------------------+-------------------+-----------+---------+-----------------+--------------------+--------------------+--------------------+------------+----------+
|              userId|             eventId|          eventTime|partitionId|sessionId|purchaseEventType|  purchaseAttributes|      openAttributes|         purchase_id|   channelId|campaignId|
+--------------------+--------------------+-------------------+-----------+---------+-----------------+--------------------+--------------------+--------------------+------------+----------+
|0000f82d-9bbb-46d...|41c3d6fb-ebb8-449...|2020-11-11 08:49:36|         43|     43_1|         purchase|{purchase_id -> 4...|{channel_id -> Fa...|472154b7-ccda-477...|Facebook Ads|       472|
|0001fca6-a926-468...|2dc6c0d8-0293-489...|2020-09-28 04:04:57|         13|     13_1|         purchase|{purchase_id -> a...|{channel_id -> Tw...|a7ad667b-c199-45d...| Twitter Ads|       269|
|00047728-5b35-46d...|1bb6ddd1-c50c-4dd...|20

                                                                                

In [50]:
# Drop the columns no longer needed: the two map columns (their values were extracted above)
# and purchaseEventType, which is always 'purchase' after the join filter
columns_to_drop = ['purchaseEventType', 'openAttributes', 'purchaseAttributes']
purchase_session_df = purchase_session_df.drop(*columns_to_drop)
purchase_session_df.show(10)



+--------------------+--------------------+-------------------+-----------+---------+--------------------+------------+----------+
|              userId|             eventId|          eventTime|partitionId|sessionId|         purchase_id|   channelId|campaignId|
+--------------------+--------------------+-------------------+-----------+---------+--------------------+------------+----------+
|0000f82d-9bbb-46d...|41c3d6fb-ebb8-449...|2020-11-11 08:49:36|         43|     43_1|472154b7-ccda-477...|Facebook Ads|       472|
|0001fca6-a926-468...|2dc6c0d8-0293-489...|2020-09-28 04:04:57|         13|     13_1|a7ad667b-c199-45d...| Twitter Ads|       269|
|00047728-5b35-46d...|1bb6ddd1-c50c-4dd...|2020-11-03 07:46:25|         42|     42_1|a10a8877-b782-48e...|  Google Ads|       628|
|00058134-5c08-4e4...|94f72f17-19c2-473...|2020-09-30 21:12:22|         39|     39_1|552f815b-1208-407...|  Google Ads|       257|
|0006d62d-2a59-4aa...|4f8b6be2-098b-44a...|2020-10-22 04:06:34|          8|      8_

                                                                                

### 9. Join with purchase table

In [52]:
# Attach billing information: match each event's purchase_id to user_purchases.purchaseId
purchase_join_condition = purchase_session_df['purchase_id'] == user_purchases_df['purchaseId']
purchase_projection_df = purchase_session_df.join(user_purchases_df, on=purchase_join_condition, how='inner')
purchase_projection_df.show(10)



+--------------------+--------------------+-------------------+-----------+---------+--------------------+------------+----------+--------------------+-------------------+-----------+-----------+
|              userId|             eventId|          eventTime|partitionId|sessionId|         purchase_id|   channelId|campaignId|          purchaseId|       purchaseTime|billingCost|isConfirmed|
+--------------------+--------------------+-------------------+-----------+---------+--------------------+------------+----------+--------------------+-------------------+-----------+-----------+
|9a89a902-ebce-417...|e171ce5d-6360-400...|2020-10-27 21:49:04|         60|  60_4760|0000779f-cbf2-46e...|  Yandex Ads|       781|0000779f-cbf2-46e...|2020-10-27 21:49:04|     871.51|       true|
|b8b3b7c7-f75e-47e...|a61841be-9b76-4f3...|2020-11-10 22:40:38|         21|  21_5663|0000b4d0-364c-4dd...|      VK Ads|       446|0000b4d0-364c-4dd...|2020-11-10 22:40:38|     801.85|       true|
|d4b22dd9-fcaf-4d3..

                                                                                

### 10. Analysis - calculate final aggregates

#### Top campaigns by total billing cost of confirmed purchases

In [54]:
# Total billing cost of confirmed purchases per campaign, highest first
confirmed_purchases = purchase_projection_df.filter(col('isConfirmed'))
agg_df = (
    confirmed_purchases
    .groupBy('campaignId')
    .agg(sum('billingCost').alias('total_cost'))
    .orderBy(col('total_cost').desc())
)

agg_df.show(10)



+----------+------------------+
|campaignId|        total_cost|
+----------+------------------+
|       190|2041060.8399999996|
|       528|        1510378.16|
|       325|1470622.3200000003|
|       585|        1047838.02|
|       779|1031561.1400000004|
|       650|        1031403.13|
|       859|1024151.6599999998|
|       610|1020675.5800000001|
|       669|1019272.5700000001|
|       461|        1018971.02|
+----------+------------------+
only showing top 10 rows



                                                                                

#### Unique sessions by channel

In [55]:
# Number of distinct sessions with a confirmed purchase, per acquisition channel
confirmed_purchases_by_channel = purchase_projection_df.filter(col('isConfirmed'))
agg_df2 = (
    confirmed_purchases_by_channel
    .groupBy('channelId')
    .agg(countDistinct('sessionId').alias('unique_sessions'))
    .orderBy(col('unique_sessions').desc())
)

agg_df2.show()



+------------+---------------+
|   channelId|unique_sessions|
+------------+---------------+
| Twitter Ads|          50356|
|  Google Ads|          50122|
|      VK Ads|          50008|
|Facebook Ads|          49973|
|  Yandex Ads|          49751|
+------------+---------------+



                                                                                