# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It allows you to understand how users interact with the site. In this task, you need to find the most frequent user routes.

## Input data
Input data is a table with clickstream data stored in the file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_page (string)` - Page on the site.
* `timestamp (int)` - Unix-timestamp of action.

### Browser errors
Errors can sometimes occur in the user's browser. After such an error appears, the data of that session can no longer be trusted: all lines that follow the error, or that have the same timestamp as the error, are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

A route is counted in full, up to the end of the session or up to the first error in the session.
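
The rules above can be made concrete with a small plain-Python sketch that needs no Spark. The function name `session_route` and the tuple layout `(event_type, event_page, timestamp)` are assumptions made for this illustration; the rows are the sample sessions shown earlier.

```python
def session_route(rows):
    """Build the route of one session.

    rows: iterable of (event_type, event_page, timestamp) tuples (assumed layout).
    """
    rows = sorted(rows, key=lambda r: r[2])  # earliest to latest
    # Timestamp of the first error, if any; rows at or after it are corrupted.
    error_times = [ts for event_type, _, ts in rows if "error" in event_type]
    cutoff = min(error_times) if error_times else None
    pages = [
        page
        for event_type, page, ts in rows
        if event_type == "page" and (cutoff is None or ts < cutoff)
    ]
    return "-".join(pages)


session_507 = [
    ("page", "main", 1620494781),
    ("event", "main", 1620494788),
    ("event", "main", 1620494798),
    ("page", "family", 1620494820),
    ("event", "family", 1620494828),
    ("page", "main", 1620494848),
    ("wNaxLlerrorU", "main", 1620494865),
    ("event", "main", 1620494873),
    ("page", "news", 1620494875),
    ("page", "tariffs", 1620494876),
    ("event", "tariffs", 1620494884),
]
session_514 = [
    ("page", "main", 1620728918),
    ("event", "main", 1620729174),
    ("page", "archive", 1620729674),
    ("page", "bonus", 1620729797),
    ("page", "tariffs", 1620731090),
    ("event", "tariffs", 1620731187),
]

print(session_route(session_507))  # main-family-main
print(session_route(session_514))  # main-archive-bonus-tariffs
```

The `news` and `tariffs` pages of session 507 are dropped because they come after the error row.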

## Task
You need to use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file, the lines of which contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route occurred.

The lines must be **ordered in descending order** of the `count` field.
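
As a rough illustration of the expected file layout, the sketch below counts routes and formats them as tab-separated lines in descending order of `count`. It is plain Python, not one of the required Spark solutions, and the helper name `top_routes_lines` and the toy input are made up for the example.

```python
from collections import Counter


def top_routes_lines(routes, n=30):
    """Return up to n 'route<TAB>count' lines, most frequent route first.

    routes: one route string per session (hypothetical input for this sketch).
    """
    counts = Counter(routes)
    # most_common() yields (route, count) pairs sorted by count, descending.
    return [f"{route}\t{count}" for route, count in counts.most_common(n)]


routes = ["main", "main-archive", "main", "main-bonus", "main-archive", "main"]
for line in top_routes_lines(routes, n=2):
    print(line)
```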

## Criteria
You can get a maximum of 3.5 points (final grade) for this assignment, depending on the number of interfaces you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e.: no O(n^2) or O(n^3) complexities; appropriate object usage; no data leaks etc. This is evaluated by staff.
* 1 point – 1 on 1 screening session. During this session staff member can ask you questions regarding your solution logic, framework usage, questionable parts of your code etc. If your code is clean enough, the staff member can just ask you to solve a theoretical problem connected to Spark.


In [1]:
# sc.stop()

In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

sc = pyspark.SparkContext(appName='jupyter')
se = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-10-08 18:02:12,323 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
# !hadoop fs -put clickstream.csv clickstream.csv

In [4]:
# reading csv
c_df = se.read.csv("/data/clickstream.csv", sep='\t', header=True, inferSchema=True)
c_df.show(10)

                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584160|
|    562|       507|        page|    rabota|1695584166|
|    562|       507|       event|    rabota|1695584174|
+-------+----------+------------+----------+----------+
only showing top 10 rows



In [5]:
# !hadoop fs -ls

In [6]:
# creating temp view "stream_init" in spark sql catalog
# (orderBy returns a new DataFrame, so sorting c_df here would not change the view)
c_df.createOrReplaceTempView("stream_init")

In [7]:
# checking distinct values for event_type just to understand if they can be excluded easily
c_df.select('event_type').distinct().count()
# nope, we have many different errors

                                                                                

20448

## 1. Spark SQL solution

### As the task example shows, only events with event_type 'page' are counted in a route. I treat a route as all 'page' events of a session up to the first error or the end of the session, since the task itself defines no additional restrictions.

In [8]:
# se.sql('SHOW COLUMNS FROM stream').show()

In [9]:
c_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: integer (nullable = true)



First I tried an approach different from the logic used for the DF solution, but something is missing, as I get slightly lower numbers. I had no time to find the reason; debugging SQL with HDFS is not very convenient :)

In [10]:
# sql_rts_count = se.sql("""
# WITH corr_sessions_err AS 
#     (SELECT * FROM stream_df WHERE event_type = 'page' AND stream_df.timestamp < (SELECT MIN(slave.timestamp) 
#     FROM stream_df AS slave WHERE slave.user_id = stream_df.user_id AND slave.event_type LIKE '%error%')),        
# corr_sessions_no_err AS 
#     (SELECT * FROM stream_df s WHERE s.event_type = 'page' AND NOT EXISTS
#         (SELECT 1 FROM corr_sessions_err c WHERE (s.user_id = c.user_id AND s.session_id = c.session_id))
#     ),
# res_tbl AS 
#     ((SELECT * FROM corr_sessions_err UNION ALL SELECT * FROM corr_sessions_no_err)
#     ORDER BY user_id, session_id, timestamp ASC),
# res_routes AS 
#     (SELECT user_id, session_id, COLLECT_LIST(event_page) AS rt FROM res_tbl GROUP BY user_id, session_id)
#         (SELECT CONCAT_WS('-', rt) as route, COUNT(*) as count
#         FROM res_routes
#         GROUP BY route
#         ORDER BY count DESC
#         LIMIT 30)
# """)

I wrote the second approach after the DF solution was done, so the logic is similar.

In [11]:
sql_rts_count = se.sql("""
WITH corr_sessions_err AS 
    (SELECT user_id, session_id, MIN(timestamp) AS err_t FROM stream_init WHERE event_type LIKE '%error%'
    GROUP BY user_id, session_id),
res_tbl AS 
    (SELECT s.user_id, s.session_id, s.event_page, s.timestamp FROM stream_init s
    LEFT JOIN corr_sessions_err c ON s.user_id = c.user_id AND s.session_id = c.session_id
    WHERE s.event_type = 'page' AND (s.timestamp < c.err_t OR c.err_t IS NULL)),
res_routes AS 
    (SELECT user_id, session_id, COLLECT_LIST(event_page) AS rt FROM res_tbl GROUP BY user_id, session_id)
        (SELECT CONCAT_WS('-', rt) AS route, COUNT(*) AS count
        FROM res_routes
        GROUP BY route
        ORDER BY count DESC
        LIMIT 30)
""")

In [12]:
sql_rts_count.show(5)

                                                                                

+-------------+-----+
|        route|count|
+-------------+-----+
|         main| 8090|
| main-archive| 1091|
|  main-rabota| 1035|
|main-internet|  880|
|   main-bonus|  864|
+-------------+-----+
only showing top 5 rows
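
One thing that may be worth double-checking in the query above: `COLLECT_LIST` after `GROUP BY` does not by itself promise that pages come out in timestamp order, because the order of collected values depends on the order in which rows arrive after a shuffle. This might be one reason for the small differences from the DataFrame counts further down, although that was not verified here. A commonly used way to pin the order in Spark SQL is to collect `STRUCT(timestamp, event_page)` pairs, apply `SORT_ARRAY`, and then extract the page field (for example with `TRANSFORM`). The plain-Python sketch below only models that idea; the function name `ordered_route` is hypothetical and nothing here was run against the real data.

```python
import random


def ordered_route(pairs):
    """pairs: (timestamp, event_page) tuples collected in arbitrary order."""
    # Tuples sort by their first field, much like an array of
    # (timestamp, event_page) structs sorted by its leading field.
    return "-".join(page for _, page in sorted(pairs))


pairs = [
    (1620728918, "main"),
    (1620729674, "archive"),
    (1620729797, "bonus"),
    (1620731090, "tariffs"),
]
shuffled = pairs[:]
random.Random(0).shuffle(shuffled)  # emulate rows arriving in a different order

print(ordered_route(shuffled))  # main-archive-bonus-tariffs
```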



In [13]:
# uploading data for 30 entries
sql_rts_count.write.csv("hdfs:/data/lsml_sga_sql_zhukov.csv", sep='\t')

                                                                                

## 2. SQL with 2 queries

This can easily be done by splitting the 1-query solution into 2, so, sorry, I do not see any purpose in repeating the same query here split in two :) It would not help with complexity or speed either.

## 3. Spark RDD solution

In [14]:
# define RDD to work with from our external file
stream = sc.textFile("/data/clickstream.csv").cache()

In [15]:
# taking the header line (column names) from the 1st line
column_names = stream.first()
column_names

'user_id\tsession_id\tevent_type\tevent_page\ttimestamp'

In [16]:
# dropping the header line and splitting each data row by \t
stream = stream.filter(lambda line: line != column_names).map(lambda line:line.split('\t'))

In [17]:
# checking values of first strings
stream.take(5)

[['562', '507', 'page', 'main', '1695584127'],
 ['562', '507', 'event', 'main', '1695584134'],
 ['562', '507', 'event', 'main', '1695584144'],
 ['562', '507', 'event', 'main', '1695584147'],
 ['562', '507', 'wNaxLlerrorU', 'main', '1695584154']]

In [18]:
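# keying rows by (user id, session id) and concatenating their [event_type, event_page, timestamp] values;
# note: sortBy orders whole sessions by their first collected timestamp, not the events inside a session
step_1 = stream.map(lambda row: ((row[0], row[1]), [row[2], row[3], row[4]])).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1][2])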

                                                                                

In [19]:
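def err(elem):
    # keep the flat [event_type, event_page, timestamp, ...] list up to the first value containing 'error'
    lst = []
    for i in elem:
        if 'error' not in i:
            lst.append(i)
        else:
            return lst
    return lst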

In [20]:
# returning only the stream until the first error - user id and session id
# are not needed anymore as we have already reduced by them as keys
step_2 = step_1.map(lambda x: err(x[1]))

In [21]:
step_2.take(1)

                                                                                

[['page',
  'main',
  '1695584127',
  'event',
  'main',
  '1695584134',
  'event',
  'main',
  '1695584144',
  'event',
  'main',
  '1695584147']]

In [22]:
def del_elem(lst, place_n):
    # drops every place_n-th element of lst in place (here: the timestamps)
    del lst[place_n-1::place_n]
    return lst

In [23]:
# deleting all timestamps as we have arranged lists
step_3 = step_2.map(lambda x: del_elem(x, 3))

In [24]:
step_3.take(1)

[['page', 'main', 'event', 'main', 'event', 'main', 'event', 'main']]

In [25]:
def only_pages(lst):
    # lst alternates event_type, event_page; keep the page of every 'page' event
    pages = []
    for i in range(0, len(lst), 2):
        if lst[i] == 'page':
            pages.append(lst[i+1])
    return pages

In [26]:
only_pages(['page', 'main', 'event', 'main', 'event', 'main', 'event', 'main'])

['main']

In [27]:
# selecting only if event in stream is "page" type
step_4 = step_3.map(lambda x: only_pages(x)).map(lambda x: ("-".join(x), 1))
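
The three helper steps (cut at the error, drop timestamps, keep pages) could also be done in one pass by regrouping the flat list into triples. The function below is a hypothetical alternative written for illustration, not the code that produced the results in this notebook. Unlike `err`, it looks for `error` only in the `event_type` position, and it assumes the flat list is already in the intended order.

```python
def route_from_flat(flat):
    """flat: [event_type, event_page, timestamp, event_type, event_page, timestamp, ...]"""
    pages = []
    # Slices with step 3 regroup the flat list into (type, page, timestamp) triples.
    for event_type, page, _ts in zip(flat[0::3], flat[1::3], flat[2::3]):
        if "error" in event_type:
            break  # everything from the first error onwards is ignored
        if event_type == "page":
            pages.append(page)
    return "-".join(pages)


flat = [
    "page", "main", "1695584127",
    "event", "main", "1695584134",
    "wNaxLlerrorU", "main", "1695584154",
    "page", "rabota", "1695584166",
]
print(route_from_flat(flat))  # main
```

Working on triples avoids the positional bookkeeping of `del_elem` and `only_pages`, at the cost of rebuilding the grouping that the flat list had discarded.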

In [28]:
step_4.take(10)

[('main', 1),
 ('main-online-bonus-news-main-news-vklad-rabota-bonus-tariffs', 1),
 ('main-internet', 1),
 ('main', 1),
 ('main', 1),
 ('main-tariffs-main-online-news-main-internet-online-news-main-news', 1),
 ('main-archive-news-tariffs-main-rabota-main-online', 1),
 ('main', 1),
 ('main-bonus-main', 1),
 ('main-archive-internet-archive-main-tariffs-main-vklad', 1)]

In [29]:
count = step_4.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)

                                                                                

In [30]:
# checking
count.take(5)

                                                                                

[('main', 8091),
 ('main-archive', 1096),
 ('main-rabota', 1039),
 ('main-internet', 880),
 ('main-bonus', 865)]

In [31]:
# uploading data for 30 entries
count.toDF().orderBy(F.desc("_2")).limit(30).write.csv("hdfs:/data/lsml_sga_rdd_zhukov.csv", sep='\t')

                                                                                

## 4. Spark DF Solution

In [32]:
# csv to Spark DF
df = se.read.csv("hdfs:/data/clickstream.csv", header=True, sep="\t", inferSchema=True)

In [33]:
# df = df.orderBy(['user_id', 'session_id', 'timestamp'], ascending=True).show()

In [34]:
# create window func over user id and session id
window = Window.partitionBy(*["user_id", "session_id"]).orderBy("timestamp")

In [35]:
# first error timestamp per (user id, session id)
err_tst = df.filter(F.col("event_type").like("%error%")).groupBy("user_id", "session_id").agg(F.min("timestamp").alias("err_ts"))

In [36]:
# checking
err_tst.orderBy(['user_id', 'session_id', 'err_ts'], ascending=True).show(5)

+-------+----------+----------+
|user_id|session_id|    err_ts|
+-------+----------+----------+
|      0|       879|1696777359|
|      0|       898|1697629831|
|      0|       901|1699003239|
|      0|       912|1700596574|
|      0|       922|1700724457|
+-------+----------+----------+
only showing top 5 rows



In [37]:
# joining 2 tables on user id and session id + sorted
joined_tbls = (df.join(err_tst,on=["user_id", "session_id"],how="left_outer")).orderBy(["user_id", "session_id", "timestamp"])

In [38]:
# checking
# joined_tbls.show(100)
# checks ok

In [39]:
# filtering to keep only event_type 'page' and remove everything from the first error in a session onwards
fltrd_joined_tbls = joined_tbls.filter((F.col("event_type") == "page") & ((F.col("timestamp") < F.col("err_ts")) | F.col("err_ts").isNull()))
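
The `isNull()` branch in the filter above matters because sessions without an error get `err_ts = null` from the left join, and a comparison with null does not evaluate to true, so without that branch all rows of error-free sessions would be dropped. A plain-Python model of the predicate, with `None` standing in for null (the function name `keep_row` is made up for this sketch):

```python
def keep_row(event_type, timestamp, err_ts):
    """Mirror of the DataFrame filter, with None playing the role of SQL null."""
    before_error = err_ts is None or timestamp < err_ts
    return event_type == "page" and before_error


print(keep_row("page", 1696371064, None))         # True: session has no error
print(keep_row("page", 1696776502, 1696777359))   # True: page before the error
print(keep_row("page", 1696777359, 1696777359))   # False: same timestamp as the error
print(keep_row("event", 1696768416, 1696777359))  # False: not a page event
```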

In [40]:
fltrd_joined_tbls.show(20)



+-------+----------+----------+----------+----------+----------+
|user_id|session_id|event_type|event_page| timestamp|    err_ts|
+-------+----------+----------+----------+----------+----------+
|      0|       874|      page|      main|1696371064|      null|
|      0|       874|      page|    rabota|1696374894|      null|
|      0|       874|      page|    online|1696378229|      null|
|      0|       879|      page|      main|1696768416|1696777359|
|      0|       879|      page|    online|1696768738|1696777359|
|      0|       879|      page|   tariffs|1696768973|1696777359|
|      0|       879|      page|    online|1696769277|1696777359|
|      0|       879|      page|      main|1696773185|1696777359|
|      0|       879|      page|  internet|1696774086|1696777359|
|      0|       879|      page|    online|1696776502|1696777359|
|      0|       885|      page|      main|1697348270|      null|
|      0|       888|      page|      main|1697564550|      null|
|      0|       898|     

                                                                                

In [44]:
# collecting the running list of pages per session over the ordered window (by user and session);
# each running list is a prefix of the next one, so max() per session returns the complete route
rts = (fltrd_joined_tbls.withColumn("rts", F.collect_list("event_page").over(window)).groupBy("user_id", "session_id")).agg(F.max("rts").alias("route"))
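
Why taking `F.max` of the windowed lists yields the full route: with an ordered window, `collect_list` produces a running list for every row, so each list within a session is a prefix of the next one. Arrays are compared element by element, and a list that is a strict prefix of another sorts before it. Python lists compare the same way, so they are used below as a stand-in for Spark's array ordering (an assumption of this sketch, not something executed on the cluster):

```python
pages = ["main", "archive", "bonus", "tariffs"]

# Running lists, as an ordered window would produce them row by row.
running = [pages[:i] for i in range(1, len(pages) + 1)]
print(running)

# Each list is a prefix of the next, so the maximum is the longest one.
print(max(running))
```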

In [45]:
rts.show(3)

                                                                                

+-------+----------+--------------------+
|user_id|session_id|               route|
+-------+----------+--------------------+
|      0|       874|[main, rabota, on...|
|      0|       898|[main, news, tari...|
|      0|       901|[main, internet, ...|
+-------+----------+--------------------+
only showing top 3 rows



In [47]:
#concat routes to correct format and count routes ordered desc
rts_fin = rts.withColumn("route", F.expr("concat_ws('-', route)")).groupBy("route").count().orderBy(F.desc("count")).limit(30)

In [48]:
rts_fin.show(5)
# looks fine

                                                                                

+-------------+-----+
|        route|count|
+-------------+-----+
|         main| 8090|
| main-archive| 1096|
|  main-rabota| 1039|
|main-internet|  880|
|   main-bonus|  865|
+-------------+-----+
only showing top 5 rows



In [49]:
# uploading data for 30 entries
rts_fin.write.csv("hdfs:/data/lsml_sga_df_zhukov.csv", sep='\t')

                                                                                

In [50]:
# # reading files to check format
# se.read.csv("hdfs:/data/lsml_sga_sql_zhukov.csv", header=False, sep="\t").show()

#### What about complexity - at first glance all 3 solutions consist of linear passes over the data plus sorts and shuffles (O(n log n) at worst), with no quadratic steps.