## Evaluation Assignment

Data: Outbrain Click Prediction (Kaggle competition)

Tasks:
Using the Spark RDD and DataFrame APIs in Python, calculate:

**1**. Top 10 most visited document_ids in the page_views_sample log

**2**. How many users have at least 2 different traffic_source values in the page_views_sample log (note: traffic_source is not a count but an encoded enum)

**3***. Top 10 most visited topic_ids in the page_views_sample log (use the documents_topics table)

The submission is a result.json file with the keys top_10_documents, users and top_10_topics.
Each TOP-10 answer must be a list of ids ordered from TOP-1 to TOP-10.

result.json example:

    {
        "top_10_documents": [
            111,
            222,
            333,
            ...,
            1010
        ],
        "users": 10000,
        "top_10_topics": [
            11,
            22,
            33,
            ...,
            101
        ]
    }
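
Before submitting, it can help to check the dictionary against the format described above. A minimal sketch; `validate_result` is a hypothetical helper, and its checks (exactly 10 distinct integer ids per TOP-10 list, a non-negative integer for `users`) are inferred from the task text, not taken from the grader.

```python
from typing import Any, Dict, List

# Keys required by the task statement.
REQUIRED_KEYS = ("top_10_documents", "users", "top_10_topics")


def validate_result(result: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the dict matches the expected shape."""
    problems = []
    missing = [key for key in REQUIRED_KEYS if key not in result]
    if missing:
        problems.append(f"missing keys: {missing}")

    # Both TOP-10 answers must be lists of 10 distinct integer ids, TOP-1 first.
    for key in ("top_10_documents", "top_10_topics"):
        ids = result.get(key)
        if ids is None:
            continue
        if not isinstance(ids, list) or len(ids) != 10:
            problems.append(f"{key}: expected a list of exactly 10 ids")
        elif not all(isinstance(i, int) and not isinstance(i, bool) for i in ids):
            problems.append(f"{key}: ids must be plain Python ints")
        elif len(set(ids)) != 10:
            problems.append(f"{key}: ids must be distinct")

    # The user count is a single non-negative integer.
    users = result.get("users")
    if users is not None and (
        not isinstance(users, int) or isinstance(users, bool) or users < 0
    ):
        problems.append("users: expected a non-negative integer")
    return problems
```

Applied to the `result` dict printed in cell 17, `validate_result(result)` returns an empty list.
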

In [1]:
from pyspark import __version__ as pyspark_version
from py4j import __version__ as py4j_version

print("PySpark version:", pyspark_version)
print("Py4J version:", py4j_version)

PySpark version: 3.2.3
Py4J version: 0.10.9.5


In [2]:
import os
import json
import findspark
import tqdm.notebook as tqdm
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, countDistinct
from IPython.display import display

In [3]:
# start Hadoop
! /home/jovyan/start-hadoop.sh

jovyan
 * Starting OpenBSD Secure Shell server sshd
start-stop-daemon: unable to set gid to 0 (Operation not permitted)
   ...fail!
 * sshd is running
Starting namenodes on [localhost]
localhost: namenode is running as process 188.  Stop it first and ensure /tmp/hadoop-jovyan-namenode.pid file is empty before retry.
Starting datanodes
localhost: datanode is running as process 293.  Stop it first and ensure /tmp/hadoop-jovyan-datanode.pid file is empty before retry.
Starting secondary namenodes [984defca1bf6]
984defca1bf6: secondarynamenode is running as process 506.  Stop it first and ensure /tmp/hadoop-jovyan-secondarynamenode.pid file is empty before retry.
Starting resourcemanager
resourcemanager is running as process 713.  Stop it first and ensure /tmp/hadoop-jovyan-resourcemanager.pid file is empty before retry.
Starting nodemanagers
localhost: nodemanager is running as process 811.  Stop it first and ensure /tmp/hadoop-jovyan-nodemanager.pid file is empty before retry.
18897 sun.

In [4]:
# Initialize Spark (note: `sc` here holds a SparkSession, not a SparkContext)
findspark.init()
sc = SparkSession.builder.appName('jupyter').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-03-23 18:48:17,798 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
# getOrCreate() already returned a SparkSession, so reuse it instead of wrapping it again
se = sc

In [6]:
# check HDFS status
!hdfs dfs -df -h
!hdfs dfs -ls /

Filesystem                Size   Used  Available  Use%
hdfs://localhost:9000  929.0 G  1.1 G    668.5 G    0%
Found 2 items
drwxrwx---   - root   supergroup          0 2023-03-23 16:54 /tmp
drwxr-xr-x   - jovyan supergroup          0 2023-03-23 16:54 /user


To use the Kaggle API, you'll need to download your kaggle.json file.
Follow these steps:
1. Go to the Kaggle website (https://www.kaggle.com/).
2. Log in to your account.
3. Click on your profile picture in the top right corner, then click 'Account'.
4. Scroll down to the 'API' section, and click the 'Create New API Token' button.
5. Your kaggle.json file will be downloaded to your computer.
6. Move the kaggle.json file to the current working directory.

In [7]:
# Get the current working directory
current_directory = os.getcwd()
print(f"Current directory: {current_directory}")

Current directory: /home/jovyan/work


In [8]:
# Check that kaggle.json is in the current working directory
print("\nFiles and directories:")
for file in os.listdir(current_directory):
    print(file)


Files and directories:
wiki
images
yandex_music
spark-advanced-seminar-task.ipynb
spark-ml-seminar.ipynb
spark-ml-task.ipynb
.ipynb_checkpoints
kaggle.json
.git
lsml
SGA_taschaste.ipynb


In [9]:
!mkdir -p ~/.kaggle
!cp /home/jovyan/work/kaggle.json ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json
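
The three shell commands above can also be done from Python, for example where `!` shell escapes are not available. A minimal sketch; `install_kaggle_token` and its `home` parameter are hypothetical, not part of the `kaggle` package. It places the token at `~/.kaggle/kaggle.json`, where the cell above puts it, and sets the same 0600 permissions as `chmod 600`.

```python
import os
import shutil
import stat
from pathlib import Path


def install_kaggle_token(src: str, home: str = "~") -> Path:
    """Copy kaggle.json into <home>/.kaggle/ and make it readable/writable by the owner only."""
    kaggle_dir = Path(home).expanduser() / ".kaggle"
    kaggle_dir.mkdir(parents=True, exist_ok=True)   # like `mkdir -p ~/.kaggle`
    dest = kaggle_dir / "kaggle.json"
    shutil.copyfile(src, dest)                      # like `cp kaggle.json ~/.kaggle/`
    os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR)     # like `chmod 600`
    return dest
```

Usage in this notebook would be `install_kaggle_token("/home/jovyan/work/kaggle.json")`.
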

In [10]:
!pip install kaggle



In [11]:
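# Download each file from Kaggle, unzip it, upload the CSV to HDFS and clean up
files = [
    "page_views_sample",
    "documents_topics",
]

for file in files:
    !kaggle competitions download -c outbrain-click-prediction -f {file}.csv.zip
    !unzip {file}.csv.zip
    # `-put` refuses to overwrite an existing HDFS file ("File exists" below); add -f to overwrite
    !hdfs dfs -put {file}.csv
    !rm {file}.csv.zip {file}.csv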

print('\nDone!')

Downloading page_views_sample.csv.zip to /home/jovyan/work
100%|███████████████████████████████████████▊| 148M/149M [00:23<00:00, 6.73MB/s]
100%|████████████████████████████████████████| 149M/149M [00:23<00:00, 6.53MB/s]
Archive:  page_views_sample.csv.zip
  inflating: page_views_sample.csv   
put: `page_views_sample.csv': File exists
Downloading documents_topics.csv.zip to /home/jovyan/work
 99%|███████████████████████████████████████▋| 120M/121M [00:20<00:00, 5.80MB/s]
100%|████████████████████████████████████████| 121M/121M [00:20<00:00, 6.16MB/s]
Archive:  documents_topics.csv.zip
  inflating: documents_topics.csv    
put: `documents_topics.csv': File exists

Done!


In [12]:
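# Take a quick look at the data and register each file as a temporary view
for name in tqdm.tqdm(files):
    df = se.read.csv(f"{name}.csv", header=True)
    df.createOrReplaceTempView(name)  # registerTempTable is deprecated since Spark 2.0
    print(name)
    df.limit(3).show()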

page_views_sample
+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
+--------------+-----------+---------+--------+------------+--------------+

documents_topics
+-----------+--------+------------------+
|document_id|topic_id|  confidence_level|
+-----------+--------+------------------+
|    1595802|     140|0.0731131601068925|
|    1595802|      16|0.0594164867373976|
|    1595802|     143|0.0454207537554526|
+-----------+--------+------------------+



In [13]:
# Read the CSV files into PySpark DataFrames
page_views = sc.read.csv("page_views_sample.csv", header=True, inferSchema=True)
documents_topics = sc.read.csv("documents_topics.csv", header=True, inferSchema=True)

In [14]:
# Task 1: Top 10 most visited document_ids in the page_views_sample log
top_10_documents = (
    page_views
    .groupBy("document_id")
    .count()
    .orderBy(desc("count"))
    .limit(10)
    .select("document_id")
    .rdd.map(lambda x: int(x.document_id))
    .collect()
)
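
A plain-Python version of the same group, count and sort logic can be used to sanity-check the Spark pipeline on a handful of rows. A sketch on invented toy data; `top_n_ids` is a hypothetical helper. One design difference: the sketch breaks ties by the smaller id, while the Spark cell sorts only by `count`, so documents with equal view counts may come back from Spark in an arbitrary order.

```python
from collections import Counter
from typing import Iterable, List


def top_n_ids(ids: Iterable[int], n: int = 10) -> List[int]:
    """Return the n most frequent ids, most frequent first; ties broken by the smaller id."""
    counts = Counter(ids)
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [doc_id for doc_id, _ in ranked[:n]]


# Hypothetical toy log: the document_id of each page view.
sample_document_ids = [120, 234, 120, 999, 234, 120, 555]
print(top_n_ids(sample_document_ids, n=2))  # [120, 234]
```
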

In [15]:
# Task 2: How many users have at least 2 different traffic_sources in the page_views_sample log
users = (
    page_views
    .groupBy("uuid")
    .agg(countDistinct("traffic_source").alias("distinct_traffic_sources"))
    .where("distinct_traffic_sources >= 2")
    .count()
)
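
The same aggregation in plain Python: collect the distinct `traffic_source` codes per `uuid` and count the users with at least two. Because the column is an encoded enum, codes are only compared for distinctness and never added up. A sketch on invented rows; `users_with_multiple_sources` is a hypothetical helper. Like `countDistinct`, it ignores null (`None`) values.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Set, Tuple


def users_with_multiple_sources(
    rows: Iterable[Tuple[str, Optional[int]]], min_sources: int = 2
) -> int:
    """Count users seen with at least `min_sources` distinct traffic_source codes."""
    sources_by_user: Dict[str, Set[int]] = defaultdict(set)
    for uuid, traffic_source in rows:
        if traffic_source is None:
            continue  # like countDistinct, skip nulls
        # traffic_source is an enum code: collect distinct codes, never sum them
        sources_by_user[uuid].add(traffic_source)
    return sum(1 for sources in sources_by_user.values() if len(sources) >= min_sources)


# Hypothetical toy rows of (uuid, traffic_source).
rows = [("a", 1), ("a", 2), ("b", 1), ("b", 1), ("c", 3), ("c", 1), ("c", 3)]
print(users_with_multiple_sources(rows))  # 2
```
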

In [16]:
# Task 3: Top 10 most visited topic_ids in page_views_sample log (use documents_topics table)
top_10_topics = (
    page_views
    .join(documents_topics, on="document_id", how="inner")  # joining on the name keeps a single document_id column
    .groupBy("topic_id")
    .count()
    .orderBy(desc("count"))
    .limit(10)
    .select("topic_id")
    .rdd.map(lambda x: int(x.topic_id))
    .collect()
)
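
A plain-Python sketch of the join-then-count logic on invented toy data (`top_topics` is a hypothetical helper). It makes the semantics of the inner join explicit: a page view of a document with k topics adds one to each of those k topics, and views of documents missing from `documents_topics` are dropped. `confidence_level` is ignored, as in the Spark cell.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def top_topics(
    viewed_document_ids: Iterable[int],
    document_topics: Iterable[Tuple[int, int]],
    n: int = 10,
) -> List[int]:
    """Inner-join page views to (document_id, topic_id) pairs; return the n most-viewed topics."""
    topics_by_document: Dict[int, List[int]] = {}
    for document_id, topic_id in document_topics:
        topics_by_document.setdefault(document_id, []).append(topic_id)

    counts: Counter = Counter()
    for document_id in viewed_document_ids:
        # Inner join: a document with no topics contributes nothing,
        # a document with k topics contributes one view to each of them.
        for topic_id in topics_by_document.get(document_id, []):
            counts[topic_id] += 1

    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [topic_id for topic_id, _ in ranked[:n]]


# Hypothetical toy data.
views = [1, 1, 2, 3]                       # document_id of each page view
doc_topics = [(1, 140), (1, 16), (2, 16)]  # document 3 has no topics
print(top_topics(views, doc_topics, n=2))  # [16, 140]
```
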

In [17]:
# Let's see the results
result = {
    "top_10_documents": top_10_documents,
    "users": users,
    "top_10_topics": top_10_topics
}

print(result)

{'top_10_documents': [1811567, 234, 42744, 1858440, 1780813, 60164, 1790442, 1877626, 1821895, 732651], 'users': 98080, 'top_10_topics': [20, 16, 216, 136, 140, 143, 36, 97, 8, 269]}


In [18]:
# Write the results to result.json
with open("result.json", "w") as f:
    json.dump(result, f)
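
Cell 18 writes compact, single-line JSON, while the example in the task statement is pretty-printed. Both are valid JSON and the grader accepted the compact file, so this is optional. A sketch, assuming you want the file to match the example layout and to catch values that do not survive a JSON round trip; `write_result` is a hypothetical helper. `json.dump` raises `TypeError` on values it cannot serialize, and the comparison below additionally catches values that serialize but come back different, such as a tuple reloaded as a list.

```python
import json
from typing import Any, Dict


def write_result(result: Dict[str, Any], path: str = "result.json") -> Dict[str, Any]:
    """Write result as 4-space-indented JSON, read it back and check the round trip."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=4)
    with open(path, encoding="utf-8") as f:
        reloaded = json.load(f)
    # e.g. a tuple is written as a JSON array and reloaded as a list, so this fails
    if reloaded != result:
        raise ValueError(f"{path} does not round-trip to the same dict")
    return reloaded
```
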

In [19]:
# Submit result.json to the grading server
! curl -F file=@result.json "51.250.54.133:80/MDS-LSML1/tascha_ste/w4/1"

1.0
Well done!


In [20]:
sc.stop()