# Setting up PySpark

Installing Java

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Installing Apache Spark and Hadoop:

In [2]:
!wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!rm spark-3.5.3-bin-hadoop3.tgz   # Tidying up

--2025-03-22 14:03:19--  https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400864419 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.3-bin-hadoop3.tgz’


2025-03-22 14:03:40 (18.4 MB/s) - ‘spark-3.5.3-bin-hadoop3.tgz’ saved [400864419/400864419]



Setting up environment variables:

In [3]:
import os

# Point the Java and Spark tooling at the OpenJDK 8 install and the extracted Spark directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.3-bin-hadoop3",
})

Installing findspark, so that Python can locate the Spark installation and import `pyspark`:

In [4]:
!pip install -q findspark
import findspark
findspark.init()

Create the entry point to Spark.

In [5]:
from pyspark.sql import SparkSession

# Local Spark using all available cores, with 10 GB requested for the driver.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)
# Nicer table formatting for DataFrames shown without calling show().
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

Getting the SparkContext from the session (it is used for the RDD API below):

In [6]:
# Handle to the SparkContext behind the session; the RDD calls in this notebook (textFile, map, join, ...) go through it.
sc = spark.sparkContext

In [None]:
# print(spark.sparkContext._conf.set("spark.driver.memory", '4g'))
# print(spark.sparkContext._conf.get("spark.driver.memory"))

# Importing our datasets

Books.csv

In [7]:
import kagglehub
path = kagglehub.dataset_download("trnnhtminh/goodread-books-ratings")
print("Path to dataset files:", path)
!mv {path}/merged_books_ratings.csv .
!mv merged_books_ratings.csv Books.csv
!ls

Downloading from https://www.kaggle.com/api/v1/datasets/download/trnnhtminh/goodread-books-ratings?dataset_version_number=1...


100%|██████████| 1.48G/1.48G [00:17<00:00, 92.3MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/trnnhtminh/goodread-books-ratings/versions/1
Books.csv  sample_data	spark-3.5.3-bin-hadoop3


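Genres.csv and Ratings.csv. Genres.csv is created partway through the data processing, so it is not part of the Kaggle dataset and has to be provided manually; Ratings.csv is fetched the same way. Below are two options: download from Google Drive, or upload from a local directory.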

In [8]:
# from google drive. If this doesn't work, check that the file has access set to "anyone with the link", and that the id (after /d/ in the link to the file) is correct
!gdown --id 15kIXt9psvX12bvFbzg8WST6k8WaO_pYt
!gdown --id 1PAQKjrUUei5DiOEqoRpJw3t0hoTFTtpf

Downloading...
From: https://drive.google.com/uc?id=15kIXt9psvX12bvFbzg8WST6k8WaO_pYt
To: /content/Genres.csv
100% 874k/874k [00:00<00:00, 104MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1PAQKjrUUei5DiOEqoRpJw3t0hoTFTtpf
From (redirected): https://drive.google.com/uc?id=1PAQKjrUUei5DiOEqoRpJw3t0hoTFTtpf&confirm=t&uuid=4c1c10b8-9355-4339-a697-60e55746dddc
To: /content/Ratings.csv
100% 527M/527M [00:09<00:00, 56.1MB/s]


In [None]:
# from a local directory
# from google.colab import files
# uploaded = files.upload()

In [9]:
# One RDD per CSV, each element being one raw text line of the file.
books, genres, ratings = (
    sc.textFile(csv_name) for csv_name in ("Books.csv", "Genres.csv", "Ratings.csv")
)

In [None]:
# First line of Books.csv is the CSV header (user_id,isbn,rating,...), not a data row.
books.take(1)

['user_id,isbn,rating,authors,description,genres,pages,title,publishYear,publishMonth,publishDay']

In [None]:
# First line of Genres.csv is its header row (isbn,genre).
genres.take(1)

['isbn,genre']

In [10]:
# Ratings.csv has no header: its first line is already a data row (e.g. '1,classic,3').
ratings.take(1)

['1,classic,3']

# MapReduce 1: Join

In [12]:
import time

In [16]:
def isbn_books(row):
  """Map a Books.csv line to an ``(isbn, rating)`` pair.

  Columns are ``user_id,isbn,rating,authors,description,...``; only the second
  and third are needed. Splitting at most 3 times leaves the long free-text
  tail (authors, description, ...) as one unsplit piece instead of breaking it
  on every comma.

  Args:
    row: one raw text line of Books.csv.

  Returns:
    ``(isbn, rating)`` as strings.

  Raises:
    IndexError: if the line has fewer than three comma-separated fields.
  """
  fields = row.split(",", 3)
  return (fields[1], fields[2])

def isbn_genres(row):
  """Map a Genres.csv line ``isbn,genre`` to an ``(isbn, genre)`` pair of strings."""
  columns = row.split(",")
  isbn, genre = columns[0], columns[1]
  return isbn, genre

# Number of hash partitions used to co-partition both sides by ISBN before the join.
NUM_PARTITIONS = 8

# Books.csv and Genres.csv each start with a header row whose ISBN column is
# literally "isbn"; left in, the two headers join with each other and emit a
# bogus ('rating', 'genre') record. Drop them before parsing.
books_header = books.first()
genres_header = genres.first()

books_part = books.filter(lambda row: row != books_header) \
                  .map(isbn_books) \
                  .partitionBy(NUM_PARTITIONS)
genres_part = genres.filter(lambda row: row != genres_header) \
                    .map(isbn_genres) \
                    .partitionBy(NUM_PARTITIONS)

# Inner join on ISBN gives (isbn, (rating, genre)); keep only the (rating, genre) value.
join_rdd = books_part.join(genres_part).values()

# The RDDs above are lazy; the timed collect() is what actually runs the join.
start = time.perf_counter()
join_output = join_rdd.collect()
end = time.perf_counter()
print("Execution time: " + str(end - start))

Execution time: 129.90709262399923


In [18]:
# Show a sample of the joined (rating, genre) pairs; slicing (instead of indexing
# 0..19) avoids an IndexError when the join returns fewer than N_PREVIEW rows.
N_PREVIEW = 20
for pair in join_output[:N_PREVIEW]:
  print(pair)

('5', 'classic')
('5', 'fiction')
('5', 'historical-fiction')
('5', 'young-adult')
('3', 'classic')
('3', 'fiction')
('3', 'historical-fiction')
('3', 'young-adult')
('4', 'classic')
('4', 'fiction')
('4', 'historical-fiction')
('4', 'young-adult')
('3', 'classic')
('3', 'fiction')
('3', 'historical-fiction')
('3', 'young-adult')
('5', 'classic')
('5', 'fiction')
('5', 'historical-fiction')
('5', 'young-adult')


# MapReduce 2: Group By

In [23]:
def genre_rating(row):
  """Map a Ratings.csv line to ``(genre, rating)`` with the rating parsed as an int.

  Assumes the columns are ``<id>,genre,rating`` (first line seen: '1,classic,3')
  — TODO confirm the meaning of the first column.
  """
  fields = row.split(",")
  return (fields[1], int(fields[2]))

# Parse to int *before* reducing: reduceByKey never calls the reducer for a genre
# that appears only once, so converting inside the reducer would leave such
# genres with a string value instead of an int total.
ratings_rdd = ratings.map(genre_rating) \
                     .reduceByKey(lambda x, y: x + y)
start = time.perf_counter()
ratings_output = ratings_rdd.collect()
end = time.perf_counter()

In [20]:
# One (genre, total rating) pair per line; nothing is printed for an empty result.
if ratings_output:
  print(*ratings_output, sep="\n")

('fantasy', 9885284)
('horror', 1996196)
('philosophy', 1176248)
('spirituality', 469554)
('music', 166195)
('young-adult', 7507825)
('psychology', 801236)
('religion', 750382)
('manga', 116093)
('science-fiction', 3638978)
('chick-lit', 2117020)
('contemporary', 5370340)
('fiction', 19869613)
('historical-fiction', 5210228)
('romance', 6331677)
('crime', 2705488)
('christian', 443905)
('travel', 419692)
('comic', 531088)
('graphic-novels', 539440)
('biography', 1415946)
('business', 330201)
('history', 1340160)
('science', 636070)
('cookbook', 69113)
('humor-and-comedy', 3559)
('mystery', 4622843)
('book', 1070730)
('self-help', 554385)
('paranormal', 2588188)
('suspense', 2548798)
('memoir', 1203974)
('art', 231543)
('classic', 8557100)
('poetry', 810084)
('thriller', 3376067)
('nonfiction', 2329726)
('sport', 115297)
('gay-and-lesbian', 610)


In [24]:
print(f"Execution time: {end - start}")

Execution time: 72.9229081980011


# Stop Spark

In [25]:
# Stopping Spark: SparkSession.stop() also stops the underlying SparkContext (sc),
# so a separate sc.stop() call is redundant.
spark.stop()