In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import pandas as pd

In [4]:
spark=SparkSession.builder \
        .appName('MySparkApp')\
        .master('local[*]')\
        .getOrCreate()
spark.version


'4.0.1'

In [18]:
data = {
    'user_id': [101, 101, 102, 102, 101, 104, 103],
    'session_start': [
        '2023-11-06 13:53:42',
        '2023-11-22 16:45:21',
        '2023-11-16 13:23:09',
        '2023-11-17 13:23:09',
        '2023-11-20 07:16:06',
        '2023-11-27 03:10:49',
        '2023-11-27 03:10:49'
    ],
    'session_end': [
        '2023-11-06 14:05:42',
        '2023-11-22 20:39:21',
        '2023-11-16 16:10:09',
        '2023-11-17 16:10:09',
        '2023-11-20 08:33:06',
        '2023-11-27 03:30:49',
        '2023-11-27 03:30:49'
    ],
    'session_id': [375, 594, 777, 778, 315, 797, 798],
    'session_type': ['Viewer', 'Streamer', 'Streamer', 'Streamer', 'Streamer', 'Viewer', 'Streamer']
}

In [26]:
pdf = pd.DataFrame(data)
sessions=spark.createDataFrame(pdf)
sessions.show()

+-------+-------------------+-------------------+----------+------------+
|user_id|      session_start|        session_end|session_id|session_type|
+-------+-------------------+-------------------+----------+------------+
|    101|2023-11-06 13:53:42|2023-11-06 14:05:42|       375|      Viewer|
|    101|2023-11-22 16:45:21|2023-11-22 20:39:21|       594|    Streamer|
|    102|2023-11-16 13:23:09|2023-11-16 16:10:09|       777|    Streamer|
|    102|2023-11-17 13:23:09|2023-11-17 16:10:09|       778|    Streamer|
|    101|2023-11-20 07:16:06|2023-11-20 08:33:06|       315|    Streamer|
|    104|2023-11-27 03:10:49|2023-11-27 03:30:49|       797|      Viewer|
|    103|2023-11-27 03:10:49|2023-11-27 03:30:49|       798|    Streamer|
+-------+-------------------+-------------------+----------+------------+



In [27]:
w=Window.partitionBy('user_id').orderBy('session_start')
sessions=sessions.withColumn('first_value',first('session_type').over(w))
sessions.show()

+-------+-------------------+-------------------+----------+------------+-----------+
|user_id|      session_start|        session_end|session_id|session_type|first_value|
+-------+-------------------+-------------------+----------+------------+-----------+
|    101|2023-11-06 13:53:42|2023-11-06 14:05:42|       375|      Viewer|     Viewer|
|    101|2023-11-20 07:16:06|2023-11-20 08:33:06|       315|    Streamer|     Viewer|
|    101|2023-11-22 16:45:21|2023-11-22 20:39:21|       594|    Streamer|     Viewer|
|    102|2023-11-16 13:23:09|2023-11-16 16:10:09|       777|    Streamer|   Streamer|
|    102|2023-11-17 13:23:09|2023-11-17 16:10:09|       778|    Streamer|   Streamer|
|    103|2023-11-27 03:10:49|2023-11-27 03:30:49|       798|    Streamer|   Streamer|
|    104|2023-11-27 03:10:49|2023-11-27 03:30:49|       797|      Viewer|     Viewer|
+-------+-------------------+-------------------+----------+------------+-----------+



In [28]:
sessions=sessions[(sessions['first_value']=='Viewer')&(sessions['session_type']=='Streamer')]
sessions.show()

+-------+-------------------+-------------------+----------+------------+-----------+
|user_id|      session_start|        session_end|session_id|session_type|first_value|
+-------+-------------------+-------------------+----------+------------+-----------+
|    101|2023-11-20 07:16:06|2023-11-20 08:33:06|       315|    Streamer|     Viewer|
|    101|2023-11-22 16:45:21|2023-11-22 20:39:21|       594|    Streamer|     Viewer|
+-------+-------------------+-------------------+----------+------------+-----------+



In [32]:
sessions=sessions.groupBy('user_id')\
    .agg(count('user_id').alias('sessions_count'))\
    .orderBy(col('sessions_count').desc(),col('user_id').asc())

In [33]:
sessions.show()

+-------+--------------+
|user_id|sessions_count|
+-------+--------------+
|    101|             1|
+-------+--------------+

