In [1]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
import pyspark
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import col, min, collect_list, count, desc, udf, lower
import numpy as np

In [3]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
# local datasource
clicks = spark.read.options(delimiter='\t', header='True', inferSchema='True').csv('clickstream.csv')

In [5]:
clicks.show(5)

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|       event|      main|1620494801|
|    562|       507|wNaxLlerrorU|      main|1620494808|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [6]:
clicks.dtypes

[('user_id', 'int'),
 ('session_id', 'int'),
 ('event_type', 'string'),
 ('event_page', 'string'),
 ('timestamp', 'int')]

In [7]:
clicks.count()

5000000

In [8]:
err_mt = clicks.where(
    lower(col("event_type")).like("%error%")).groupby(["user_id", "session_id"]).agg(
    min(clicks.timestamp).alias("err_mt")).withColumnRenamed('user_id','emt_user_id').withColumnRenamed('session_id','emt_session_id')

In [9]:
err_mt.show(5)

+-----------+--------------+----------+
|emt_user_id|emt_session_id|    err_mt|
+-----------+--------------+----------+
|        600|           253|1620509147|
|        800|           104|1620583603|
|        237|           333|1620588494|
|        326|           550|1620592998|
|        491|           714|1620605795|
+-----------+--------------+----------+
only showing top 5 rows



In [10]:
err_mt.count()

73242

In [11]:
paths = clicks.join(err_mt, [clicks.user_id == err_mt.emt_user_id, 
                                           clicks.session_id == err_mt.emt_session_id], 'left_outer').filter(
                                           col('event_type') == 'page').orderBy(clicks.timestamp)

paths = paths.na.fill(np.inf)
paths = paths.where(paths.timestamp <= paths.err_mt)

In [12]:
paths.show(5)

+-------+----------+----------+----------+----------+-----------+--------------+----------+
|user_id|session_id|event_type|event_page| timestamp|emt_user_id|emt_session_id|    err_mt|
+-------+----------+----------+----------+----------+-----------+--------------+----------+
|    562|       507|      page|      main|1620494781|        562|           507|1620494808|
|    466|       849|      page|      main|1620494892| 2147483647|    2147483647|2147483647|
|    466|       849|      page|   digital|1620494915| 2147483647|    2147483647|2147483647|
|    466|       849|      page|      news|1620494923| 2147483647|    2147483647|2147483647|
|    466|       849|      page|   archive|1620494939| 2147483647|    2147483647|2147483647|
+-------+----------+----------+----------+----------+-----------+--------------+----------+
only showing top 5 rows



In [13]:
x = paths.groupby(
    'user_id', 'session_id').agg(
    collect_list('event_page').alias('route'))

In [14]:
y = x.groupBy(
    'route').agg(
    count('user_id').alias('count')).orderBy(
    desc('count')).withColumn('route', udf(lambda t: '-'.join(t))(col('route')))

In [15]:
y.show(30)

+--------------------+-----+
|               route|count|
+--------------------+-----+
|                main|39250|
|        main-tariffs| 6535|
|           main-news| 6278|
|        main-archive| 5850|
|         main-family| 4858|
|        main-digital| 4219|
|          main-bonus| 3494|
|   main-tariffs-news| 1186|
|   main-news-tariffs| 1132|
|main-tariffs-archive| 1037|
|   main-news-archive| 1000|
|main-archive-tariffs|  996|
|   main-archive-news|  995|
| main-family-tariffs|  923|
|    main-news-family|  920|
| main-tariffs-family|  919|
|    main-family-news|  879|
| main-archive-family|  817|
|   main-news-digital|  798|
| main-family-archive|  774|
|   main-tariffs-main|  760|
|main-tariffs-digital|  751|
|   main-digital-news|  748|
|main-digital-tariffs|  724|
|main-archive-digital|  720|
|        main-spravka|  707|
|      main-news-main|  686|
|main-digital-archive|  682|
|  main-tariffs-bonus|  667|
|   main-archive-main|  617|
+--------------------+-----+
only showing t

In [16]:
z = y.toPandas()

In [17]:
z.head(30)

Unnamed: 0,route,count
0,main,39250
1,main-tariffs,6532
2,main-news,6276
3,main-archive,5849
4,main-family,4859
5,main-digital,4220
6,main-bonus,3495
7,main-tariffs-news,1189
8,main-news-tariffs,1130
9,main-tariffs-archive,1037
