# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It shows how users interact with the site. In this task, you need to find the most frequent user routes.

## Input data
The input is a table of clickstream data stored in the file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_page (string)` - Page on the site.
* `timestamp (int)` - Unix timestamp of the action.
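
As a minimal sketch of the table structure above, the snippet below parses one tab-separated row into a typed record. The `Click` type and the `parse_click` helper are hypothetical names used only for illustration, and the tab delimiter and column order are assumptions.

```python
from typing import NamedTuple


class Click(NamedTuple):
    """One clickstream row (hypothetical helper type, not part of the task)."""
    user_id: int
    session_id: int
    event_type: str
    event_page: str
    timestamp: int


def parse_click(line: str) -> Click:
    # Assumes tab-separated fields in the column order listed above.
    user_id, session_id, event_type, event_page, timestamp = line.rstrip("\n").split("\t")
    return Click(int(user_id), int(session_id), event_type, event_page, int(timestamp))


row = parse_click("562\t507\tpage\tmain\t1620494781")
print(row.event_type, row.event_page, row.timestamp)
```

Converting `timestamp` to an integer up front matters because string comparison and numeric comparison can order values differently.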

### Browser errors
Errors sometimes occur in the user's browser. Once an error appears, the rest of the session can no longer be trusted: the error row, every row after it, and every row with the same timestamp as the error are considered corrupted and **should not be counted** in the statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.
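
The rule can be sketched in plain Python as follows. This is an illustration under stated assumptions rather than the reference solution: `is_error` and `trusted_rows` are hypothetical helper names, and a session is represented as a list of `(event_type, event_page, timestamp)` tuples.

```python
def is_error(event_type: str) -> bool:
    # An error row is any row whose event_type contains the substring "error".
    return "error" in event_type


def trusted_rows(rows):
    """Return the rows of one session that strictly precede its first error.

    `rows` is a list of (event_type, event_page, timestamp) tuples.
    Rows that share the first error's timestamp are dropped as well.
    """
    error_times = [ts for event_type, _, ts in rows if is_error(event_type)]
    if not error_times:
        return list(rows)
    first_error = min(error_times)
    return [row for row in rows if row[2] < first_error]


session = [
    ("page", "main", 100),
    ("page", "news", 105),
    ("abcerrorxyz", "news", 105),
    ("page", "tariffs", 110),
]
print(trusted_rows(session))  # [('page', 'main', 100)]
```

The strict `<` comparison is what drops the `news` row that shares the error's timestamp.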

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

A route covers the whole session: it runs from the first page until the end of the session or the first error in the session.
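
To make the route definition concrete, here is a small sketch that rebuilds the two routes from the sample session table. `build_route` is a hypothetical helper, and it assumes that only rows with `event_type == "page"` add a step to the route.

```python
def build_route(rows):
    """Build the route for one session.

    `rows` is a list of (event_type, event_page, timestamp) tuples.
    Assumption: only rows with event_type == "page" add a step to the route.
    """
    error_times = [ts for event_type, _, ts in rows if "error" in event_type]
    cutoff = min(error_times) if error_times else float("inf")
    # Keep page visits strictly before the first error, ordered by time.
    pages = sorted(
        (ts, page) for event_type, page, ts in rows
        if event_type == "page" and ts < cutoff
    )
    return "-".join(page for _, page in pages)


session_507 = [
    ("page", "main", 1620494781),
    ("event", "main", 1620494788),
    ("event", "main", 1620494798),
    ("page", "family", 1620494820),
    ("event", "family", 1620494828),
    ("page", "main", 1620494848),
    ("wNaxLlerrorU", "main", 1620494865),
    ("event", "main", 1620494873),
    ("page", "news", 1620494875),
    ("page", "tariffs", 1620494876),
    ("event", "tariffs", 1620494884),
]
session_514 = [
    ("page", "main", 1620728918),
    ("event", "main", 1620729174),
    ("page", "archive", 1620729674),
    ("page", "bonus", 1620729797),
    ("page", "tariffs", 1620731090),
    ("event", "tariffs", 1620731187),
]
print(build_route(session_507))  # main-family-main
print(build_route(session_514))  # main-archive-bonus-tariffs
```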

## Task
Use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file whose lines contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route occurred.

The lines must be **ordered in descending order** of the `count` field.
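
The output format can be sketched without Spark as shown below. `top_routes` is a hypothetical helper that takes one route string per session. Ties are broken alphabetically, which is an assumption because the task does not define a tie-break.

```python
from collections import Counter


def top_routes(routes, n=30):
    """Return the n most frequent routes as "route<TAB>count" lines.

    `routes` holds one route string per session. Ties are broken
    alphabetically (an assumption, the task does not specify this).
    """
    counts = Counter(routes)
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [f"{route}\t{count}" for route, count in ranked[:n]]


session_routes = ["main", "main-news", "main", "main-family-main", "main"]
for line in top_routes(session_routes, n=2):
    print(line)
```

With the toy input above, `main` appears in three sessions and is listed first with a count of 3.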

## Criteria
You can get a maximum of 3.5 points (final grade) for this assignment, depending on the number of interfaces you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e. no O(n^2) or O(n^3) complexity, appropriate object usage, no data leaks, etc. This is evaluated by staff.
* 1 point – 1-on-1 screening session. During this session a staff member can ask you questions about your solution logic, framework usage, questionable parts of your code, etc. If your code is clean enough, the staff member may simply ask you to solve a theoretical problem related to Spark.
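
As one illustration of the complexity criterion, the sketch below groups rows by session in a single pass with a dictionary instead of rescanning the whole table for every session, which would be quadratic. `group_by_session` is a hypothetical helper, and the row layout is assumed to match the table structure described earlier.

```python
from collections import defaultdict


def group_by_session(rows):
    """Group rows by (user_id, session_id) in one pass over the input.

    `rows` is an iterable of
    (user_id, session_id, event_type, event_page, timestamp) tuples.
    """
    sessions = defaultdict(list)
    for user_id, session_id, event_type, event_page, timestamp in rows:
        sessions[(user_id, session_id)].append((event_type, event_page, timestamp))
    return dict(sessions)


rows = [
    (562, 507, "page", "main", 1620494781),
    (562, 514, "page", "main", 1620728918),
    (562, 507, "page", "family", 1620494820),
]
grouped = group_by_session(rows)
print(sorted(grouped))  # [(562, 507), (562, 514)]
```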


In [1]:
#! hdfs dfs -df -h

In [2]:
#!cd /

In [3]:
#!ls

In [4]:
#! hdfs dfs -ls 

In [5]:
#!hdfs dfs -ls /data

In [6]:
#!hdfs dfs -rm -R clickstream.csv

In [7]:
#!hadoop fs -mkdir data

In [8]:
#!hadoop fs -mkdir data
#!hadoop fs -copyFromLocal clickstream.csv /data
#! hdfs dfs -ls 

In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

import pyspark.sql.functions as f

from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import col

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-10-05 16:41:49,114 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
clicks = se.read.option('header', True).option("delimiter", "\t").csv("/data/clickstream.csv")
# createOrReplaceTempView replaces the deprecated registerTempTable.
clicks.createOrReplaceTempView("clicks")

                                                                                

In [11]:
clicks.show(10)


+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584160|
|    562|       507|        page|    rabota|1695584166|
|    562|       507|       event|    rabota|1695584174|
+-------+----------+------------+----------+----------+
only showing top 10 rows



**SQL Part**

In [12]:
clear_routes4 = se.sql(
'''
    SELECT DISTINCT
        event_cumulative AS route_def, COUNT(*) AS route_amount
    FROM (
        SELECT DISTINCT
            unique_id, event_cumulative
        FROM (
            SELECT DISTINCT
                unique_id, step, event_cumulative,
                MAX(step) OVER (PARTITION BY unique_id ORDER BY step ASC ROWS BETWEEN CURRENT ROW AND 10000 FOLLOWING) AS final_step
            FROM (
                SELECT DISTINCT
                    *,
                    ROW_NUMBER() OVER (PARTITION BY unique_id ORDER BY timestamp ASC, err_flag ASC) AS step,
                    SUM(err_flag) OVER (PARTITION BY unique_id ORDER BY timestamp ASC, err_flag ASC) AS err_flag_cumulative,
                    collect_list(event_page) OVER (PARTITION BY unique_id ORDER BY timestamp ASC) AS event_cumulative
                FROM (
                    SELECT DISTINCT
                        user_id || '&' || session_id AS unique_id, event_page, INT(timestamp) AS timestamp, instr(event_type, 'error') AS err_flag
                    FROM clicks
                    WHERE event_type != 'event'
                    ORDER BY unique_id, timestamp
                ) AS flagged_routes
                ORDER BY unique_id, timestamp
            ) AS stepped_routes
            WHERE err_flag_cumulative = 0
            ORDER BY unique_id, step
        ) AS clear_routes
        WHERE step = final_step
    ) AS final_routes
    GROUP BY event_cumulative
    ORDER BY route_amount DESC, route_def DESC
    ''')

In [13]:
clear_routes4.show(5)



+----------------+------------+
|       route_def|route_amount|
+----------------+------------+
|          [main]|        8089|
| [main, archive]|        1094|
|  [main, rabota]|        1039|
|[main, internet]|         879|
|   [main, bonus]|         865|
+----------------+------------+
only showing top 5 rows



                                                                                

In [14]:
def SQL_result(SQL_query, top=-1):
    # Collect every row, or only the first `top` rows.
    rows = SQL_query.collect() if top == -1 else SQL_query.take(top)
    # Each row is (list of pages, count); join the pages with '-'.
    return [('-'.join(row[0]), row[1]) for row in rows]

In [15]:
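# Write the top 30 routes as tab-separated lines. Using `with` closes the file
# and avoids rebinding `f`, the alias of pyspark.sql.functions.
with open("result_SQL.txt", "w") as out:
    for route in SQL_result(clear_routes4, 30):
        out.write(route[0] + '\t' + str(route[1]) + '\n')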

                                                                                

In [16]:
clear_routes4.unpersist()

DataFrame[route_def: array<string>, route_amount: bigint]

**DF Part**

In [17]:
df = clicks
df.cache()
df.show(5)


+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

In [18]:
df = df[(df['event_type'] != 'event')]
df = df.withColumn("unique_id",concat_ws('&',df.user_id,df.session_id))
df = df.withColumn("error_flag", df.event_type.like("%error%").cast('Integer'))
df = df.withColumn("error_time", (df.timestamp*df.error_flag).cast('Integer'))
df.show(5)

+-------+----------+------------+----------+----------+---------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|unique_id|error_flag|error_time|
+-------+----------+------------+----------+----------+---------+----------+----------+
|    562|       507|        page|      main|1695584127|  562&507|         0|         0|
|    562|       507|wNaxLlerrorU|      main|1695584154|  562&507|         1|1695584154|
|    562|       507|        page|    rabota|1695584166|  562&507|         0|         0|
|    562|       507|        page|      main|1695584194|  562&507|         0|         0|
|    562|       507|        page|     bonus|1695584221|  562&507|         0|         0|
+-------+----------+------------+----------+----------+---------+----------+----------+
only showing top 5 rows



In [19]:
error_df = df[df.error_time > 0 ]
error_df.cache()
error_df = error_df.drop('event_type','event_page','timestamp','error_flag')
error_df = error_df.groupby("unique_id").min()
error_df = error_df.select(col("unique_id").alias("unique_id_err"), col("min(error_time)").alias("min_err"))
error_df.show(5)



+-------------+----------+
|unique_id_err|   min_err|
+-------------+----------+
|     4848&755|1695672791|
|     2671&515|1695704855|
|     3641&540|1695770418|
|      839&633|1695932747|
|     3655&295|1695969136|
+-------------+----------+
only showing top 5 rows



                                                                                

In [20]:
joined_df = df.join(error_df, df.unique_id == error_df.unique_id_err, 'outer')
joined_df.cache()
error_df.unpersist()
df.unpersist()
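# Keep all rows of sessions without errors, plus non-error rows up to the first error.
# Note: `<=` also keeps rows that share the error's timestamp. The task statement
# treats those as corrupted, so `<` may be the intended comparison.
joined_df = joined_df[((col("timestamp").cast('Integer') <= col("min_err").cast('Integer')) & (col("error_flag")==0))|(col("min_err").isNull())]
joined_df = joined_df.drop('event_type','error_time','min_err','error_flag','unique_id_err')
joined_df = joined_df.orderBy(['unique_id', 'timestamp'], ascending=True)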
joined_df.show(5)



+-------+----------+----------+----------+---------+
|user_id|session_id|event_page| timestamp|unique_id|
+-------+----------+----------+----------+---------+
|      0|       874|      main|1696371064|    0&874|
|      0|       874|    rabota|1696374894|    0&874|
|      0|       874|    online|1696378229|    0&874|
|      0|       879|      main|1696768416|    0&879|
|      0|       879|    online|1696768738|    0&879|
+-------+----------+----------+----------+---------+
only showing top 5 rows



                                                                                

In [21]:

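# Note: collect_list after groupBy is not guaranteed to preserve the earlier
# orderBy within each group, so the page order may vary between runs.
result_df = joined_df.groupBy("unique_id").agg(collect_list("event_page"))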
result_df.cache()
joined_df.unpersist()
result_df = result_df.groupBy("collect_list(event_page)").count()
result_df = result_df.orderBy(['count','collect_list(event_page)'], ascending=False)
result_df.show(5)



+------------------------+-----+
|collect_list(event_page)|count|
+------------------------+-----+
|                  [main]| 8090|
|         [main, archive]| 1092|
|          [main, rabota]| 1037|
|        [main, internet]|  874|
|           [main, bonus]|  861|
+------------------------+-----+
only showing top 5 rows



                                                                                

In [22]:
def DF_result(df, top=-1):
    # Collect every row, or only the first `top` rows.
    rows = df.collect() if top == -1 else df.take(top)
    # Each row is (list of pages, count); join the pages with '-'.
    return [('-'.join(row[0]), row[1]) for row in rows]

In [23]:
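# Write the top 30 routes as tab-separated lines. Using `with` closes the file
# and avoids rebinding `f`, the alias of pyspark.sql.functions.
with open("result_DF.txt", "w") as out:
    for route in DF_result(result_df, 30):
        out.write(route[0] + '\t' + str(route[1]) + '\n')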

                                                                                

In [24]:
result_df.unpersist()

DataFrame[collect_list(event_page): array<string>, count: bigint]

**RDD Part**

In [3]:
clicks_rdd = clicks.rdd

In [4]:
clicks_rdd.take(5)

                                                                                

[Row(user_id='562', session_id='507', event_type='page', event_page='main', timestamp='1695584127'),
 Row(user_id='562', session_id='507', event_type='event', event_page='main', timestamp='1695584134'),
 Row(user_id='562', session_id='507', event_type='event', event_page='main', timestamp='1695584144'),
 Row(user_id='562', session_id='507', event_type='event', event_page='main', timestamp='1695584147'),
 Row(user_id='562', session_id='507', event_type='wNaxLlerrorU', event_page='main', timestamp='1695584154')]

In [5]:
def map_flagged(sorted_rdd) :
    unique_id = sorted_rdd[0]+'&'+sorted_rdd[1]
    err_flag = 0
    err_time = 0
    if sorted_rdd[2].find('error') >-1 :
        err_flag = 1
        err_time = int(sorted_rdd[4])
    return unique_id, (sorted_rdd[2],sorted_rdd[3],int(sorted_rdd[4]),err_flag, err_time) 

In [6]:
flagged_rdd = clicks_rdd.filter(lambda x: (x[2]!='event')).map(map_flagged)
flagged_rdd.take(5)

                                                                                

[('562&507', ('page', 'main', 1695584127, 0, 0)),
 ('562&507', ('wNaxLlerrorU', 'main', 1695584154, 1, 1695584154)),
 ('562&507', ('page', 'rabota', 1695584166, 0, 0)),
 ('562&507', ('page', 'main', 1695584194, 0, 0)),
 ('562&507', ('page', 'bonus', 1695584221, 0, 0))]

In [7]:
clicks_rdd.unpersist()
clicks.unpersist()

DataFrame[user_id: string, session_id: string, event_type: string, event_page: string, timestamp: string]

In [8]:
def map_first_error(group) :
    min_timestamp = 0
    for error in group[1] :
        if (min_timestamp == 0) or (error[2] < min_timestamp) :
            min_timestamp = error[2]
    return group[0], min_timestamp    

In [9]:
error_rdd = flagged_rdd.filter(lambda x: x[1][3]==1).map(lambda x: (x[0],x[1][3],x[1][4])).groupBy(lambda x: x[0]).map(map_first_error)
error_rdd.take(5)

                                                                                

[('3493&685', 1696287161),
 ('901&681', 1696293786),
 ('1986&181', 1696307385),
 ('3590&767', 1696311508),
 ('4247&929', 1696327196)]

In [10]:
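# Keep rows of error-free sessions, plus non-error rows up to the first error.
# Note: `<=` also keeps rows that share the error's timestamp; the task statement suggests `<`.
joined_rdd = flagged_rdd.leftOuterJoin(error_rdd).filter(lambda x: ((x[1][1] is None) or ((x[1][0][2] <= x[1][1])and(x[1][0][3]!=1)))).map(lambda x: (x[0],(x[1][0][1],x[1][0][2])))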
joined_rdd.take(5)

                                                                                

[('54&308', ('main', 1695588329)),
 ('54&308', ('archive', 1695588432)),
 ('493&511', ('main', 1695589379)),
 ('493&511', ('news', 1695589390)),
 ('493&511', ('bonus', 1695589472))]

In [11]:
flagged_rdd.unpersist()
error_rdd.unpersist()

PythonRDD[31] at RDD at PythonRDD.scala:53

In [12]:
def sort_key (route) :
    return route[0]

In [13]:
def map_calc_route (session) :
    route = list()
    for step in session[1] :
        route.append((step[1], step[0]))
    route.sort(key=sort_key)
    path = ''
    for step in route :
        path += (step[1] + '-')
    return path[:-1], session[0] 

In [14]:
res_rdd = joined_rdd.groupByKey().map(map_calc_route).groupBy(lambda x: x[0]).map(lambda x: (x[0], len(x[1]))).sortBy(lambda x: x[1], ascending = False)
res_rdd.take(5)

                                                                                

[('main', 8090),
 ('main-archive', 1095),
 ('main-rabota', 1040),
 ('main-internet', 880),
 ('main-bonus', 865)]

In [15]:
joined_rdd.unpersist()

PythonRDD[47] at RDD at PythonRDD.scala:53

In [18]:
def RDD_result (rdd, top = -1) :
    result = list()
    if top == -1 :
        result_rdd = rdd.collect()
    else :
        result_rdd = rdd.take(top)
    for row in result_rdd:
        route = row[0]
        result.append((route,row[1]))
    return result

In [19]:
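# Write the top 30 routes as tab-separated lines. Using `with` closes the file
# and avoids rebinding `f`, the alias of pyspark.sql.functions.
with open("result_RDD.txt", "w") as out:
    for route in RDD_result(res_rdd, 30):
        out.write(route[0] + '\t' + str(route[1]) + '\n')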

In [22]:
res_rdd.unpersist()

PythonRDD[51] at RDD at PythonRDD.scala:53