### Import lib's

In [1]:
import time
import pyspark.sql.functions as f

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *

from pyspark.sql.functions import udf
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import desc

from itertools import islice



### Read file from HDFS

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark Assignment") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext
rdd = sc.textFile('/data/lsml/sga/clickstream.csv')

### Remove header from file and parse it

In [3]:
rdd = rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
rdd = rdd.map(lambda l: l.split('\t'))
rdd.take(5)

[['562', '507', 'page', 'main', '1620494781'],
 ['562', '507', 'event', 'main', '1620494788'],
 ['562', '507', 'event', 'main', '1620494798'],
 ['562', '507', 'event', 'main', '1620494801'],
 ['562', '507', 'wNaxLlerrorU', 'main', '1620494808']]

### Do schema for data

where session_id = session_id + user_id as string

In [4]:
rdd = rdd.map(lambda p: Row(session_id=int(str(p[1]) + str(p[0])), event_type=str(p[2]), event_page=str(p[3]), timestamp=int(p[4])))
rdd.take(5)

[Row(event_page='main', event_type='page', session_id=507562, timestamp=1620494781),
 Row(event_page='main', event_type='event', session_id=507562, timestamp=1620494788),
 Row(event_page='main', event_type='event', session_id=507562, timestamp=1620494798),
 Row(event_page='main', event_type='event', session_id=507562, timestamp=1620494801),
 Row(event_page='main', event_type='wNaxLlerrorU', session_id=507562, timestamp=1620494808)]

### Create DataFrame

In [5]:
df = spark.createDataFrame(rdd)
df.createOrReplaceTempView('clickstream')
df.cache()
df.show(5)
df.count()

+----------+------------+----------+----------+
|event_page|  event_type|session_id| timestamp|
+----------+------------+----------+----------+
|      main|        page|    507562|1620494781|
|      main|       event|    507562|1620494788|
|      main|       event|    507562|1620494798|
|      main|       event|    507562|1620494801|
|      main|wNaxLlerrorU|    507562|1620494808|
+----------+------------+----------+----------+
only showing top 5 rows



5000000

### Remove Events

In [6]:
df = df[df['event_type'] != 'event']
df.show(5)
df.count()

+----------+------------+----------+----------+
|event_page|  event_type|session_id| timestamp|
+----------+------------+----------+----------+
|      main|        page|    507562|1620494781|
|      main|wNaxLlerrorU|    507562|1620494808|
|    family|        page|    507562|1620494820|
|      main|        page|    507562|1620494848|
|      news|        page|    507562|1620494875|
+----------+------------+----------+----------+
only showing top 5 rows



2107171

In [7]:
df = df.orderBy(['session_id', 'timestamp'], ascending=True)
df.show(15)
df.count()

+----------+-----------------+----------+----------+
|event_page|       event_type|session_id| timestamp|
+----------+-----------------+----------+----------+
|      main|             page|       453|1620984970|
|   tariffs|             page|       453|1620985116|
|     bonus|             page|       453|1620985410|
|      news|             page|       453|1620985742|
|   digital|             page|       453|1620986995|
|      main|             page|       453|1620987545|
|   digital|             page|       453|1620987637|
|   archive|             page|       453|1620987947|
|     bonus|             page|       453|1620988477|
|   tariffs|             page|       453|1620988844|
|   archive|             page|       453|1620989032|
|   tariffs|             page|       453|1620989166|
|   tariffs|         XWerrorA|       453|1620991135|
|   tariffs|CWUerrormnOlyfxwI|       453|1620991953|
|      news|             page|       453|1620992130|
+----------+-----------------+----------+-----

2107171

In [8]:
df.groupby('session_id').count().count()

239202

In [9]:
df.groupby('session_id').count().orderBy('count', ascending=False).show(10)

+----------+-----+
|session_id|count|
+----------+-----+
|   1367635|  105|
|    860345|  100|
|   1538780|   96|
|    856537|   95|
|    917820|   92|
|   2267958|   92|
|   1530606|   92|
|   2016667|   92|
|    235677|   90|
|   1297637|   88|
+----------+-----+
only showing top 10 rows



In [10]:
df[df['session_id'] == 1367635].count()

105

### Calculate result

In [11]:
dataCollect = df.collect()
len(dataCollect)

Traceback (most recent call last):
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/u0/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/u0/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/u0/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
  

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49323)
----------------------------------------


  File "/u0/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:56813)

In [None]:
result = {}

In [None]:
for row in dataCollect:
    session_id = row['session_id']
    event_type = row['event_type']
    event_page = row['event_page']
    
    if session_id not in result:
        result[session_id] = []
        
    if event_type == 'page':
        result[session_id].append(event_page)
    else:
        result[session_id].append('error')

In [None]:
len(result)

In [None]:
rows = []

In [None]:
for k, v in result.items():
    # create route
    route = []
    for page in v:
        if page == 'error': break
        if len(route) > 0:
            if route[-1] == page:
                continue
        route.append(page)
        
    route = '-'.join(route)
        
    # Add session_id, route to DataFrame
    rows.append(Row(k, route))

In [None]:
len(rows)

In [None]:
df2 = spark.createDataFrame(rows, ['session_id', 'route'])
df2.show(5, False)

In [None]:
df2[df2['session_id'] == 507].show()

### Check

In [None]:
df[df['session_id'] == 514].show(50, False)

In [None]:
df3 = df2.groupby('route').count().orderBy('count', ascending=False).limit(30)

In [None]:
df3.show(100, truncate=False)

In [None]:
for row in df3.collect():
    print(row)
    break

In [None]:
print(df3.limit(30).toPandas().to_string(index=False, header=False))

In [None]:
!hadoop fs -ls

In [None]:
!hadoop fs -get sga_output.csv /home/kzavarov/