<span style="font-size:24px;">Introduction to Apache Spark and Lab Overview</span>

<span style="font-size:16px;">
Apache Spark is an open-source, distributed computing system designed for processing large-scale data quickly and efficiently. It provides an intuitive programming model for working with structured and unstructured data, enabling users to perform transformations and actions on datasets using high-level APIs. Spark's key features include:
</span>

#
- <span style="font-size:16px;">Speed: Spark keeps intermediate data in memory where possible, making it significantly faster than traditional disk-based frameworks like Hadoop MapReduce.</span>
- <span style="font-size:16px;">Scalability: It scales from a single machine to thousands of nodes in a cluster.</span>
- <span style="font-size:16px;">Versatility: Spark supports multiple workloads, including batch processing, interactive querying, streaming, machine learning, and graph processing.</span>

<span style="font-size:16px;">
At the core of Spark lies the Resilient Distributed Dataset (RDD), a fault-tolerant and immutable data abstraction that enables distributed data processing. Transformations on RDDs are executed lazily, building a lineage graph that allows efficient fault recovery.
</span>

<span style="font-size:24px;">About the Lab</span>

<span style="font-size:16px;">
This lab introduces key concepts and applications of Apache Spark using PySpark, Spark's Python API. It covers the following topics:</span>

#
1. <span style="font-size:16px;">Basic operations:</span>
    - <span style="font-size:16px;">Working with RDDs: creation, transformations, and actions.</span>
    - <span style="font-size:16px;">Common transformations like map, flatMap, and groupByKey.</span>
    - <span style="font-size:16px;">Joining and aggregating datasets.</span>
2. <span style="font-size:16px;">Spark optimisations:</span>
    - <span style="font-size:16px;">Lineage tracking</span>
    - <span style="font-size:16px;">Lazy evaluation</span>
    - <span style="font-size:16px;">Caching</span>
    - <span style="font-size:16px;">Checkpointing</span>
3. <span style="font-size:16px;">Spark UI</span>
4. <span style="font-size:16px;">Data analytics with Spark</span>

<span style="font-size:24px;">Installing PySpark</span>

<span style="font-size:16px;">Ensure that you have Python >= 3.8 and JDK >= 8 installed.</span>

<span style="font-size:16px;">We can start by installing PySpark. Execute the following command in your terminal/command prompt:</span>

<span style="font-size:20px;">pip install pyspark</span>

<span style="font-size:16px;">The same can be achieved from within the notebook by executing the following code block:</span>

In [None]:
%pip install pyspark

<span style="font-size:16px;">We can verify the installation by running the following code block, which prints the installed version.</span>

In [1]:
import pyspark
print(pyspark.__version__)

3.5.3


<span style="font-size:16px;">It is also recommended to install "findspark", which helps the Jupyter Notebook kernel locate the Spark installation.
This can be done by running the following command in the terminal.</span>

<span style="font-size:20px;">pip install findspark</span>

<span style="font-size:16px;">This can also be done by executing the following code block.</span>

In [None]:
%pip install findspark

<span style="font-size:16px;">The next step is to import and initialize findspark.</span>

In [1]:
import findspark
findspark.init()

<span style="font-size:16px;"> Now that the setup is complete, we can proceed to initialize a Spark context. The Spark context serves as the entry point to interact with Spark’s core functionalities, allowing us to create and manipulate RDDs, perform transformations, and execute actions across a cluster. </span>

In [1]:
from pyspark import SparkContext

# Initialize Spark
sc = SparkContext("local","SparkLab")
sc.setCheckpointDir('checkpoint')

24/12/03 09:26:06 WARN Utils: Your hostname, Anshiks-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.194.12.61 instead (on interface en0)
24/12/03 09:26:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 09:26:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<span style="font-size:16px;"> When initializing the Spark context for the first time, you may see warnings that display the address Spark binds to and, if the default port is already in use, the port number chosen for the web UI. Make sure to note these details, as they will be useful later for accessing the Spark UI. By default, the address is 'localhost' and the port is 4040.
Since PySpark allows only one active context at a time, you need to stop an existing context before creating a new one. This can be done with the stop() method. </span>

In [70]:
# Don't run this cell now
sc.stop()

<span style="font-size:16px;">Once your Spark context is set up, you can begin creating Resilient Distributed Datasets (RDDs). PySpark provides several methods for creating an RDD from different data sources, such as a Python list or a text file.</span>

In [2]:
# Create an RDD from a Python list
data = [1, 2, 3, 4, 5]
rdd_list = sc.parallelize(data)

# Verify RDD content
print(rdd_list.collect())  # Output: [1, 2, 3, 4, 5]

[1, 2, 3, 4, 5]


                                                                                

In [3]:
# Path to the file
file_path = "sample.txt"

# Create a sample file
with open(file_path, "w") as file:
    file.write("Hello, World!\n")
    file.write("This is a sample file")

# Load the file into an RDD
rdd_file = sc.textFile(file_path)

# Verify RDD content
print(rdd_file.collect())  # Output: ['Hello, World!', 'This is a sample file']

['Hello, World!', 'This is a sample file']


<span style="font-size:16px;"> The collect() action returns the entire RDD to the driver as a Python list. This is practical only for small datasets or for debugging, because the full result must fit in the driver's memory.</span>

<span style="font-size:16px;"> We can apply a variety of transformations (such as map and filter) and actions (such as reduce and count) to these RDDs. Some examples are given below.</span>

In [19]:
# Apply map transformation to square each element
squared_rdd = rdd_list.map(lambda x: x ** 2)

# Collect and print the result
print(squared_rdd.collect())  # Output: [1, 4, 9, 16, 25]

#------------------------------------------------------------------------------------------------------------

# Apply filter transformation to select even numbers
even_rdd = rdd_list.filter(lambda x: x % 2 == 0)

# Collect and print the result
print(even_rdd.collect())  # Output: [2, 4]

#------------------------------------------------------------------------------------------------------------

# Apply flatMap transformation to split words in a sentence
sentences = ["Spark is great", "RDDs are powerful"]
sentences_rdd = sc.parallelize(sentences)

# FlatMap to split sentences into words
words_rdd = sentences_rdd.flatMap(lambda sentence: sentence.split(" "))

# Collect and print the result
print(words_rdd.collect())  # Output: ['Spark', 'is', 'great', 'RDDs', 'are', 'powerful']

#------------------------------------------------------------------------------------------------------------

# Apply reduce to calculate the sum of the elements
sum_result = rdd_list.reduce(lambda x, y: x + y)

# Print the result
print(sum_result)  # Output: 15

#------------------------------------------------------------------------------------------------------------

# Count the number of elements
print(rdd_list.count())  # Output: 5

#------------------------------------------------------------------------------------------------------------

# Take the first 3 elements
print(rdd_list.take(3))  # Output: [1, 2, 3]

#------------------------------------------------------------------------------------------------------------

# Group elements by their parity (even or odd)
grouped_rdd = rdd_list.groupBy(lambda x: 'even' if x % 2 == 0 else 'odd')

# Collect and print the result
print(grouped_rdd.collect())
# Output: [('odd', <ResultIterable>), ('even', <ResultIterable>)]; each group is an iterable, so apply mapValues(list) to see its values

#------------------------------------------------------------------------------------------------------------

# Create two RDDs
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])

# Perform union
union_rdd = rdd1.union(rdd2)

# Collect and print the result
print(union_rdd.collect())  # Output: [1, 2, 3, 4, 5, 6]

#------------------------------------------------------------------------------------------------------------

# Create an RDD with duplicates
rdd_with_duplicates = sc.parallelize([1, 2, 2, 3, 3, 4])

# Apply distinct transformation
distinct_rdd = rdd_with_duplicates.distinct()

# Collect and print the result
print(distinct_rdd.collect())  # Output: [1, 2, 3, 4]

#------------------------------------------------------------------------------------------------------------

# Create an RDD of key-value pairs
pair_rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4)])

# Apply reduceByKey to sum the values with the same key
reduced_rdd = pair_rdd.reduceByKey(lambda x, y: x + y)

# Collect and print the result
print(reduced_rdd.collect())  # Output: [('a', 4), ('b', 6)]

#------------------------------------------------------------------------------------------------------------

# Create an RDD of key-value pairs
pair_rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])

# Apply mapValues to increment each value by 1
mapped_values_rdd = pair_rdd.mapValues(lambda x: x + 1)

# Collect and print the result
print(mapped_values_rdd.collect())  # Output: [('a', 2), ('b', 3), ('c', 4)]

#------------------------------------------------------------------------------------------------------------

# Sample 50% of the RDD elements
sampled_rdd = rdd_list.sample(withReplacement=False, fraction=0.5)

# Collect and print the result
print(sampled_rdd.collect())  # Output varies between runs because no seed is set

#------------------------------------------------------------------------------------------------------------

# Create an RDD of key-value pairs with a repeated key
key_value_rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 4)])

# Apply mapValues to increment each value by 1
mapped_values_rdd = key_value_rdd.mapValues(lambda x: x + 1)

# Collect and print the result
print(mapped_values_rdd.collect())  # Output: [('a', 2), ('b', 3), ('c', 4), ('a', 5)]

[1, 4, 9, 16, 25]
[2, 4]
['Spark', 'is', 'great', 'RDDs', 'are', 'powerful']
15
5
[1, 2, 3]
[('odd', <pyspark.resultiterable.ResultIterable object at 0x12007f9d0>), ('even', <pyspark.resultiterable.ResultIterable object at 0x1200f7bd0>)]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4]
[('a', 4), ('b', 6)]
[('a', 2), ('b', 3), ('c', 4)]
[1, 3, 4]
[('a', 2), ('b', 3), ('c', 4), ('a', 5)]


<span style="font-size:16px;"> One of the flagship features of RDDs is lineage tracking. We can inspect the lineage of an RDD using the toDebugString() method. We can also observe how a checkpoint changes the lineage by marking an RDD with checkpoint() and then running an action, which is when the checkpoint is actually written.</span>

In [5]:
rdd_first = sc.parallelize([1, 2, 3])
rdd_second = rdd_first.map(lambda x: x * 2)

print("RDD Lineage:", rdd_second.toDebugString())

rdd_first.checkpoint()
print(rdd_first.collect())

print(rdd_first.isCheckpointed())

print("RDD Lineage after checkpoint:", rdd_second.toDebugString())

RDD Lineage: b'(1) PythonRDD[34] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[33] at readRDDFromFile at PythonRDD.scala:289 []'
[1, 2, 3]
True
RDD Lineage after checkpoint: b'(1) PythonRDD[34] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[33] at readRDDFromFile at PythonRDD.scala:289 []\n |  ReliableCheckpointRDD[35] at collect at /var/folders/58/64zqz_d92v58h90mtvz109r80000gn/T/ipykernel_90709/2310518397.py:7 []'


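<span style="font-size:16px;"> To improve performance, Spark evaluates transformations lazily: map() only records the operation, and the function runs when an action such as collect() is called. We can observe this in the following code, where the "Called with" timestamps are later than the time printed right after map().</span>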

In [6]:
import time

def square(x):
    call_time = time.time()
    print(f"Called with {x} at {call_time}")
    return x ** 2

squared_rdd = rdd_list.map(square)
exec_time = time.time()
print("Execution time:", exec_time)

print(squared_rdd.collect())

Execution time: 1733190195.251616
[1, 4, 9, 16, 25]


Called with 1 at 1733190195.457449
Called with 2 at 1733190195.45758
Called with 3 at 1733190195.4575841
Called with 4 at 1733190195.4575958
Called with 5 at 1733190195.457599


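<span style="font-size:16px;"> Another optimization you can use is caching an RDD with cache(), which keeps it in memory once it has been computed so that later actions can reuse it. The more general persist() accepts a storage level and can also be used to store the RDD on disk.</span>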

In [7]:
# Without cache
start = time.time()
print("Sum Without Cache:", rdd_list.reduce(lambda x, y: x + y))
print("Time Without Cache:", time.time() - start)

# With cache
# Note: cache() is lazy. The RDD is stored during the next action, so only later actions benefit.
rdd_list.cache()
start = time.time()
print("Sum With Cache:", rdd_list.reduce(lambda x, y: x + y))
print("Time With Cache:", time.time() - start)

Sum Without Cache: 15
Time Without Cache: 0.12427783012390137
Sum With Cache: 15
Time With Cache: 0.12604618072509766


<span style="font-size:24px;">Spark UI Overview</span>

<span style="font-size:16px;">
By default, the Spark UI is accessible at http://<driver-host>:4040 while the application is running. If a different address or port is used, PySpark reports it when the context is created.

Key features:</span>

#
- <span style="font-size:16px;">Jobs Tab</span>
    - <span style="font-size:16px;">Lists all the jobs in the application with details such as:</span>
        - <span style="font-size:16px;">Job ID: A unique identifier for each job.</span>
        - <span style="font-size:16px;">Status: Indicates whether a job is completed, running, or failed.</span>
        - <span style="font-size:16px;">Duration: The time taken for the job to execute.</span>
        - <span style="font-size:16px;">Stages: Shows the stages involved in the job.</span>
    - <span style="font-size:16px;">Provides a visualization of the DAG (Directed Acyclic Graph) of stages and their dependencies.</span>

- <span style="font-size:16px;">Stages Tab</span>
    - <span style="font-size:16px;">Displays detailed information about each stage of the application:</span>
        - <span style="font-size:16px;">Stage ID: Unique identifier for each stage.</span>
        - <span style="font-size:16px;">Task Summary: Number of tasks and their status (e.g., succeeded, failed, or pending).</span>
        - <span style="font-size:16px;">Input/Output Data: Data read/written during the stage.</span>
        - <span style="font-size:16px;">Shuffle Read/Write: Details about shuffle operations.</span>
    - <span style="font-size:16px;">Aggregated Metrics: Task execution times, shuffle data, and resource usage.</span>

- <span style="font-size:16px;">Storage Tab</span>
    - <span style="font-size:16px;">Lists all the RDDs or datasets currently cached or persisted.</span>
    - <span style="font-size:16px;">Provides details such as:</span>
        - <span style="font-size:16px;">RDD ID: Unique identifier for each RDD.</span>
        - <span style="font-size:16px;">Storage Level: Type of storage used (e.g., MEMORY_ONLY, DISK_ONLY).</span>
        - <span style="font-size:16px;">Partitions: Number of partitions and their sizes.</span>
        - <span style="font-size:16px;">Memory/Disk Usage: Amount of data stored in memory or on disk.</span>

<span style="font-size:24px;">Analytics using Spark</span>

<span style="font-size:16px;">
You are given a CSV file containing the website logs for the quiz you took. Using these logs, you have to identify all the cheaters.

The CSV file has the following columns:
1. 'Time': The timestamp in the format "%d/%m/%y; %H:%M:%S".
2. 'UserId': The unique Id of the user.
3. 'Event context': The current screen/webpage visible to the user.
4. 'Component': Type of interface visible to the user.
5. 'Event name': The action performed by the user.
6. 'Description': A detailed description of the action.

(The 'Event context' column is a refinement of the 'Component' column.)

You can conclude that someone is a cheater if they answer a question significantly quicker than the average time for that problem. As a general rule, a user is a cheater if their first response time for a problem is less than 1/5 of the average time for that problem.

A suggested roadmap for the problem is:
- Load the file into an RDD.
- Convert this RDD into an RDD with a suitable record format, such as lists or dictionaries.
- Remove all logs not related to the quiz. (Hint: Apply a filter on the 'Component' column.)
- Group the logs for each user.
- For each user, group the logs by the problem type.
- For each problem, find when it was first viewed and submitted.
- Calculate the time taken for each problem by each user.
- Calculate the average time taken for each problem.
- Compare each user's times with these averages and identify the cheaters.

You can also try computing the following:
- Which users did not attempt the quiz?
- Who was the Early Bird (the first user to attempt the quiz)?
- Who completed all the questions the fastest?
- What was the maximum and minimum time taken for each problem?
- For each problem, who solved it the quickest?
</span>
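
<span style="font-size:16px;">As a starting point for the second roadmap step and for the cheater rule, here is a hedged sketch in plain Python. The helper names parse_line and is_cheater, the sample log lines, and the event names in them are all made up for illustration; the real file may use different values, and its header row has to be skipped before parsing.</span>

```python
import csv
from datetime import datetime

TIME_FORMAT = "%d/%m/%y; %H:%M:%S"
COLUMNS = ["Time", "UserId", "Event context", "Component", "Event name", "Description"]

def parse_line(line):
    """Turn one CSV line into a dict keyed by column name, with 'Time' parsed."""
    fields = next(csv.reader([line]))  # handles quoted fields that contain commas
    record = dict(zip(COLUMNS, fields))
    record["Time"] = datetime.strptime(record["Time"], TIME_FORMAT)
    return record

def is_cheater(time_taken, average_time):
    """Rule from the lab: first response faster than 1/5 of the problem's average."""
    return time_taken < average_time / 5

# Made-up log lines, for illustration only
viewed = parse_line('03/12/24; 09:00:00,u1,Quiz: Lab quiz,Quiz,Viewed,"User u1 viewed question 1, page 1"')
submitted = parse_line('03/12/24; 09:00:20,u1,Quiz: Lab quiz,Quiz,Submitted,User u1 answered question 1')

seconds = (submitted["Time"] - viewed["Time"]).total_seconds()
print(seconds)                     # 20.0
print(is_cheater(seconds, 150.0))  # True, since 20 < 150 / 5 = 30
```

<span style="font-size:16px;">A function like parse_line can be passed to map() on the RDD returned by sc.textFile(), which yields one string per line of the file.</span>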