# Lab 11 Spark

Author: ISTD, SUTD

Title: Lab 11, Spark part 1

Date: March 5, 2025

## Learning outcome


By the end of this lesson, you will be able to

* Submit PySpark jobs to a Spark cluster
* Parallelize data processing using PySpark


You can either run this lab directly on the AWS cluster with the HDFS file system, or install PySpark in Google Colab and load the files locally. The only difference in the code is that we read the input from a local path instead of an HDFS URL. Other than that, all PySpark commands are the same.
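`sc.textFile` accepts either a plain local path or a fully qualified `hdfs://` URI, so switching between Colab and the cluster comes down to which string you pass. The helper below is a hypothetical sketch of that idea; `namenode:9000` is a placeholder, not the lab cluster's actual namenode address.

```python
def input_uri(relative_path: str, use_hdfs: bool = False,
              namenode: str = "namenode:9000") -> str:
    """Build the path handed to sc.textFile.

    Hypothetical helper: 'namenode:9000' is a placeholder host:port,
    replace it with your cluster's namenode address.
    """
    if use_hdfs:
        # fully qualified HDFS URI, e.g. hdfs://namenode:9000/input/data1.csv
        return f"hdfs://{namenode}/{relative_path.lstrip('/')}"
    # local path, relative to the notebook's working directory (Colab)
    return relative_path


print(input_uri("input/data1.csv"))                 # → input/data1.csv
print(input_uri("input/data1.csv", use_hdfs=True))  # → hdfs://namenode:9000/input/data1.csv
```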

To run this lab, make a copy of this notebook or use `File -> Open in Playground Mode`.

## Installing PySpark in Google Colab

To install PySpark in Google Colab, execute the cell below. It downloads Spark and installs all the libraries needed for this lab.

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F



## Wordcount Example

Let us first download the necessary data file. We can find it at `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt`.

Colab lets us execute unix commands, as long as we prepend them with `!`. So let's download the file and move it into a new folder called `input`. While we are at it, let's create a folder called `output` as well.

In [2]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt
!mkdir input
!mv TheCompleteSherlockHolmes.txt input/
!mkdir output

--2025-04-15 08:07:58--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3705628 (3.5M) [text/plain]
Saving to: ‘TheCompleteSherlockHolmes.txt’


2025-04-15 08:07:59 (10.9 MB/s) - ‘TheCompleteSherlockHolmes.txt’ saved [3705628/3705628]



You can check that `TheCompleteSherlockHolmes.txt` was downloaded into the `input` folder by expanding the Files panel on the left.

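Now we are ready to write our PySpark code. The goal is to write a simple word counter: split every line into words, map each word to a `(word, 1)` pair, and sum the pairs per word with `reduceByKey`.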

In [33]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# sc.stop() # uncomment this during debugging to restart your context if execution stopped midway through this cell.

conf = SparkConf().setAppName("Wordcount Application")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an hdfs url
input_file_name = 'input/TheCompleteSherlockHolmes.txt'
text_file = sc.textFile(input_file_name)

# split() without an argument splits on any run of whitespace, so no empty "words" are counted
counts = text_file.flatMap(lambda line: line.split()) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# saveAsTextFile fails if the folder already exists: delete it (or choose a new name) before re-running
output_folder = './output/word_count'
counts.saveAsTextFile(output_folder)

sc.stop()
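To sanity-check the job, you can read the part files back into Python and compare them with a plain-Python count of a small sample. This is a sketch that assumes the job wrote `(word, count)` tuples with `saveAsTextFile`, which stores `str()` of each element, one per line, and that the output is small enough to load on the driver. The function names are illustrative, not part of PySpark.

```python
import ast
import glob
import os
from collections import Counter


def read_saved_pairs(folder: str) -> dict:
    """Load (word, count) pairs written by saveAsTextFile.

    Each line looks like "('Holmes', 42)", i.e. the str() of a tuple,
    so it can be parsed back safely with ast.literal_eval.
    """
    pairs = {}
    for part in sorted(glob.glob(os.path.join(folder, "part-*"))):
        with open(part, encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    word, count = ast.literal_eval(line)
                    pairs[word] = count
    return pairs


def local_word_count(text: str, sep=None) -> Counter:
    """Plain-Python version of flatMap(split) -> map((w, 1)) -> reduceByKey(+).

    sep=None mirrors line.split(); pass sep=" " to mirror line.split(" "),
    which also counts empty strings produced by consecutive spaces.
    """
    return Counter(word for line in text.splitlines() for word in line.split(sep))


print(local_word_count("the cat and the hat").most_common(1))  # → [('the', 2)]
# spark_counts = read_saved_pairs("./output/word_count")
```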

## Exercise 1

Write a PySpark application that takes a (set of) comma-separated values (CSV) file(s) with two columns and outputs a CSV file whose first two columns are the same as in the input, and whose third column contains the values obtained by splitting the first column using the second column as the delimiter.

The input file can be found here: `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv`.

For example, given input from a file:

```
50000.0#0#0#,#
0@1000.0@,@
1$,$
1000.00^Test_string,^
```


the program should output

```
50000.0#0#0#,#,['50000.0', '0', '0', '']
0@1000.0@,@,['0', '1000.0', '']
1$,$,['1', '']
1000.00^Test_string,^,['1000.00', 'Test_string']
```

and write it to a file.
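Before writing the Spark job, it can help to pin down the per-line transformation in plain Python. Note that Python's `str.split` keeps a trailing empty string whenever the value ends with the delimiter (`'1$'.split('$')` gives `['1', '']`), and that it splits on the literal delimiter rather than a regex. The sketch assumes each line has exactly two comma-separated fields; `split_record` is a hypothetical name.

```python
def split_record(record: str) -> str:
    """Append str(first_column.split(second_column)) as a third column.

    Assumes exactly two comma-separated fields and a non-empty delimiter.
    """
    value, delimiter = record.split(",")
    # literal split: '$', '^', '#' and '@' need no escaping
    parts = value.split(delimiter)
    return f"{record},{parts}"


for line in ["50000.0#0#0#,#", "0@1000.0@,@", "1$,$", "1000.00^Test_string,^"]:
    print(split_record(line))
```

The same function can be passed directly to `text_file.map(...)` in the Spark job.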



In [31]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv
!mv data.csv input/data1.csv

--2025-04-15 08:33:07--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 54 [text/plain]
Saving to: ‘data.csv’


2025-04-15 08:33:07 (930 KB/s) - ‘data.csv’ saved [54/54]



In [29]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# sc.stop() # uncomment this during debugging to restart your context if execution stopped midway through this cell.

conf = SparkConf().setAppName("Exercise 1")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an hdfs url
input_file_name = 'input/data1.csv'
text_file = sc.textFile(input_file_name)

def split_first_column(record: str) -> str:
    # a record looks like "<value>,<delimiter>"
    cols = record.split(',')
    if len(cols) > 1 and cols[1]:
        # str.split is a literal (non-regex) split, so delimiters such as '$' or '^' need no escaping;
        # the stringified list becomes the third column
        cols.append(str(cols[0].split(cols[1])))
    return ','.join(cols)

# map (not foreach): we want one transformed output line per input line
result = text_file.map(split_first_column)

output_folder = './output/ex1'
result.saveAsTextFile(output_folder)

sc.stop()
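Because the third field is itself a list containing commas, a line such as `1$,$,['1', '']` no longer parses as a three-column CSV. If you want the output to stay valid CSV, one option, sketched here with Python's standard `csv` module, is to quote fields as needed before saving. `to_csv_line` and `split_record_csv` are hypothetical helper names.

```python
import csv
import io


def to_csv_line(fields: list) -> str:
    """Serialise one row with the csv module, quoting fields that contain commas."""
    buf = io.StringIO()
    csv.writer(buf).writerow(fields)
    # drop the writer's "\r\n"; saveAsTextFile adds its own newline
    return buf.getvalue().rstrip("\r\n")


def split_record_csv(record: str) -> str:
    value, delimiter = record.split(",")
    return to_csv_line([value, delimiter, str(value.split(delimiter))])


print(split_record_csv("1$,$"))  # → 1$,$,"['1', '']"
```

In the Spark job this would be used as `text_file.map(split_record_csv)`; any CSV reader then sees exactly three columns.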

## Exercise 2

Write a PySpark application that aggregates (counts) the rows of a (set of) CSV file(s) with four columns by their third column, the destination IP.

The input file can be found here: `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv`

Given the input

```
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
```
the program should output

```
 10.0.0.3.5001,13
 10.0.0.2.54880,7
```
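A plain-Python reference for this aggregation is useful for checking the Spark result on the small sample. Splitting on `', '` (comma plus space) instead of `','` keeps the leading space out of the key. The sample below rebuilds the 20 input lines shown above; `count_by_destination` is a hypothetical name.

```python
from collections import Counter

# the 20-line sample input from the exercise statement
sample = [
    f"05:49:56.{'604900' if i % 2 else '604899'}, 10.0.0.2.54880, 10.0.0.3.5001, 2"
    for i in range(13)
] + ["05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2"] * 7


def count_by_destination(lines) -> Counter:
    """Count records per destination IP, the third ', '-separated field."""
    fields = (line.split(", ") for line in lines)
    return Counter(cols[2] for cols in fields if len(cols) > 2)


for ip, n in count_by_destination(sample).items():
    print(f"{ip},{n}")
```

Unlike this loop, Spark makes no promise about the order of the output lines across partitions.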

In [30]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv
!mv data.csv input/data2.csv

--2025-04-15 08:32:57--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1000 [text/plain]
Saving to: ‘data.csv’


2025-04-15 08:32:57 (18.8 MB/s) - ‘data.csv’ saved [1000/1000]



In [38]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# sc.stop() # uncomment this during debugging to restart your context if execution stopped midway through this cell.

conf = SparkConf().setAppName("Exercise 2")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an hdfs url
input_file_name = 'input/data2.csv'
text_file = sc.textFile(input_file_name)

def to_pair(record: str) -> tuple[str, int]:
    # fields are separated by ", "; the third field is the destination IP
    cols = record.split(', ')
    return (cols[2], 1)

result = (text_file
          .filter(lambda record: len(record.split(', ')) > 2)  # skip blank or malformed lines
          .map(to_pair)
          .reduceByKey(lambda a, b: a + b)                     # sum the 1s per destination IP
          .map(lambda row: ','.join(map(str, row))))           # format as "ip,count"

output_folder = './output/ex2'
result.saveAsTextFile(output_folder)

sc.stop()

## Exercise 3

Given the same input as Exercise 2, write a PySpark application which outputs the following:

```
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
```
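This exercise is essentially the Exercise 2 aggregation followed by a lookup: count rows per destination IP, then append each row's count as a fifth column. The plain-Python sketch below makes the two passes explicit and can be used to check a Spark solution on the small sample. It assumes the appended column uses the same `', '` separator as the input; `append_destination_counts` is a hypothetical name.

```python
from collections import Counter

# the 20-line sample input from Exercise 2
sample = [
    f"05:49:56.{'604900' if i % 2 else '604899'}, 10.0.0.2.54880, 10.0.0.3.5001, 2"
    for i in range(13)
] + ["05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2"] * 7


def append_destination_counts(lines) -> list:
    """Pass 1: count rows per destination IP. Pass 2: append that count to each row."""
    rows = [line for line in lines if len(line.split(", ")) > 2]
    counts = Counter(row.split(", ")[2] for row in rows)
    return [f"{row}, {counts[row.split(', ')[2]]}" for row in rows]


for line in append_destination_counts(sample):
    print(line)
```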


If the input is very large and contains a huge number of unique destination IPs, can your program still scale?


The questions were adapted from `https://jaceklaskowski.github.io/spark-workshop/exercises/`.


In [39]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# sc.stop() # uncomment this during debugging to restart your context if execution stopped midway through this cell.

conf = SparkConf().setAppName("Exercise 3")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an hdfs url
input_file_name = 'input/data2.csv'
# keep only well-formed records (at least three ", "-separated fields)
text_file = sc.textFile(input_file_name).filter(lambda record: len(record.split(', ')) > 2)

def to_pair(record: str) -> tuple[str, int]:
    # key each record by its third field, the destination IP
    return (record.split(', ')[2], 1)

# count records per destination IP and bring the result back to the driver as a dict
counts = text_file.map(to_pair).reduceByKey(lambda a, b: a + b).collectAsMap()
# broadcast the lookup table so each executor receives it once instead of once per task
counts_bc = sc.broadcast(counts)

def append(record: str) -> str:
    # append the count for this record's destination IP as a fifth column
    dst_ip = record.split(', ')[2]
    return f'{record}, {counts_bc.value[dst_ip]}'

result = text_file.map(append)

output_folder = './output/ex3'
result.saveAsTextFile(output_folder)

sc.stop()
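On the scalability question: `collectAsMap` brings the whole destination-IP-to-count map back to the driver, and that map is then shipped to every executor. With a huge number of unique destination IPs, it may no longer fit in driver or executor memory. One way to keep everything distributed, sketched below, is to compute the counts as a pair RDD and `join` them back onto the records. This trades the driver-side map for a shuffle, and the output row order is not preserved. The function names are hypothetical, and `lines` is assumed to be an RDD of well-formed records, such as the filtered `text_file` above.

```python
from operator import add


def destination_ip(record: str) -> str:
    """Third ', '-separated field of a record: the destination IP."""
    return record.split(", ")[2]


def append_counts_with_join(lines):
    """Append per-destination counts without collecting them to the driver.

    `lines` is assumed to be an RDD of well-formed records.
    """
    keyed = lines.map(lambda r: (destination_ip(r), r))    # (ip, record)
    counts = keyed.mapValues(lambda _: 1).reduceByKey(add)  # (ip, count), stays distributed
    # join shuffles both sides by ip and yields (ip, (record, count))
    return keyed.join(counts).map(lambda kv: f"{kv[1][0]}, {kv[1][1]}")


# usage inside the Exercise 3 cell, before sc.stop():
# append_counts_with_join(text_file).saveAsTextFile('./output/ex3_join')
```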