# Exercises on Spark Core API

This notebook contains exercises on three different datasets. The goal is to solve these exercises using the **Spark Core API**.

We start by installing pyspark (only execute if this is needed, e.g., if you are running this on Google Colab), and downloading the datasets. The exercises follow.

### Useful documentation for these exercises

The PySpark Documentation is available at https://spark.apache.org/docs/latest/api/python/index.html.

Instructions on how to install PySpark on your local PC may be found at https://spark.apache.org/docs/latest/api/python/getting_started/install.html. Note that by installing PySpark in this way, you automatically have a local copy of Spark.

The Spark Core API that we use below is documented at https://spark.apache.org/docs/latest/api/python/reference/pyspark.html, which is a useful reference to keep at hand.

#### Installing PySpark

In [6]:
# This installs pyspark in the current python environment.
# By installing pyspark, we automatically also install spark.
# You **need** to run this cell when running this notebook in google colab
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=271ce522a90fd705106fe58a19557fdf4ff103e9f7497f1b18ed75bc1e8c8d67
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


#### General imports and starting Spark

In [7]:
#This is needed to start a Spark session from the notebook
#You may adjust the memory used by the driver program based on your machine's settings
import os
os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=3g  pyspark-shell"

from pyspark.sql import SparkSession

In [8]:
# -------------------------------
# Start Spark in LOCAL mode
# -------------------------------

#The following lines are just there to allow this cell to be re-executed multiple times:
#if a spark session was already started, we stop it before starting a new one
#(there can be only one spark context per jupyter notebook)
try:
    spark
    print("Spark application already started. Terminating existing application and starting new one")
    spark.stop()
except NameError:  # no Spark session has been started yet in this notebook
    pass

# Create a new spark session (note, the * indicates to use all available CPU cores)
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("demoRDD") \
    .getOrCreate()

#When dealing with RDDs, we work with the SparkContext object. See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc=spark.sparkContext

# We print the sparkcontext. This prints general information about the spark instance we have connected to.
# In particular, the hyperlink allows us to open the spark UI (useful for seeing what is going on)
# Note: this hyperlink won't work when running this notebook in Google Colab.
sc

### Downloading data

The next cell downloads the data required for the exercises.

In [9]:
!mkdir -p downloads
!wget 'https://drive.google.com/u/0/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download' -O downloads/data-spark-exercises.zip
!unzip -q downloads/data-spark-exercises.zip
!ls data

--2024-03-13 20:08:21--  https://drive.google.com/u/0/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download
Resolving drive.google.com (drive.google.com)... 142.251.111.101, 142.251.111.139, 142.251.111.113, ...
Connecting to drive.google.com (drive.google.com)|142.251.111.101|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://drive.google.com/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download [following]
--2024-03-13 20:08:21--  https://drive.google.com/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download
Reusing existing connection to drive.google.com:443.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download [following]
--2024-03-13 20:08:21--  https://drive.usercontent.google.com/download?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 172.253.63.132, 2

## 1. Sensor data exercises
In the file `data/sensors/sensor-sample.txt`, each line contains the following fields: Date (date), Time (time), RoomId (integer)-SensorId (integer), Value1 (float), Value2 (float).
Using this file, use Spark to answer the following queries:

1. Count the number of entries for each day.
2. Count the number of measures for each pair of RoomId-SensorId.
3. Compute the average of Value1.
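Before counting anything, it can help to see how one line breaks down into typed fields. The sketch below is plain Python and needs no Spark; the helper name `parse_sensor_line` and the sample line are hypothetical, and it assumes the fields are separated by whitespace in the order listed above.

```python
from datetime import date, datetime


def parse_sensor_line(line):
    """Split one whitespace-separated line into typed fields (hypothetical helper).

    Returns (day, time, room_sensor, value1, value2).
    """
    day, time, room_sensor, value1, value2 = line.split()
    return (
        datetime.strptime(day, "%Y-%m-%d").date(),  # Date
        time,                                       # Time, kept as a string
        room_sensor,                                # "RoomId-SensorId", e.g. "1-0"
        float(value1),                              # Value1
        float(value2),                              # Value2
    )


# Made-up sample line, only to illustrate the assumed layout.
sample = "2017-03-31 03:38:15.757 1-0 122.153 -3.91901"
reading = parse_sensor_line(sample)
print(reading)
```

Each query then only needs one of these positions: the date for the per-day count, the `RoomId-SensorId` string for the per-sensor count, and `Value1` for the average.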

1. Count the number of entries for each day.

In [10]:
from datetime import datetime

sensorsRDD = sc.textFile('data/sensors/sensor-sample.txt')

# Key each line by its date (the first field, parsed from a string) and count the lines per key
count_by_day = sensorsRDD.map(lambda line: (datetime.strptime(line.split()[0], '%Y-%m-%d').date(), 1)).countByKey()

for date, count in list(count_by_day.items())[:5]:
    print(f"{date}: {count} entries")

2017-03-31: 3393 entries
2017-02-28: 62103 entries
2017-03-01: 33423 entries
2017-03-02: 32403 entries
2017-03-03: 29727 entries


2. Count the number of measures for each pair of RoomId-SensorId.

In [11]:
# Count occurrences of each room-sensor combination
count_by_room_sensor = sensorsRDD.map(lambda line: (line.split()[2], 1)).countByKey()

# Iterate through room-sensor combinations and print counts
for room_sensor, count in count_by_room_sensor.items():
    print(f"{room_sensor}: {count} measures")

1-0: 43047 measures
1-1: 43047 measures
1-2: 43047 measures
2-0: 46915 measures
2-1: 46915 measures
2-2: 46915 measures
3-0: 46634 measures
3-1: 46634 measures
3-2: 46634 measures
4-0: 43793 measures
4-1: 43793 measures
4-2: 43793 measures
5-0: 35 measures
5-1: 35 measures
5-2: 35 measures
6-0: 35666 measures
6-1: 35666 measures
6-2: 35666 measures
7-0: 14910 measures
7-1: 14910 measures
7-2: 14910 measures


3. Compute the average of Value1.

In [12]:
# Extract Value1 and convert it to a float
value1_rdd = sensorsRDD.map(lambda line: float(line.split()[3]))

# Calculate the average
total_sum = value1_rdd.reduce(lambda x, y: x + y)
total_count = value1_rdd.count()
average_value1 = total_sum / total_count if total_count > 0 else 0

print(f"Average of Value1: {average_value1}")

Average of Value1: 92.80699275770311
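The cell above makes two passes over the data (`reduce`, then `count`). A single pass is possible by carrying a `(sum, count)` pair through `RDD.aggregate`. The sketch below runs in plain Python, with two hand-made lists standing in for partitions; the names `ZERO`, `seq_op`, `comb_op`, and `partitions` are illustrative and the numbers are made up.

```python
from functools import reduce

ZERO = (0.0, 0)  # (running sum, running count)


def seq_op(acc, value):
    """Fold one value into a partition-local (sum, count) accumulator."""
    return (acc[0] + value, acc[1] + 1)


def comb_op(acc1, acc2):
    """Merge the accumulators of two partitions."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


# Two made-up "partitions", standing in for how Spark splits an RDD.
partitions = [[1.0, 2.0, 3.0], [4.0, 6.0]]

# Spark would run seq_op inside each partition, then comb_op across partitions.
partials = [reduce(seq_op, part, ZERO) for part in partitions]
total_sum, total_count = reduce(comb_op, partials, ZERO)
average = total_sum / total_count if total_count else 0.0
print(average)

# With Spark, the equivalent single pass would be:
#   total_sum, total_count = value1_rdd.aggregate(ZERO, seq_op, comb_op)
```

The same two functions have the shape that `aggregateByKey` expects in the MovieLens exercises, where the accumulator is kept per key rather than for the whole RDD.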


## 2. Movielens movie data exercises

Movielens (https://movielens.org/) is a website that provides non-commercial, personalised movie recommendations. GroupLens Research has collected and made available rating data sets from the MovieLens web site for the purpose of research into making recommendation services. In this exercise, we will use one of these datasets (the movielens latest dataset, http://files.grouplens.org/datasets/movielens/ml-latest-small.zip) and compute some basic queries on it.
The dataset has already been downloaded and is available at data/movielens/movies.csv, data/movielens/ratings.csv, data/movielens/tags.csv, data/movielens/links.csv

1. Inspect the dataset's [README file](http://files.grouplens.org/datasets/movielens/ml-latest-small-README.html), in particular the section titled "Content and Use of Files", to learn the structure of these files.
2. Compute all pairs (`movieid`, `rat`) where `movieid` is a movie id (as found in ratings.csv) and `rat` is the average rating of that movie id. (Hint: use aggregateByKey to first compute the sum of all ratings as well as the number of ratings per key.)
3. Compute all pairs (`title`, `rat`) where `title` is a full movie title (as found in the movies.csv file), and `rat` is the average rating of that movie (computed over all ratings for that movie, as found in the ratings.csv file).
4. [_Extra_] Compute all pairs (`title`, `tag`) where `title` is a full movie title that has an average rating of at least 3.5, and `tag` is a tag for that movie (as found in the tags.csv file).

Extra: if you want to experiment with larger datasets, download the 10m dataset (http://files.grouplens.org/datasets/movielens/ml-10m.zip, 250 MB uncompressed) and redo the exercises above.

2. Compute all pairs (`movieid`, `rat`) where `movieid` is a movie id (as found in ratings.csv) and `rat` is the average rating of that movie id. (Hint: use aggregateByKey to compute first the sum of all ratings as well as the number of ratings per key).

In [18]:
ratings_data = sc.textFile('data/movielens/ratings.csv')

# Parse the data into tuples of (movieId, rating)
ratings = ratings_data.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[1]), float(tokens[2])))

# Compute the sum and count of ratings for each movieId
# The initial values for each key are (0.0, 0) where 0.0 is the sum of ratings and 0 is the count of ratings
sum_count = ratings.aggregateByKey((0.0, 0),
                                   lambda acc, value: (acc[0] + value, acc[1] + 1),
                                   lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

# Calculate the average rating for each movieId
average_ratings = sum_count.mapValues(lambda x: x[0] / x[1])

# Collect the results and print sample pairs (movieId, average rating)
result = average_ratings.collect()

for movie_id, avg_rating in result[:5]:
    print(f"Movie ID: {movie_id}, Average Rating: {avg_rating}")

Movie ID: 6, Average Rating: 3.946078431372549
Movie ID: 50, Average Rating: 4.237745098039215
Movie ID: 70, Average Rating: 3.5090909090909093
Movie ID: 110, Average Rating: 4.031645569620253
Movie ID: 216, Average Rating: 3.326530612244898


3. Compute all pairs (title, rat) where title is a full movie title (as found in the movies.csv file), and rat is the average rating of that movie (computed over all possible ratings for that movie, as found in the ratings.csv file)

In [21]:
# Load the data
movies_data = sc.textFile('data/movielens/movies.csv')

# Parse the data into tuples of (movieId, rating)
ratings = ratings_data.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[1]), float(tokens[2])))

# Parse the data into tuples of (movieId, title)
movies = movies_data.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]), tokens[1]))

# Join ratings with movies data to get (movieId, (rating, title))
joined_data = ratings.join(movies)

# Use aggregateByKey to compute the sum of ratings and the count of ratings for each movieId
# The initial values for each key are (0.0, 0) where 0.0 is the sum of ratings and 0 is the count of ratings
sum_count = joined_data.aggregateByKey((0.0, 0),
                                       lambda acc, value: (acc[0] + value[0], acc[1] + 1),
                                       lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

average_ratings = sum_count.mapValues(lambda x: x[0] / x[1])

# Collect the (movieId, average rating) pairs, keep five of them, and look up the title of each
result = average_ratings.collect()[:5]

for movie_id, avg_rating in result:
    title = movies.lookup(movie_id)[0]
    rating = avg_rating
    print(f"Movie Title: {title}, Average Rating: {rating}")

Movie Title: Billy Madison (1995), Average Rating: 3.326530612244898
Movie Title: Star Wars: Episode IV - A New Hope (1977), Average Rating: 4.231075697211155
Movie Title: Pulp Fiction (1994), Average Rating: 4.197068403908795
Movie Title: Stargate (1994), Average Rating: 3.375
Movie Title: Forrest Gump (1994), Average Rating: 4.164133738601824


4. [_Extra_] Compute all pairs (title, tag) where title is a full movie title that has an average rating of at least 3.5, and tag is a tag for that movie (as found in the tags.csv file)

In [26]:
tags_data = sc.textFile('data/movielens/tags.csv')

# ratings_data and movies_data were loaded in the previous exercises and are reused here

# Split each line of data into fields
ratings_rdd = ratings_data.map(lambda line: line.split(","))
tags_rdd = tags_data.map(lambda line: line.split(","))
movies_rdd = movies_data.map(lambda line: line.split(","))

# Compute (sum, count) of ratings per movieId and keep only movies whose average rating is >= 3.5
highly_rated_movies = ratings_rdd.map(lambda x: (x[1], (float(x[2]), 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .filter(lambda x: x[1][0] / x[1][1] >= 3.5)

# Join highly_rated_movies with movies RDD
movie_title_ratings = highly_rated_movies.join(movies_rdd.map(lambda x: (x[0], x[1])))

# Convert tags RDD to (movieId, tag) format
tag_pairs = tags_rdd.map(lambda x: (x[1], x[2]))

# Join movie_title_ratings with the tags RDD, then deduplicate the resulting (title, tag) pairs
result = movie_title_ratings.join(tag_pairs) \
    .map(lambda x: ((x[1][0][1], x[1][1]), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[0][0], x[0][1]))

# Display the results
result.take(5)

[('L.A. Confidential (1997)', 'Police'),
 ('"Jungle Book', 'Disney'),
 ('Requiem for a Dream (2000)', 'drug abuse'),
 ('Dead Again (1991)', 'memory'),
 ('Enemy at the Gates (2001)', 'sniper')]
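The title `'"Jungle Book'` in the output above is cut short because `line.split(",")` also splits on commas that sit inside a quoted title. One way around this, assuming movies.csv follows the usual CSV quoting rules, is to parse each line with Python's `csv` module. The helper name `parse_movie_line` and the two sample lines are illustrative.

```python
import csv


def parse_movie_line(line):
    """Parse one movies.csv line into (movieId, title), honouring CSV quotes."""
    fields = next(csv.reader([line]))
    return (fields[0], fields[1])


# Illustrative lines in the movieId,title,genres layout.
plain = "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy"
quoted = '11,"American President, The (1995)",Comedy|Drama|Romance'

print(parse_movie_line(plain))
print(parse_movie_line(quoted))

# For comparison: the naive split keeps only the part before the inner comma.
naive_title = quoted.split(",")[1]
print(naive_title)
```

In the cells above, `movies_data.map(parse_movie_line)` could stand in for the `split(",")` based parsing of movies.csv; the same concern applies to tags.csv if a tag contains a comma.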

## 3. Github log data exercises
Github makes activity logs publicly available at https://www.githubarchive.org/. One such log file, which contains activity data for 2015-03-01 between 0h-1h at night, has been downloaded and is available at `data/github/2015-03-01-0.json.gz`. This (compressed) file contains multiple JSON objects, one per line. Here is a sample line of this file, neatly formatted:

`{ "id": "2614896652",
    "type": "CreateEvent",
    "actor": {
        "id": 739622,
        "login": "treydock",
        "gravatar_id": "",
        "url": "https://api.github.com/users/treydock",
        "avatar_url": "https://avatars.githubusercontent.com/u/739622?"
    },
    "repo": {
        "id": 23934080,
        "name": "Early-Modern-OCR/emop-dashboard",
        "url": "https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"
    },
    "payload": {
        "ref": "development",
        "ref_type": "branch",
        "master_branch": "master",
        "description": "",
        "pusher_type": "user"
    },
    "public": true,
    "created_at": "2015-03-01T00:00:00Z",
    "org": {
        "id": 10965476,
        "login": "Early-Modern-OCR",
        "gravatar_id": "",
        "url": "https://api.github.com/orgs/Early-Modern-OCR",
        "avatar_url": "https://avatars.githubusercontent.com/u/10965476?"
    }
}`

This log entry has type `CreateEvent` and its `payload.ref_type` is `branch`. So someone named "treydock" (`actor.login`) created a repository branch called "development" (`payload.ref`) in the first second of March 1, 2015 (`created_at`).
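The dotted names used here (`actor.login`, `payload.ref`) correspond to nested dictionary lookups once a line has been parsed. A small sketch in plain Python, using a trimmed copy of the sample entry written on a single line as it would appear in the log file:

```python
import json

# A trimmed version of the sample event, on one line as in the log file.
line = (
    '{"id": "2614896652", "type": "CreateEvent", '
    '"actor": {"id": 739622, "login": "treydock"}, '
    '"payload": {"ref": "development", "ref_type": "branch"}, '
    '"created_at": "2015-03-01T00:00:00Z"}'
)

event = json.loads(line)  # a plain Python dict containing nested dicts

event_type = event["type"]            # type
login = event["actor"]["login"]       # actor.login
ref_type = event["payload"]["ref_type"]  # payload.ref_type
ref = event["payload"]["ref"]         # payload.ref

print(event_type, login, ref_type, ref)
```

Every exercise below uses the same kind of lookup on the dictionaries produced by `json.loads`.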

1. Load the text file into an RDD (note: Spark can read gzipped files directly!). Convert this RDD (which consists of string elements) to an RDD where each element is a JSON object (hint: use the `json.loads` function from the `json` module to convert a string into a JSON object).

2. Filter this RDD of JSON objects to retain only those objects that represent push activities (where `type` equals `PushEvent`)

3. Count the number of push events.

4. Compute the number of push events, grouped per `actor.login`.

5. Retrieve the results of (4) in sorted order, where logins with a higher number of pushes come first. Retrieve the first 10 such results (which contain the highest number of pushes).

6. You are representing a company and need to retrieve the number of pushes for every employee in the company. The file `data/github/employees.txt` contains a list of all employee login names at your company.

Extra: if you want to experiment with larger datasets, download more log data from the GitHub archive website and redo the exercises above.

1. Load the text file into an RDD (note: Spark can read gzipped files directly!). Convert this RDD (which consists of string elements) to an RDD where each element is a JSON object (hint: use the json.loads function from the json module to convert a string into a JSON object).

In [27]:
import json

# Load the gzipped JSON file directly into an RDD
file_path = "data/github/2015-03-01-0.json.gz"
json_rdd = sc.textFile(file_path).map(lambda line: json.loads(line))


# Print the first 5 JSON objects
for json_obj in json_rdd.take(5):
    print(json_obj)


{'id': '2614896652', 'type': 'CreateEvent', 'actor': {'id': 739622, 'login': 'treydock', 'gravatar_id': '', 'url': 'https://api.github.com/users/treydock', 'avatar_url': 'https://avatars.githubusercontent.com/u/739622?'}, 'repo': {'id': 23934080, 'name': 'Early-Modern-OCR/emop-dashboard', 'url': 'https://api.github.com/repos/Early-Modern-OCR/emop-dashboard'}, 'payload': {'ref': 'development', 'ref_type': 'branch', 'master_branch': 'master', 'description': '', 'pusher_type': 'user'}, 'public': True, 'created_at': '2015-03-01T00:00:00Z', 'org': {'id': 10965476, 'login': 'Early-Modern-OCR', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/Early-Modern-OCR', 'avatar_url': 'https://avatars.githubusercontent.com/u/10965476?'}}
{'id': '2614896653', 'type': 'PushEvent', 'actor': {'id': 9063348, 'login': 'bezerrathm', 'gravatar_id': '', 'url': 'https://api.github.com/users/bezerrathm', 'avatar_url': 'https://avatars.githubusercontent.com/u/9063348?'}, 'repo': {'id': 31481156, 'name': 'bez

2. Filter this RDD of JSON objects to retain only those objects that represent push activities (where type equals PushEvent)


In [29]:
push_events_rdd = json_rdd.filter(lambda obj: obj.get("type") == "PushEvent")

# Print the first 5 PushEvent objects
for push_event in push_events_rdd.take(5):
    print(push_event)

{'id': '2614896653', 'type': 'PushEvent', 'actor': {'id': 9063348, 'login': 'bezerrathm', 'gravatar_id': '', 'url': 'https://api.github.com/users/bezerrathm', 'avatar_url': 'https://avatars.githubusercontent.com/u/9063348?'}, 'repo': {'id': 31481156, 'name': 'bezerrathm/HuffmanCoding', 'url': 'https://api.github.com/repos/bezerrathm/HuffmanCoding'}, 'payload': {'push_id': 588068425, 'size': 1, 'distinct_size': 1, 'ref': 'refs/heads/master', 'head': '570ad890d78525dfc10364901c41b8236e2c783a', 'before': '6dda286a3a1c254184f1456b5fefc139ff9dce66', 'commits': [{'sha': '570ad890d78525dfc10364901c41b8236e2c783a', 'author': {'email': 'ba1618c3d509021c3c759fa9aad031c4a38fe046@gmail.com', 'name': 'Thiago Henrique Menêses Bezerra'}, 'message': 'Create other.h', 'distinct': True, 'url': 'https://api.github.com/repos/bezerrathm/HuffmanCoding/commits/570ad890d78525dfc10364901c41b8236e2c783a'}]}, 'public': True, 'created_at': '2015-03-01T00:00:00Z'}
{'id': '2614896654', 'type': 'PushEvent', 'actor':

3. Count the number of push events.

In [30]:
push_events_count = push_events_rdd.count()

print(f"Number of Push Events: {push_events_count}")

Number of Push Events: 8793


4. Compute the number of push events, grouped per actor.login.

In [31]:
# Group by actor.login and count the number of push events per actor
push_events_per_actor = push_events_rdd.groupBy(lambda obj: obj["actor"]["login"]).mapValues(len)

# Print 5 actors with their push events count
for actor, count in push_events_per_actor.take(5):
    print(f"Actor: {actor}, Push Events Count: {count}")

Actor: bezerrathm, Push Events Count: 2
Actor: demianborba, Push Events Count: 3
Actor: ricardocastaneda, Push Events Count: 3
Actor: ex3ndr, Push Events Count: 1
Actor: furutachi, Push Events Count: 1
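`groupBy` followed by `mapValues(len)` gathers every event of a login in one place before counting. A commonly used alternative for counts is to emit `(login, 1)` pairs and sum them with `reduceByKey`, which can add up partial sums inside each partition first. The sketch below shows the small mapping function involved and checks it locally on made-up events; the Spark call appears only as a comment, and `login_one` is an illustrative name.

```python
from collections import defaultdict
from operator import add


def login_one(event):
    """Map an event dict to a (login, 1) pair."""
    return (event["actor"]["login"], 1)


# Made-up events, shaped like the parsed JSON objects.
events = [
    {"type": "PushEvent", "actor": {"login": "alice"}},
    {"type": "PushEvent", "actor": {"login": "bob"}},
    {"type": "PushEvent", "actor": {"login": "alice"}},
]

# Local stand-in for reduceByKey(add): sum the 1s per key.
counts = defaultdict(int)
for login, one in map(login_one, events):
    counts[login] = add(counts[login], one)
print(dict(counts))

# With Spark, the same idea would read:
#   push_events_per_actor = push_events_rdd.map(login_one).reduceByKey(add)
```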


5. Retrieve the results of (4) in sorted order, where logins with a higher number of pushes come first. Retrieve the first 10 such results (which contain the highest number of pushes)

In [32]:
# Sort the results by the count of push events in descending order
sorted_results = push_events_per_actor.sortBy(lambda x: x[1], ascending=False)

# Take the top 10 actors with the highest number of pushes
top10_actors = sorted_results.take(10)

# Print the top 10 actors with their push events count
for actor, count in top10_actors:
    print(f"Actor: {actor}, Push Events Count: {count}")

Actor: greatfirebot, Push Events Count: 192
Actor: diversify-exp-user, Push Events Count: 146
Actor: KenanSulayman, Push Events Count: 72
Actor: manuelrp07, Push Events Count: 45
Actor: mirror-updates, Push Events Count: 42
Actor: tryton-mirror, Push Events Count: 37
Actor: Somasis, Push Events Count: 26
Actor: direwolf-github, Push Events Count: 24
Actor: EmanueleMinotto, Push Events Count: 22
Actor: hansliu, Push Events Count: 21
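`sortBy` orders the whole RDD even though only ten elements are kept. `RDD.takeOrdered(num, key)` returns the `num` smallest elements under `key`, so negating the count puts the largest counts first. The local sketch below mimics that behaviour with `heapq.nsmallest` on made-up pairs; `neg_count` is an illustrative name.

```python
import heapq


def neg_count(pair):
    """Sort key: negating the count makes larger counts come first."""
    return -pair[1]


# Made-up (login, count) pairs.
pairs = [("alice", 3), ("bob", 7), ("carol", 1), ("dave", 5)]

# Local stand-in for takeOrdered(2, key=neg_count).
top2 = heapq.nsmallest(2, pairs, key=neg_count)
print(top2)

# With Spark, the same idea would read:
#   top10_actors = push_events_per_actor.takeOrdered(10, key=neg_count)
```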


6. You are representing a company and need to retrieve the number of pushes for every employee in the company. The file data/github/employees.txt contains a list of all employee login names at your company.

In [34]:
# Load the list of employee login names
employees_file_path = "data/github/employees.txt"

employees_set = set(sc.textFile(employees_file_path).collect())

# Filter the RDD to retain only PushEvent objects for employees in the list
# (a separate name is used so that push_events_rdd from exercise 2 is not overwritten)
employee_push_events_rdd = json_rdd.filter(lambda obj: obj.get("type") == "PushEvent" and obj["actor"]["login"] in employees_set)

# Group by actor.login and count the number of push events per employee
push_events_per_employee = employee_push_events_rdd.groupBy(lambda obj: obj["actor"]["login"]).mapValues(len)

for employee, count in push_events_per_employee.collect()[:5]:
    print(f"Employee: {employee}, Push Events Count: {count}")

Employee: ricardocastaneda, Push Events Count: 3
Employee: kriskd, Push Events Count: 1
Employee: jmarkkula, Push Events Count: 18
Employee: qeremy, Push Events Count: 16
Employee: BatMiles, Push Events Count: 10
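The result above lists only employees who pushed at least once during the hour. If "every employee" is read to include those with no pushes, the collected counts can be completed with zeros on the driver. A minimal sketch with made-up names; `fill_missing` is a hypothetical helper, not part of Spark.

```python
def fill_missing(counts, employees):
    """Return {login: count} for every employee, using 0 when no pushes were seen."""
    return {login: counts.get(login, 0) for login in sorted(employees)}


# Made-up inputs: collected (login, count) pairs and the employee list.
collected = [("alice", 3), ("bob", 1)]
employees = {"alice", "bob", "carol"}

all_counts = fill_missing(dict(collected), employees)
print(all_counts)

# With the notebook's variables, this would look like:
#   fill_missing(dict(push_events_per_employee.collect()), employees_set)
```

This works on the driver because the employee list and the per-employee counts are both small; for a very large list, a join between two RDDs would be the more scalable route.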
