# Exercise 1: Spark Setup and RDD Basics


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark Core Concepts").master("local[*]").getOrCreate()  
print("Spark Session Created Successfully!")
print("Spark Version:", spark.version)  # Check Spark version



Spark Session Created Successfully!
Spark Version: 3.3.2


**Create an RDD: Parallelize a small Python collection (e.g., a list of 8–10 numbers or strings) into an RDD using spark.sparkContext.parallelize(). Print the number of partitions of the RDD (use RDD.getNumPartitions()) to see how the data is split.**

In [0]:
data = [10,20,30,40,50,60,70,80,90,100]
text = ['python', 'scala', 'sql', 'databricks', 'spark', 'aws', 'azure', 'machine learning']
rdd_data = spark.sparkContext.parallelize(data)
rdd_text = spark.sparkContext.parallelize(text)

data_partitions = rdd_data.getNumPartitions()
text_partitions = rdd_text.getNumPartitions()

print(f"Number of partitions for numerical data is: {data_partitions}")
print(f"Number of partitions for text data is: {text_partitions}")

Number of partitions for numerical data is: 8
Number of partitions for text data is: 8
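A minimal plain-Python sketch of how 10 elements can be spread over 8 partitions, assuming the even-split formula used by Spark's `ParallelCollectionRDD` (partition `i` receives positions `i*n//k` up to `(i+1)*n//k`). `slice_evenly` is a hypothetical helper for illustration, not a Spark API. PySpark also batches elements before slicing, so the real layout may differ; `rdd_data.glom().collect()` shows the actual contents of each partition. The count of 8 most likely comes from the default parallelism, which for a `local[*]` master equals the number of available cores.

```python
def slice_evenly(items, num_slices):
    """Split items into num_slices contiguous chunks (hypothetical helper).

    Chunk i covers positions [i * n // k, (i + 1) * n // k), so chunk sizes
    differ by at most one element.
    """
    n = len(items)
    return [
        items[(i * n) // num_slices:((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]


data = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
partitions = slice_evenly(data, 8)
for idx, part in enumerate(partitions):
    print(f"partition {idx}: {part}")
# partition 3 and partition 7 each receive two elements: [40, 50] and [90, 100]
```

With 8 strings and 8 slices, the same formula puts exactly one word in each partition.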


Apply a Transformation: Use an RDD transformation (such as map) on the RDD to create a new RDD. For example, if your RDD contains numbers, apply a function to square each number. (Recall that transformations create a new dataset from an existing one and are lazy, meaning they don’t execute until an action such as collect() or count() is called.)

In [0]:
# Transformations on Numerical RDD
squared_rdd = rdd_data.map(lambda x: x**2)
filtered_rdd = rdd_data.filter(lambda x: x > 50)

# Actions on Numerical RDD
print("Squared Data RDD:", squared_rdd.collect())  
print("Filtered Data RDD ( > 50):", filtered_rdd.collect())
print("Count of elements in Data RDD:", rdd_data.count()) 
print("First 3 elements:", rdd_data.take(3))  

# Transformations on Text RDD
upper_rdd = rdd_text.map(lambda x: x.upper())
filtered_text_rdd = rdd_text.filter(lambda x: 'a' in x.lower())

# Actions on Text RDD
print("\nOriginal Text RDD:", rdd_text.collect())
print("Uppercase Text RDD:", upper_rdd.collect())
print("Filtered Text RDD (Words containing 'a'):", filtered_text_rdd.collect())
print("Count of words in Text RDD:", rdd_text.count())
print("First 2 words:", rdd_text.take(2))

Squared Data RDD: [100, 400, 900, 1600, 2500, 3600, 4900, 6400, 8100, 10000]
Filtered Data RDD ( > 50): [60, 70, 80, 90, 100]
Count of elements in Data RDD: 10
First 3 elements: [10, 20, 30]

Original Text RDD: ['python', 'scala', 'sql', 'databricks', 'spark', 'aws', 'azure', 'machine learning']
Uppercase Text RDD: ['PYTHON', 'SCALA', 'SQL', 'DATABRICKS', 'SPARK', 'AWS', 'AZURE', 'MACHINE LEARNING']
Filtered Text RDD (Words containing 'a'): ['scala', 'databricks', 'spark', 'aws', 'azure', 'machine learning']
Count of words in Text RDD: 8
First 2 words: ['python', 'scala']
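Laziness can be illustrated with a toy class (hypothetical, not part of PySpark) whose `map` and `filter` only record steps, while `collect` runs them. The call log shows that `square` is not executed until the action is invoked.

```python
class LazyDataset:
    """Toy model of a lazy dataset: transformations only record steps."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)  # recorded (kind, function) pairs

    def map(self, fn):
        # Return a new dataset with one more recorded step; nothing runs yet.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # The "action": walk the source once and apply every recorded step.
        results = []
        for value in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    value = fn(value)
                elif not fn(value):
                    keep = False
                    break
            if keep:
                results.append(value)
        return results


calls = []


def square(x):
    calls.append(x)  # log every time the function actually runs
    return x ** 2


pipeline = LazyDataset([10, 20, 30, 40, 50, 60]).map(square).filter(lambda x: x > 1000)
print("calls before action:", calls)   # []
print("result:", pipeline.collect())   # [1600, 2500, 3600]
print("calls after action:", calls)    # [10, 20, 30, 40, 50, 60]
```

Calling `pipeline.collect()` a second time would run `square` on every element again; that repeated work is what caching avoids later in this notebook.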


DataFrame Creation: Load a structured dataset into a DataFrame. You can use a provided CSV or JSON file (e.g., employees.csv with columns like id, name, department, salary), or create a small Pandas DataFrame and convert it to a Spark DataFrame. Use spark.read (for CSV/JSON) or spark.createDataFrame (for existing data) to create the DataFrame.

In [0]:
import pandas as pd
# Reuse the active Spark session: getOrCreate() returns the session created earlier instead of starting a new one
spark = SparkSession.builder.appName("DataFrame Creation").getOrCreate()

# Create a Pandas DataFrame
data = {
    "id": [1, 2, 3, 4, 2, 3, 3],
    "name": ["John Doe", "Jane Smith", "Robert Brown", "Emily Davis", "Robert Brown", "Emily Davis", "Robert Brown"],
    "department": ["Engineering", "HR", "Finance", "Marketing", "HR", "Finance", "HR"],
    "salary": [80000, 75000, 90000, 70000, 75000, 90000, 70000]
}

pdf = pd.DataFrame(data)  # Create Pandas DataFrame

# Convert Pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(pdf)

# Show Spark DataFrame
df_spark.show()

+---+------------+-----------+------+
| id|        name| department|salary|
+---+------------+-----------+------+
|  1|    John Doe|Engineering| 80000|
|  2|  Jane Smith|         HR| 75000|
|  3|Robert Brown|    Finance| 90000|
|  4| Emily Davis|  Marketing| 70000|
|  2|Robert Brown|         HR| 75000|
|  3| Emily Davis|    Finance| 90000|
|  3|Robert Brown|         HR| 70000|
+---+------------+-----------+------+



 Inspect the DataFrame: Use DataFrame actions like printSchema() to display the schema and show(5) to display the first few rows. Verify that the data is loaded as expected.

In [0]:
df_spark.printSchema()
df_spark.show(5)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+---+------------+-----------+------+
| id|        name| department|salary|
+---+------------+-----------+------+
|  1|    John Doe|Engineering| 80000|
|  2|  Jane Smith|         HR| 75000|
|  3|Robert Brown|    Finance| 90000|
|  4| Emily Davis|  Marketing| 70000|
|  2|Robert Brown|         HR| 75000|
+---+------------+-----------+------+
only showing top 5 rows



DataFrame Transformation: Use DataFrame transformations to filter and transform the data. For example:
- Filter the DataFrame to only include rows that meet a condition (e.g., salary > 50000).
- Select specific columns or derive a new column (for instance, add a 10% bonus to salary as a new column).
- Perform an aggregation using groupBy and agg (e.g., group by department and compute the average salary).

Note that, similar to RDD transformations, DataFrame transformations are also lazy: they define a new query plan but do not execute until an action is invoked.

4. DataFrame Action: Invoke an action to materialize the results of your transformations. For example, use show() to display the aggregated results, or count() to count the filtered records. This triggers the execution of the transformations.

In [0]:
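from pyspark.sql.functions import col, expr, avg, sum  # note: this sum shadows Python's built-in sum()

# Keep only employees earning more than 50,000
filtered_df = df_spark.filter(col('salary') > 50000)

# Derive a bonus column equal to 20% of salary
df_with_bonus = df_spark.withColumn('bonus', expr("salary * 0.2"))

# Average salary per department
avg_salary_df = df_spark.groupBy('department').agg(avg('salary').alias('avg_salary'))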

print("Filtered DataFrame (Salary > 50000):")
filtered_df.show()

print("DataFrame with Bonus Column:")
df_with_bonus.show()

print("Average Salary per Department:")
avg_salary_df.show()


Filtered DataFrame (Salary > 50000):
+---+------------+-----------+------+
| id|        name| department|salary|
+---+------------+-----------+------+
|  1|    John Doe|Engineering| 80000|
|  2|  Jane Smith|         HR| 75000|
|  3|Robert Brown|    Finance| 90000|
|  4| Emily Davis|  Marketing| 70000|
|  2|Robert Brown|         HR| 75000|
|  3| Emily Davis|    Finance| 90000|
|  3|Robert Brown|         HR| 70000|
+---+------------+-----------+------+

DataFrame with Bonus Column:
+---+------------+-----------+------+-------+
| id|        name| department|salary|  bonus|
+---+------------+-----------+------+-------+
|  1|    John Doe|Engineering| 80000|16000.0|
|  2|  Jane Smith|         HR| 75000|15000.0|
|  3|Robert Brown|    Finance| 90000|18000.0|
|  4| Emily Davis|  Marketing| 70000|14000.0|
|  2|Robert Brown|         HR| 75000|15000.0|
|  3| Emily Davis|    Finance| 90000|18000.0|
|  3|Robert Brown|         HR| 70000|14000.0|
+---+------------+-----------+------+-------+

Average Salary per Department:
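To check the Spark results by hand, the same filter, bonus column, and group-by average can be recomputed in plain Python over the rows used to build `df_spark`. This is only a cross-check sketch; `employees`, `rows`, and `avg_salary` are illustrative names, and the 20% bonus factor mirrors the code above.

```python
from collections import defaultdict

# Same rows as the pandas dict used to build df_spark
employees = {
    "id": [1, 2, 3, 4, 2, 3, 3],
    "name": ["John Doe", "Jane Smith", "Robert Brown", "Emily Davis",
             "Robert Brown", "Emily Davis", "Robert Brown"],
    "department": ["Engineering", "HR", "Finance", "Marketing", "HR", "Finance", "HR"],
    "salary": [80000, 75000, 90000, 70000, 75000, 90000, 70000],
}
# Turn the column-oriented dict into a list of row dicts
rows = [dict(zip(employees, values)) for values in zip(*employees.values())]

# Equivalent of filter(col('salary') > 50000)
filtered = [r for r in rows if r["salary"] > 50000]

# Equivalent of withColumn('bonus', salary * 0.2)
with_bonus = [{**r, "bonus": r["salary"] * 0.2} for r in rows]

# Equivalent of groupBy('department').agg(avg('salary'))
totals = defaultdict(lambda: [0, 0])  # department -> [salary total, row count]
for r in rows:
    totals[r["department"]][0] += r["salary"]
    totals[r["department"]][1] += 1
avg_salary = {dept: total / count for dept, (total, count) in totals.items()}

print(len(filtered), "rows pass the filter")
print(avg_salary)
```

The HR average, (75000 + 75000 + 70000) / 3, matches the 73333.33 reported by Spark SQL in the next cell.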

Spark SQL Query: Register the DataFrame as a temporary view using createOrReplaceTempView(). Then, write and execute an SQL query using spark.sql() to achieve the same result as one of the transformations above (e.g., the same aggregation by department). Display the query result.

In [0]:
df_spark.createOrReplaceTempView("employees")

filtered_result = spark.sql("SELECT * FROM employees WHERE salary > 50000")
print("Filtered Employees (Salary > 50000):")
filtered_result.show()

avg_salary_result = spark.sql("SELECT department, AVG(salary) AS avg_salary FROM employees GROUP BY department")

print("Average Salary by Department:")
avg_salary_result.show()

Filtered Employees (Salary > 50000):
+---+------------+-----------+------+
| id|        name| department|salary|
+---+------------+-----------+------+
|  1|    John Doe|Engineering| 80000|
|  2|  Jane Smith|         HR| 75000|
|  3|Robert Brown|    Finance| 90000|
|  4| Emily Davis|  Marketing| 70000|
|  2|Robert Brown|         HR| 75000|
|  3| Emily Davis|    Finance| 90000|
|  3|Robert Brown|         HR| 70000|
+---+------------+-----------+------+

Average Salary by Department:
+-----------+-----------------+
| department|       avg_salary|
+-----------+-----------------+
|Engineering|          80000.0|
|         HR|73333.33333333333|
|    Finance|          90000.0|
|  Marketing|          70000.0|
+-----------+-----------------+
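The aggregation query is ordinary SQL, so it can be sanity-checked outside Spark. The sketch below runs it against an in-memory table with Python's built-in `sqlite3` module (used here only because it ships with Python; it is not Spark SQL). It also adds `ORDER BY department`: Spark does not guarantee the row order of a `GROUP BY` result, which is why the departments above are not alphabetical, and the same clause can be appended to the `spark.sql()` query for deterministic output.

```python
import sqlite3

# Same seven employee rows as df_spark
rows = [
    (1, "John Doe", "Engineering", 80000),
    (2, "Jane Smith", "HR", 75000),
    (3, "Robert Brown", "Finance", 90000),
    (4, "Emily Davis", "Marketing", 70000),
    (2, "Robert Brown", "HR", 75000),
    (3, "Emily Davis", "Finance", 90000),
    (3, "Robert Brown", "HR", 70000),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INTEGER, name TEXT, department TEXT, salary INTEGER)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?, ?)", rows)

query = """
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
    ORDER BY department
"""
result = conn.execute(query).fetchall()
for department, avg_salary in result:
    print(f"{department:<12} {avg_salary:,.2f}")
conn.close()
```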



RDD from External Data: Using SparkContext, read an external text file into an RDD (for example, a log file or a text dataset). You can use sc.textFile("path/to/file.txt") to create an RDD where each element is a line of the file. If you don’t have an external file, create a list of strings (e.g., sentences) and parallelize it as an RDD.

In [0]:
# Create a list of sentences
text_data = [
    "Apache Spark is a fast and powerful engine for big data processing.",
    "PySpark is the Python API for Apache Spark.",
    "RDD stands for Resilient Distributed Dataset.",
    "DataFrames are optimized compared to RDDs.",
    "Spark SQL allows querying structured data using SQL."
]

# Parallelize the list into an RDD
rdd = spark.sparkContext.parallelize(text_data)

# Show the RDD contents
print("RDD Contents:")
print(rdd.collect())

rdd.getNumPartitions()

RDD Contents:
['Apache Spark is a fast and powerful engine for big data processing.', 'PySpark is the Python API for Apache Spark.', 'RDD stands for Resilient Distributed Dataset.', 'DataFrames are optimized compared to RDDs.', 'Spark SQL allows querying structured data using SQL.']
Out[25]: 8

2. FlatMap and Map: If working with text data, use flatMap to split lines into words (so you get an RDD of individual words). Then use map to transform each word into a key-value pair of the form (word, 1). This prepares the data for counting word occurrences.

In [0]:
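# Split each line on spaces; punctuation stays attached (e.g., "Spark." vs "Spark")
words_rdd = rdd.flatMap(lambda x: x.split(" "))

# Pair each lower-cased word with a count of 1
word_pairs_rdd = words_rdd.map(lambda x: (x.lower(), 1))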

print("Words RDD:")
print(words_rdd.collect())

print("Word Pairs RDD:")
print(word_pairs_rdd.collect())

Words RDD:
['Apache', 'Spark', 'is', 'a', 'fast', 'and', 'powerful', 'engine', 'for', 'big', 'data', 'processing.', 'PySpark', 'is', 'the', 'Python', 'API', 'for', 'Apache', 'Spark.', 'RDD', 'stands', 'for', 'Resilient', 'Distributed', 'Dataset.', 'DataFrames', 'are', 'optimized', 'compared', 'to', 'RDDs.', 'Spark', 'SQL', 'allows', 'querying', 'structured', 'data', 'using', 'SQL.']
Word Pairs RDD:
[('apache', 1), ('spark', 1), ('is', 1), ('a', 1), ('fast', 1), ('and', 1), ('powerful', 1), ('engine', 1), ('for', 1), ('big', 1), ('data', 1), ('processing.', 1), ('pyspark', 1), ('is', 1), ('the', 1), ('python', 1), ('api', 1), ('for', 1), ('apache', 1), ('spark.', 1), ('rdd', 1), ('stands', 1), ('for', 1), ('resilient', 1), ('distributed', 1), ('dataset.', 1), ('dataframes', 1), ('are', 1), ('optimized', 1), ('compared', 1), ('to', 1), ('rdds.', 1), ('spark', 1), ('sql', 1), ('allows', 1), ('querying', 1), ('structured', 1), ('data', 1), ('using', 1), ('sql.', 1)]
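Splitting on `" "` keeps punctuation attached, so `spark` and `spark.` (or `sql` and `sql.`) are counted as different words here. One option, sketched below as an assumption rather than the exercise's required approach, is a regex tokenizer that lower-cases each line and keeps only alphanumeric runs. `tokenize` is a hypothetical helper that could be passed to `rdd.flatMap(tokenize)` before `.map(lambda w: (w, 1))`.

```python
import re

# Runs of lowercase letters or digits; everything else acts as a separator
WORD_RE = re.compile(r"[a-z0-9]+")


def tokenize(line):
    """Lower-case a line and return only its alphanumeric runs."""
    return WORD_RE.findall(line.lower())


print(tokenize("PySpark is the Python API for Apache Spark."))
# ['pyspark', 'is', 'the', 'python', 'api', 'for', 'apache', 'spark']
```

A trade-off of this simple pattern is that contractions such as "don't" split into "don" and "t"; a richer pattern would be needed for such text.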


ReduceByKey: Apply the reduceByKey transformation on the key-value RDD to aggregate counts for each unique word (key). This will produce an RDD of (word, total_count) pairs.


In [0]:
# Apply reduceByKey to count word occurrences
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

# Show the word count results
print("Word Count Results:")
print(word_counts_rdd.collect())

Word Count Results:
[('engine', 1), ('python', 1), ('spark.', 1), ('are', 1), ('optimized', 1), ('using', 1), ('sql.', 1), ('spark', 2), ('a', 1), ('powerful', 1), ('stands', 1), ('allows', 1), ('dataset.', 1), ('apache', 2), ('fast', 1), ('for', 3), ('data', 2), ('processing.', 1), ('resilient', 1), ('distributed', 1), ('rdds.', 1), ('api', 1), ('compared', 1), ('and', 1), ('pyspark', 1), ('the', 1), ('to', 1), ('structured', 1), ('is', 2), ('dataframes', 1), ('querying', 1), ('big', 1), ('rdd', 1), ('sql', 1)]
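`reduceByKey` first combines values for each key inside every partition and only then shuffles and merges the partial results, which is why it usually moves less data than `groupByKey` for aggregations. The plain-Python model below makes the two phases visible; it is a simplification (real Spark hash-partitions keys across several reducers instead of merging into one dict), and `reduce_by_key` is a hypothetical name. The function passed in should be associative and commutative, as `lambda a, b: a + b` is.

```python
def reduce_by_key(partitions, func):
    """Toy reduceByKey: combine within each partition, then merge partials."""
    # Phase 1 (map side): combine values per key inside every partition.
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Phase 2 (after the shuffle): merge the per-partition results per key.
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged, partials


pairs_by_partition = [
    [("spark", 1), ("is", 1), ("spark", 1)],
    [("spark", 1), ("for", 1)],
    [("for", 1), ("is", 1)],
]
counts, partials = reduce_by_key(pairs_by_partition, lambda a, b: a + b)
print(partials)  # [{'spark': 2, 'is': 1}, {'spark': 1, 'for': 1}, {'for': 1, 'is': 1}]
print(counts)    # {'spark': 3, 'is': 2, 'for': 2}
```

Only 6 partial pairs cross the simulated shuffle instead of the 7 original pairs; on real text with many repeated words the saving is much larger.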


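Collect/Take Action: Use an action like collect() or take(10) to retrieve some of the word counts and print them. If the dataset is large, use take() to print only a sample of the results (e.g., the first 10 pairs) instead of the entire collection. Note that take(10) returns the first 10 elements in partition order, not the 10 most frequent words; ranking requires an ordering action such as takeOrdered() or top().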

In [0]:
all_word_counts = word_counts_rdd.collect()
print("All Word Counts:")
print(all_word_counts)

first_10_words = word_counts_rdd.take(10)  # first 10 pairs in partition order, not ranked by count
print("\nFirst 10 Word Counts:")
print(first_10_words)

All Word Counts:
[('engine', 1), ('python', 1), ('spark.', 1), ('are', 1), ('optimized', 1), ('using', 1), ('sql.', 1), ('spark', 2), ('a', 1), ('powerful', 1), ('stands', 1), ('allows', 1), ('dataset.', 1), ('apache', 2), ('fast', 1), ('for', 3), ('data', 2), ('processing.', 1), ('resilient', 1), ('distributed', 1), ('rdds.', 1), ('api', 1), ('compared', 1), ('and', 1), ('pyspark', 1), ('the', 1), ('to', 1), ('structured', 1), ('is', 2), ('dataframes', 1), ('querying', 1), ('big', 1), ('rdd', 1), ('sql', 1)]

First 10 Word Counts:
[('engine', 1), ('python', 1), ('spark.', 1), ('are', 1), ('optimized', 1), ('using', 1), ('sql.', 1), ('spark', 2), ('a', 1), ('powerful', 1)]
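For the most frequent words, PySpark's `RDD.takeOrdered(num, key=...)` returns the smallest elements under a key, so `word_counts_rdd.takeOrdered(10, key=lambda kv: (-kv[1], kv[0]))` would give the 10 highest counts with ties broken alphabetically. The sketch below models the same ranking locally with `heapq.nsmallest` on a hand-picked subset of the counts above.

```python
import heapq

# A subset of the (word, count) pairs produced by reduceByKey above
word_counts = [('engine', 1), ('python', 1), ('spark', 2), ('apache', 2),
               ('for', 3), ('data', 2), ('is', 2), ('the', 1)]

# Sort key: highest count first (negated), then alphabetical for ties,
# mirroring takeOrdered(5, key=lambda kv: (-kv[1], kv[0]))
top_5 = heapq.nsmallest(5, word_counts, key=lambda kv: (-kv[1], kv[0]))
print(top_5)
# [('for', 3), ('apache', 2), ('data', 2), ('is', 2), ('spark', 2)]
```

The secondary key keeps the result deterministic; without it, words with equal counts may come back in any order.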



1. Repeated Computation Scenario: Using the DataFrame or RDD from a previous exercise (or create a new one), design a scenario where you would perform multiple actions on the same dataset. For example, you might want to compute the count of records and also compute an aggregate (like sum or average on a column) on the same DataFrame/RDD. Write code to perform two or three different actions on the dataset without caching, and measure the execution time or observe the Spark job executions (if using Spark UI or logs).

Expected Improvement with Caching
- Without caching: each action recomputes the transformations, which slows down repeated actions.
- With caching: the DataFrame is stored in memory after the first action, and subsequent actions reuse the cached data instead of recomputing it.


In [0]:

import time

df = df_spark

# Now perform multiple actions
start_time = time.time()
record_count = df.count()  # First action
count_time = time.time()

sum_salary = df.select(sum("salary")).collect()[0][0] # Second action
sum_time = time.time()

avg_salary = df.select(avg("salary")).collect()[0][0]  # Third action
avg_time = time.time()

# Print results and execution times
print(f"Total Record Count: {record_count} (Time taken: {count_time - start_time:.4f} sec)")
print(f"Total Salary Sum: {sum_salary} (Time taken: {sum_time - count_time:.4f} sec)")
print(f"Average Salary: {avg_salary:.2f} (Time taken: {avg_time - sum_time:.4f} sec)")

# Measure total execution time
total_time = avg_time - start_time
print(f"\nTotal Execution Time WITHOUT Caching: {total_time:.4f} sec")


Total Record Count: 7 (Time taken: 0.3319 sec)
Total Salary Sum: 550000 (Time taken: 0.3800 sec)
Average Salary: 78571.43 (Time taken: 0.5928 sec)

Total Execution Time WITHOUT Caching: 1.3047 sec


In [0]:

# Mark the DataFrame for caching; cache() is lazy, so the first action below materializes the cache
df_spark.cache()

# Now perform multiple actions
start_time = time.time()
record_count = df_spark.count()  # First action
count_time = time.time()

sum_salary = df_spark.select(sum("salary")).collect()[0][0] # Second action
sum_time = time.time()

avg_salary = df_spark.select(avg("salary")).collect()[0][0]  # Third action
avg_time = time.time()

# Print results and execution times
print(f"Total Record Count: {record_count} (Time taken: {count_time - start_time:.4f} sec)")
print(f"Total Salary Sum: {sum_salary} (Time taken: {sum_time - count_time:.4f} sec)")
print(f"Average Salary: {avg_salary:.2f} (Time taken: {avg_time - sum_time:.4f} sec)")

# Measure total execution time
total_time = avg_time - start_time
print(f"\nTotal Execution Time WITH Caching: {total_time:.4f} sec")


Total Record Count: 7 (Time taken: 0.3345 sec)
Total Salary Sum: 550000 (Time taken: 0.4905 sec)
Average Salary: 78571.43 (Time taken: 0.3119 sec)

Total Execution Time WITH Caching: 1.1369 sec


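Expected Outcome
- Faster execution with caching: the first action (count()) takes longer because Spark computes the DataFrame and stores it in the cache; subsequent actions (sum(), avg()) should be faster because they reuse the cached data.
- Fewer redundant computations: unlike the uncached run, Spark does not recompute the entire DataFrame for each action.
- Verifying cache usage: in the Spark UI, open the Storage tab to confirm that the DataFrame is cached, or check the logs for messages indicating that Spark is reading cached data. Calling df_spark.explain() on a cached DataFrame should also show an InMemoryTableScan in the physical plan.

With only 7 rows, the measured times above are dominated by job-scheduling overhead rather than computation, so the differences between the two runs are small and inconsistent (count() took 0.3319 s uncached and 0.3345 s cached). The benefit of caching becomes visible on datasets that are expensive to recompute.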

 Compare Performance: If possible, compare the execution times or the Spark job DAG for the actions with and without caching. You should notice that with caching, the dataset is computed only once and reused for subsequent actions, whereas without caching Spark recomputes the dataset for each action.

In [0]:
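from pyspark.sql.functions import avg, sum
from pyspark import StorageLevel  # needed only for the persist() alternative below

# Step 1: Cache the DataFrame to avoid redundant computations.
# df_spark was already cached and materialized in the previous cell, so this run measures a warm cache.
df_spark.cache()

# Alternative to cache(): choose the storage level explicitly
# df_spark.persist(StorageLevel.MEMORY_AND_DISK)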

# Step 2: Perform multiple actions WITH caching and measure execution time
start_time = time.time()
record_count_cached = df_spark.count()  # First action: Count records
count_time_cached = time.time()

sum_salary_cached = df_spark.select(sum("salary")).collect()[0][0]  # Second action: Compute sum
sum_time_cached = time.time()

avg_salary_cached = df_spark.select(avg("salary")).collect()[0][0]  # Third action: Compute average
avg_time_cached = time.time()

# Step 3: Print results and execution times
execution_times_cached = {
    "Total Record Count": (record_count_cached, count_time_cached - start_time),
    "Total Salary Sum": (sum_salary_cached, sum_time_cached - count_time_cached),
    "Average Salary": (avg_salary_cached, avg_time_cached - sum_time_cached),
    "Total Execution Time WITH Caching": avg_time_cached - start_time
}

df_execution_times_cached = pd.DataFrame(execution_times_cached, index=["Value", "Time Taken (sec)"]).T
df_execution_times_cached


| Metric | Value | Time Taken (sec) |
|---|---|---|
| Total Record Count | 7.0 | 0.164051 |
| Total Salary Sum | 550000.0 | 0.247052 |
| Average Salary | 78571.428571 | 0.215072 |
| Total Execution Time WITH Caching | 0.626175 | 0.626175 |
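Single `time.time()` measurements on a 7-row DataFrame mostly capture job-scheduling overhead, and `time.time()` follows the wall clock, which can be adjusted while a job runs. A steadier comparison, sketched here under those assumptions, times each action several times with `time.perf_counter()` and reports the median. `median_runtime` is a hypothetical helper; the demo uses a cheap Python call (avoiding `sum`, which this notebook shadows with the PySpark function), and the comment shows how it might wrap a Spark action.

```python
import statistics
import time


def median_runtime(action, repeats=5):
    """Run `action` `repeats` times; return (last result, median seconds)."""
    durations = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = action()
        durations.append(time.perf_counter() - start)
    return result, statistics.median(durations)


# In the notebook this could wrap a Spark action, for example:
#   median_runtime(lambda: df_spark.select(avg("salary")).collect()[0][0])
result, seconds = median_runtime(lambda: len(list(range(100_000))))
print(result, f"{seconds:.6f} s")
```

Running the helper once before and once after `df_spark.cache()` (plus one warm-up action) compares like with like, instead of mixing the cache-filling run with the cached ones.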


1. When is Caching Beneficial?
Caching in Spark improves performance when: 
- ✅ Multiple actions are performed on the same dataset → Without caching, Spark recomputes transformations every time an action is triggered.
- ✅ The dataset is expensive to compute → If transformations involve costly operations (e.g., joins, aggregations), caching saves recomputation time.
- ✅ The dataset fits in memory → If data can be stored in RAM (MEMORY_ONLY mode), subsequent operations are much faster.
- ✅ Iterative processing is required → Common in machine learning and graph algorithms, where the same data is used repeatedly.
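The effect described in this list can be modeled with a toy dataset (hypothetical, not Spark's implementation) that counts how often its "expensive" computation runs. Without caching, each of three actions recomputes the data; after `cache()`, the first action computes and stores it and the later ones reuse it, and `unpersist()` restores recomputation.

```python
import math


class Dataset:
    """Toy dataset whose rows come from a possibly expensive function."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._use_cache = False
        self.computations = 0  # how many times compute() actually ran

    def cache(self):
        # Like Spark, this only marks the dataset; nothing is computed yet.
        self._use_cache = True
        return self

    def unpersist(self):
        self._use_cache = False
        self._cached = None
        return self

    def _rows(self):
        if self._use_cache and self._cached is not None:
            return self._cached  # reuse stored rows
        self.computations += 1
        rows = self._compute()
        if self._use_cache:
            self._cached = rows
        return rows

    # Three "actions" (math.fsum avoids the notebook's shadowed sum)
    def count(self):
        return len(self._rows())

    def total(self):
        return math.fsum(self._rows())

    def mean(self):
        rows = self._rows()
        return math.fsum(rows) / len(rows)


salaries = [80000, 75000, 90000, 70000, 75000, 90000, 70000]

plain = Dataset(lambda: list(salaries))
plain.count()
plain.total()
plain.mean()
print("computations without cache:", plain.computations)  # 3

cached = Dataset(lambda: list(salaries)).cache()
cached.count()
cached.total()
cached.mean()
print("computations with cache:", cached.computations)    # 1
```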

2. When Should You NOT Cache a Dataset?
- ❌ When the dataset is used only once → If an RDD or DataFrame is used in a single action, caching adds unnecessary overhead.
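- ❌ When memory is limited → Cached blocks compete with execution memory; partitions that don't fit are evicted and recomputed (or spilled to disk with MEMORY_AND_DISK), and heavy memory pressure can slow jobs down or contribute to OutOfMemory (OOM) errors.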
- ❌ For small datasets → If the dataset is small, Spark can recompute it quickly, making caching unnecessary.
- ❌ If transformations are cheap → Simple transformations (e.g., select() on a small dataset) don’t benefit significantly from caching.

3. Importance of Unpersisting Cached Data (unpersist())
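- Cached data stays in storage until it is unpersisted, evicted under memory pressure (Spark drops old partitions in least-recently-used order), or the application ends.
- unpersist() frees that memory as soon as the dataset is no longer needed, leaving room for execution and for other cached data.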
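Spark's programming guide notes that cached partitions are dropped in least-recently-used (LRU) fashion when storage memory runs short, and `unpersist()` removes a dataset explicitly. The toy store below sketches both behaviours with `OrderedDict`; `BlockStore` and block IDs like `df1_p0` are illustrative assumptions, not Spark's block manager API.

```python
from collections import OrderedDict


class BlockStore:
    """Toy LRU store: capacity counted in blocks, least recently used evicted first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._blocks = OrderedDict()  # block_id -> data, oldest use first

    def put(self, block_id, data):
        self._blocks[block_id] = data
        self._blocks.move_to_end(block_id)
        while len(self._blocks) > self.capacity:
            evicted, _ = self._blocks.popitem(last=False)
            print(f"evicted {evicted}")

    def get(self, block_id):
        if block_id not in self._blocks:
            return None  # caller would have to recompute from lineage
        self._blocks.move_to_end(block_id)  # mark as recently used
        return self._blocks[block_id]

    def unpersist(self, prefix):
        # Drop every block that belongs to one dataset, e.g. prefix "df2_"
        for block_id in [b for b in self._blocks if b.startswith(prefix)]:
            del self._blocks[block_id]

    def block_ids(self):
        return list(self._blocks)


store = BlockStore(capacity=3)
store.put("df1_p0", [1, 2])
store.put("df1_p1", [3, 4])
store.put("df2_p0", [5])
store.put("df3_p0", [6])     # prints "evicted df1_p0"
print(store.get("df1_p0"))   # None: would be recomputed
store.unpersist("df2_")
print(store.block_ids())     # ['df1_p1', 'df3_p0']
```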

Final Takeaways
- ✔ Caching reduces redundant computation, improving performance for repeated actions.
- ✔ Choosing the right persistence level (MEMORY_ONLY, MEMORY_AND_DISK) ensures efficient memory management.
- ✔ Unpersisting cached data (unpersist()) is essential to avoid memory issues in long-running Spark jobs.

## Exercise 5: Spark Application Architecture (Conceptual Review)

### 1. Spark Architecture Components

**A. Driver Program**

✅ What is it?

- The entry point of a Spark application.
- Runs in its own driver process (on the submitting machine in client mode, or on a node inside the cluster in cluster mode) and creates the SparkSession.
- Converts user-defined operations (transformations and actions) into jobs.

✅ Responsibilities:

- Maintains the SparkContext (the interface between the application and the cluster).
- Splits each job into stages at shuffle boundaries (wide transformations).
- Schedules tasks and coordinates their execution on the executors.

**B. Executors**

✅ What are Executors?

- Processes launched on worker nodes that run tasks in parallel.
- Each executor processes the data partitions assigned to it, independently of the other executors.

✅ Responsibilities:

- Executes tasks on assigned data partitions.
- Stores cached data in memory (or on disk) when a dataset is persisted.
- Reports task status and results back to the driver.
### 2. Spark Execution Model
**A. Jobs**

✅ Definition:

- A job is triggered whenever an action is called (e.g., .collect(), .count()).
- The driver's scheduler splits the job into one or more stages.

✅ Example:

`df.count()  # Triggers a job`
- This job counts the rows in the DataFrame.
- Spark converts it into tasks and sends them to the executors.

**B. Stages**

✅ Definition:

- A stage is a set of tasks that run the same computation on different partitions; consecutive narrow transformations are pipelined into a single stage.
- Wide transformations (e.g., groupByKey(), reduceByKey()) require a shuffle, which ends the current stage and starts a new one.

✅ Example of a Multi-Stage Execution:

```python
sc = spark.sparkContext  # SparkContext of the existing SparkSession

rdd = sc.textFile("data.txt")  # Stage 1

words = rdd.flatMap(lambda line: line.split(" "))  # Stage 1 (narrow transformation)

word_pairs = words.map(lambda word: (word, 1))  # Stage 1 (narrow transformation)

word_freq = word_pairs.reduceByKey(lambda a, b: a + b)  # Stage 2 (wide transformation)

word_freq.collect()  # Action: triggers the job; Stage 2 produces the final result
```

- Stage 1: Reads the data, splits lines into words, and maps them into key-value pairs.
- Stage 2: Begins after the shuffle required by reduceByKey() and sums the counts for each word.

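The stage-boundary rule can be sketched in a few lines of plain Python: walk the steps in order and start a new stage at each wide (shuffle) step. This is a simplification for illustration, and `split_into_stages` is a hypothetical helper: in Spark the map-side half of `reduceByKey` (partial combine and shuffle write) actually runs at the end of Stage 1, and Stage 2 reads the shuffled data.

```python
# Each step: (name, is_wide). A wide step needs a shuffle, so it opens a new stage.
pipeline = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),
    ("collect", False),
]


def split_into_stages(steps):
    stages = [[]]
    for name, is_wide in steps:
        if is_wide and stages[-1]:
            stages.append([])  # shuffle boundary: close the current stage
        stages[-1].append(name)
    return stages


for number, stage in enumerate(split_into_stages(pipeline), start=1):
    print(f"Stage {number}: {' -> '.join(stage)}")
# Stage 1: textFile -> flatMap -> map
# Stage 2: reduceByKey -> collect
```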
**C. Tasks**

✅ Definition:

- A task is the smallest unit of execution.
- Each stage runs one task per partition of the data it processes.

✅ Execution Flow:

- The driver creates a job and breaks it into stages.
- Each stage consists of multiple tasks.
- Tasks are distributed to executors, where they run in parallel.
- The driver collects results and returns them.
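One task per partition can be modeled with a thread pool, where a small number of worker threads stand in for executor task slots. This is a local analogy only (real executors are separate processes, usually on other machines), and it assumes an 8-way split of the 10 numbers from Exercise 1.

```python
from concurrent.futures import ThreadPoolExecutor

# Assumed layout: 10 numbers in 8 partitions
partitions = [[10], [20], [30], [40, 50], [60], [70], [80], [90, 100]]


def run_task(partition):
    # One task = the stage's function applied to one partition.
    return [x ** 2 for x in partition]


# Two worker threads play the role of executor task slots.
with ThreadPoolExecutor(max_workers=2) as pool:
    task_results = list(pool.map(run_task, partitions))  # one task per partition

# The "driver" concatenates the partition results, as collect() does.
collected = [value for part in task_results for value in part]
print(len(task_results), "tasks")
print(collected)
```

`pool.map` returns results in partition order even though tasks may finish in any order, which mirrors how `collect()` returns elements in partition order.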
### 3. How Spark’s Execution Model Works
✅ Step-by-Step Breakdown

1. User submits a Spark job
   - The driver program starts execution.
2. SparkContext (via the SparkSession) creates an RDD or DataFrame
   - Lazy transformations (e.g., map(), filter()) do not trigger execution.
3. Spark builds a DAG (Directed Acyclic Graph)
   - When an action runs, the DAG is split into stages at shuffle boundaries.
4. Stages are divided into tasks
   - Spark assigns tasks to executors, preferring executors that already hold the data (data locality).
5. Executors process tasks and return results
   - Executors store intermediate results in memory or on disk if necessary.
6. The final result is sent to the driver
   - The action (e.g., collect(), count()) returns the computed result.

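### 4. DAG (Directed Acyclic Graph) in Spark

✅ What is a DAG?

- A DAG represents the logical execution plan of Spark operations.
- Nodes represent the RDDs (or DataFrame query steps) produced by transformations.
- Edges represent the dependencies between them.

✅ Key Benefits of DAG Execution:

- Optimized execution → Spark pipelines consecutive narrow transformations into a single stage; for DataFrames, the Catalyst optimizer can also reorder operations (e.g., pushing filters closer to the data source).
- Fault tolerance → If a partition is lost, Spark uses the lineage recorded in the DAG to recompute only the affected partitions.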
- Parallelism → Tasks run in parallel across executors.
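Two DAG ideas, dependency order and lineage-based recovery, can be sketched with the standard library: `graphlib.TopologicalSorter` (Python 3.9+) orders the word-count steps so each runs after its parents, and a lost partition is rebuilt by replaying only its narrow chain from the source. The node names and helpers are illustrative, not Spark internals.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each node maps to the set of nodes it depends on (its parents in the lineage).
lineage = {
    "lines": set(),
    "words": {"lines"},
    "pairs": {"words"},
    "counts": {"pairs"},
}
order = list(TopologicalSorter(lineage).static_order())
print(order)  # ['lines', 'words', 'pairs', 'counts']

# Lineage-based recovery for the narrow part of the chain (lines -> words -> pairs)
source = [["a b", "b"], ["c a"]]  # two input partitions of text lines


def words(part):
    return [w for line in part for w in line.split()]


def pairs(part):
    return [(w, 1) for w in part]


narrow_chain = [words, pairs]  # per-partition steps up to the shuffle


def recompute(partition_index):
    data = source[partition_index]
    for step in narrow_chain:
        data = step(data)
    return data


materialized = [recompute(i) for i in range(len(source))]
materialized[1] = None              # simulate losing partition 1 (e.g. an executor failure)
materialized[1] = recompute(1)      # replay the lineage for that partition only
print(materialized)
# [[('a', 1), ('b', 1), ('b', 1)], [('c', 1), ('a', 1)]]
```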
