# LSML1 SGA

> Large Scale Machine Learning 1 (Spring23)

> Sergey Terskov

# Solution 2. Spark RDD

In [7]:
%%time

#Initialize Spark
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

sc = SparkContext(appName="LSML1_SGA")
se = SparkSession(sc)

# Read data from HDFS
events_df = se.read.format("csv") \
      .options(delimiter="\t", header=True) \
      .schema("user_id bigint, session_id bigint, event_type string, event_page string, timestamp bigint") \
      .load("hdfs:///data//clickstream.csv")

print("Initial dataset example:")
events_df.show(5)

events_rdd = events_df.rdd

# Group users by sessions
def process_events(events):
    user_sessions = {}
    
    for user_id, session_id, event_type, event_page, _ in events:
        
        if (user_id, session_id) not in user_sessions:
            user_sessions[(user_id, session_id)] = {'pages': [], 'has_error': False}
        
        if not user_sessions[(user_id, session_id)]['has_error'] and event_type == "page":
            user_sessions[(user_id, session_id)]['pages'].append(event_page)

    return user_sessions

# Get users' sessions
user_sessions = events_rdd.groupBy(lambda x: (x[0], x[1])) \
    .flatMap(lambda x: process_events(x[1]).items())

# Count of routes
def count_routes(user_sessions):
    route_counts = {}
    
    for (user_id, session_id), session_info in user_sessions:

        route_str = '-'.join(session_info['pages'])
        route_counts[route_str] = route_counts.get(route_str, 0) + 1
    
    return route_counts.items()

route_counts_rdd = user_sessions.flatMap(lambda x: count_routes([x]))

# Get routes
top_routes_df = se.createDataFrame(
    data=route_counts_rdd.reduceByKey(lambda a, b: a + b).takeOrdered(30, key=lambda x: -x[1]),
    schema = ["route","count"]
)

# Show result
print("Top 30 frequent routes:")
top_routes_df.show(30)

# Save to csv
top_routes_df.toPandas().to_csv("solution_2_RDD.csv", sep="\t")

#Stop Spark session
se.stop()

2024-10-30 20:03:58,456 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Initial dataset example:


                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

Top 30 frequent routes:


                                                                                

+--------------------+-----+
|               route|count|
+--------------------+-----+
|                main| 5984|
|        main-archive|  837|
|         main-rabota|  802|
|       main-internet|  686|
|          main-bonus|  656|
|           main-news|  594|
|        main-tariffs|  511|
|         main-online|  445|
|          main-vklad|  408|
| main-archive-rabota|  136|
| main-rabota-archive|  134|
|   main-rabota-bonus|  111|
|  main-bonus-archive|  109|
|    main-news-rabota|  108|
|   main-bonus-rabota|  105|
|main-internet-arc...|  104|
|main-internet-rabota|  104|
|   main-archive-news|  103|
|    main-rabota-news|  103|
|  main-archive-bonus|  100|
|main-archive-inte...|  100|
|   main-news-archive|   94|
|main-rabota-internet|   92|
|main-tariffs-inte...|   89|
| main-internet-bonus|   85|
|  main-internet-news|   81|
|     main-news-bonus|   80|
|  main-news-internet|   78|
|main-archive-tariffs|   76|
| main-archive-online|   76|
+--------------------+-----+



                                                                                

CPU times: user 963 ms, sys: 392 ms, total: 1.36 s
Wall time: 1min 10s
