<center>
    <img src="https://rockborne.com/wp-content/uploads/2021/07/LandingPage-Header-RED-CENTRE.jpg" width="900" alt="logo"  />
</center>

---
# Introduction to PySpark
---


# Prerequisites

- Have the following data downloaded:
  - `people.json`
  - `appl-stock.csv`
  - `Production.Product.csv`
  - `superstore.csv`

<a name="anchorWhatIsSpark" style="position:absolute;"></a>
<hr style="border:2px solid">

# 1. Introduction
<hr style="border-top:1px dashed">

This notebook will guide you through the basics of using PySpark for data analysis.  

We will work with a dataset containing student marks across different modules.  

You'll learn how to:  
- Load and explore data using PySpark  
- Perform basic transformations  

Throughout the notebook, you’ll find **tasks** to complete on your own to reinforce your understanding.


---

## **What is Apache Spark?**
Apache Spark is a powerful big data processing engine focused on:
- **Speed**
- **Ease of Use**
- **Modularity**
- **Extensibility**

Spark keeps intermediate results in memory instead of writing them to disk between steps, which often makes it much faster than Hadoop MapReduce for multi-step and iterative workloads. The Spark engine includes libraries accessible through APIs in Python, R, Scala, Java and SQL. It supports multiple deployment environments (local, standalone cluster, YARN, Kubernetes) and integrates with many data sources.

### **Apache Spark Architecture**
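Spark represents each computation as a **Directed Acyclic Graph (DAG)** of transformations. When a result is requested, the scheduler splits the DAG into stages and each stage into tasks (one per data partition), which run in parallel across the cluster.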

### **Apache Spark Ecosystem**
![Spark Ecosystem](https://www.oreilly.com/api/v2/epubs/9781787283985/files/assets/2581d347-d9db-4b1e-96f5-914e3f5e77fb.png)

*Reference: [O'Reilly](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch01.html)*


---

## **What is PySpark?**

PySpark is the Python API for Apache Spark, allowing users to interact with Spark using Python syntax. It provides:
- Distributed computing capabilities
- Data processing at scale
- Seamless integration with Python libraries

PySpark operates by interfacing with the **Java Virtual Machine (JVM)** using the `Py4J` package, meaning Java must be installed.

### **PySpark Architecture**
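- **Driver Node**: The entry point to the application; runs your main program, creates the `SparkSession` and schedules tasks
- **Worker Nodes**: Run executors that execute tasks in parallel
- **Cluster Manager**: Handles resource allocation (CPU and memory) across the cluster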

<center>
    <img src="https://spark.apache.org/docs/latest/img/cluster-overview.png" width="500" alt="PySpark Arch"  />
</center>

Reference: https://spark.apache.org/docs/latest/cluster-overview.html


### **Spark Workloads**
Spark is well-suited for:
- **ETL (Extract, Transform, Load)** (Batch & Streaming)
- **Interactive Queries** (Spark SQL)
- **Machine Learning** (MLlib)
- **Graph Processing** (GraphX/GraphFrames)

<center>
    <img src="https://www.researchgate.net/profile/Josh-Choi-2/publication/303098621/figure/fig5/AS:566114846298112@1511983705187/The-Apache-Spark-TM-stack-of-engine-bottom-and-libraries-top-Image-credit.png" width="500" alt="Spark Core"  />
</center>
<br>


[Reference](https://www.researchgate.net/publication/272825265_Big_Data_Analysis_Apache_Spark_Perspective)


### **Concepts and Key Terms**


<strong>Spark Cluster</strong>
<p>A collection of machines or nodes in the cloud or on-premise in a data center on which Spark is installed. Among those machines are Spark workers, a Spark Master (also a cluster manager in a Standalone mode), and at least one Spark Driver.</p>

<strong>Spark Master</strong>
<p>As the name suggests, the Spark master JVM acts as a cluster manager in Standalone deployment mode, and Spark workers register themselves with it. Depending on the deployment mode, it acts as a resource manager and decides how many Executors to launch and on which Spark workers in the cluster.</p>

<strong>Spark Worker</strong>
<p>The Spark worker JVM, upon receiving instructions from Spark master, launches executors on the worker on behalf of the Spark driver. Spark applications, decomposed into units of tasks, are executed on each worker’s Executor. In short, the worker’s job is to only launch an Executor on behalf of the master.</p>

<strong>Spark Executor</strong>
<p>It’s a JVM container with an allocated number of cores and amount of memory on which Spark runs its tasks. Each worker node launches one or more Spark Executors, each with a configurable number of cores (or threads). Besides executing Spark tasks, an Executor can also cache data partitions in memory (or on disk) when a dataset is persisted.</p>

<strong>Spark Driver</strong>
<p>Once it gets information from the Spark master of all the workers in the cluster and where they are, the driver program distributes Spark tasks to each worker’s Executor. The driver also receives computed results from each Executor’s tasks.</p>

![Spark Cluster](https://datacadamia.com/_media/db/spark/cluster/spark_cluster_tasks_slot.png)
*Reference: DataCadamia*


### **Why PySpark?**
If you're familiar with Pandas, you might wonder why we need PySpark. Here are some key differences:

| Feature        | Pandas (Single Machine) | PySpark (Distributed) |
|---------------|------------------------|------------------------|
| Data Size     | Limited by RAM          | Scales across clusters |
| Performance   | Fast for small data     | Optimized for big data |
| Parallelism   | Single-threaded         | Multi-threaded, parallel |
| Storage       | Local memory/disk       | Distributed storage (HDFS, S3, etc.) |

### **Example: Pandas vs. PySpark DataFrame Operations**

#### **Pandas Example**
```python
import pandas as pd

df = pd.read_csv("data.csv")
df.head()
```

#### **PySpark Example**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.show()
```

### **SQL to PySpark Translation**
If you're comfortable with SQL, PySpark DataFrames work similarly. Here’s how common SQL queries translate to PySpark:

#### **SQL Query**
```sql
-- SQL
SELECT name, age FROM people WHERE age > 30;
```

#### **PySpark Equivalent**
```python
df.select("name", "age").filter(df.age > 30).show()
```


---

This notebook provides a fundamental understanding of PySpark. In the next sections, we'll explore:
1. Setting up a SparkSession
2. Loading and exploring data
3. Transformations and actions in PySpark
4. Aggregations and SQL queries in PySpark

Let's get started!

<a name="anchorBasics" style="position:absolute;"></a>
<hr style="border:2px solid">

# 2. PySpark Basics
<hr style="border-top:1px dashed">

In [1]:
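# Locate the local Spark installation and add it to sys.path (no session is started yet)
import findspark
findspark.init()
findspark.find()  # returns the Spark home directory that was found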

'C:\\spark-3.5.7-bin-hadoop3'

In [2]:
# Import necessary libraries
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# Create a SparkSession, or reuse the one already running in this process
spark = SparkSession.builder.appName('TheBasics').getOrCreate()
spark

---
<a name="anchorRDD" style="position:absolute;"></a>
## 2.2 RDDs
---

A feature of PySpark that you may not be used to working with in Python is the **Resilient Distributed Dataset** (RDD). RDDs are a fundamental data structure of PySpark: they are **fault-tolerant** (lost partitions can be recomputed from their lineage) and **immutable** (an RDD cannot be changed once created; transformations produce new RDDs). Each RDD is divided into logical partitions, which can be processed on separate worker nodes for **faster computation**. RDDs are best suited to **unstructured data**, such as media streams or streams of text, because they are not bound by a columnar schema (unlike a pandas DataFrame).

<center>
    <img src="https://miro.medium.com/max/720/1*l2MUHFvWfcdiUbh7Y-fM5Q.png" width="500" alt="PySpark Intro"  />
</center>

### **Creating an RDD**
There are several ways to create an RDD. Two of them are the `parallelize()` function, which converts a Python list into an RDD, and the `textFile()` function, which reads a text file and converts it into an RDD.

### **1. Using `parallelize()`**
The `parallelize()` function takes an existing Python list and distributes it across the Spark cluster to form an RDD.

#### **Example:**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDD Example").getOrCreate()
sc = spark.sparkContext  # SparkContext is required for working with RDDs

# Creating an RDD from a list
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())  # Output: [1, 2, 3, 4, 5]
```

### **2. Using `textFile()`**
The `textFile()` function reads a text file and converts it into an RDD, where each line of the file becomes an element in the RDD.

#### **Example:**
```python
# Creating an RDD from a text file
rdd_text = sc.textFile("sample.txt")
print(rdd_text.collect())  # Output: ['line1', 'line2', 'line3', ...]
```




Below, we will convert a standard Python list into a distributed RDD using PySpark's parallelize function. Once the data is in an RDD, we will apply transformations and actions to demonstrate PySpark's parallel processing capabilities.

In [4]:
data = [
    (1, "Alice", "Math", 85),
    (2, "Bob", "Math", 78),
    (3, "Charlie", "Math", 92),
    (4, "Alice", "Science", 88),
    (5, "Bob", "Science", 74),
    (6, "Charlie", "Science", 95),
    (7, "Alice", "History", 90),
    (8, "Bob", "History", 80),
    (9, "Charlie", "History", 85)
]

[(1, 'Alice', 'Math', 85),
 (2, 'Bob', 'Math', 78),
 (3, 'Charlie', 'Math', 92),
 (4, 'Alice', 'Science', 88),
 (5, 'Bob', 'Science', 74),
 (6, 'Charlie', 'Science', 95),
 (7, 'Alice', 'History', 90),
 (8, 'Bob', 'History', 80),
 (9, 'Charlie', 'History', 85)]

### **Transformations and Actions on RDDs**
RDDs support two types of operations:
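- **Transformations**: These return a new RDD. They are lazy: Spark only records them until an action needs the result.
- **Actions**: These trigger the computation and return a value to the driver (or write data out).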



##### Transformations

Transformations return a new RDD without modifying the original.

- **`map(func)`** – Applies `func` to each element and returns a new RDD.
- **`flatMap(func)`** – Similar to `map`, but flattens the results.
- **`filter(func)`** – Returns an RDD with elements that satisfy `func`.
- **`distinct()`** – Removes duplicate elements.
- **`sample(withReplacement, fraction)`** – Returns a sampled subset.
- **`union(otherRDD)`** – Merges two RDDs.
- **`intersection(otherRDD)`** – Returns common elements between RDDs.
- **`subtract(otherRDD)`** – Returns elements in this RDD but not in `otherRDD`.
- **`cartesian(otherRDD)`** – Returns the Cartesian product.
- **`groupByKey()`** – Groups values with the same key (for `(K, V)` pairs).
- **`reduceByKey(func)`** – Merges values with the same key using `func`.
- **`sortByKey()`** – Sorts by key in an RDD of key-value pairs.

#### **Lambda Functions in Python**
Below we will also be using lambda functions. A **lambda function** is an **anonymous, single-expression function** defined using the `lambda` keyword instead of `def`. It is useful for short, simple functions.

##### **Syntax:**
```python
lambda arguments: expression
```

##### **Example 1: Simple Lambda Function**
```python
square = lambda x: x ** 2
print(square(5))  # Output: 25
```

##### **Example 2: Using Lambda with `map`**
```python
numbers = [1, 2, 3, 4]
squared_numbers = list(map(lambda x: x ** 2, numbers))
print(squared_numbers)  # Output: [1, 4, 9, 16]
```

##### **Example 3: Using Lambda with `filter`**
```python
numbers = [1, 2, 3, 4, 5, 6]
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers)  # Output: [2, 4, 6]
```


##### **When to Use Lambda Functions?**
- When you need a small function for a short period  
- When using functions like `map()`, `filter()`, `sorted()`, and `reduce()`  
- When defining a function inline for readability  
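The list above mentions `sorted()` and `reduce()`, which the earlier examples did not cover. A short sketch in plain Python, using made-up (name, score) pairs:

```python
from functools import reduce

scores = [("Alice", 85), ("Bob", 78), ("Charlie", 92)]

# sorted() with a lambda as the key: order by score, highest first
by_score = sorted(scores, key=lambda pair: pair[1], reverse=True)
print(by_score)  # [('Charlie', 92), ('Alice', 85), ('Bob', 78)]

# reduce() with a two-argument lambda: fold all scores into one total
total = reduce(lambda acc, pair: acc + pair[1], scores, 0)
print(total)  # 255
```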



In [None]:
# Extract (Student, Score) pairs


[('Alice', 85),
 ('Bob', 78),
 ('Charlie', 92),
 ('Alice', 88),
 ('Bob', 74),
 ('Charlie', 95),
 ('Alice', 90),
 ('Bob', 80),
 ('Charlie', 85)]

Using the `student_scores_rdd` formed above, we can treat the students' names as keys and use the `reduceByKey()` function to sum the scores of each student.

In [None]:
# Sum scores per student


[('Charlie', 272), ('Bob', 232), ('Alice', 263)]

In [None]:
# Filter students with total scores above 250


[('Charlie', 272), ('Alice', 263)]

In [None]:
# Sort by total score in descending order


[('Charlie', 272), ('Alice', 263)]

[(1, 'Alice', 'Math', 85),
 (2, 'Bob', 'Math', 78),
 (3, 'Charlie', 'Math', 92),
 (4, 'Alice', 'Science', 88),
 (5, 'Bob', 'Science', 74),
 (6, 'Charlie', 'Science', 95),
 (7, 'Alice', 'History', 90),
 (8, 'Bob', 'History', 80),
 (9, 'Charlie', 'History', 85)]

#### Actions

Actions return a value after executing computations on the RDD.

- **`collect()`** – Returns all elements of the RDD.
- **`count()`** – Returns the number of elements.
- **`first()`** – Returns the first element.
- **`take(n)`** – Returns the first `n` elements.
- **`top(n)`** – Returns the largest `n` elements, in descending order.
- **`reduce(func)`** – Aggregates elements using `func`.
- **`countByKey()`** – Counts occurrences of each key.
- **`foreach(func)`** – Applies `func` to each element (without returning a new RDD).

Student Scores: [('Alice', 85), ('Bob', 78), ('Charlie', 92), ('Alice', 88), ('Bob', 74), ('Charlie', 95), ('Alice', 90), ('Bob', 80), ('Charlie', 85)]
Aggregated Scores: [('Charlie', 272), ('Bob', 232), ('Alice', 263)]
High Achievers: [('Charlie', 272), ('Alice', 263)]
Sorted High Achievers: [('Charlie', 272), ('Alice', 263)]
Total Number of Students: 3
Top Scorer: ('Charlie', 272)


---
<a name="anchorDF" style="position:absolute;"></a>
## 2.3 DataFrames
---

Like an RDD, a DataFrame is an immutable collection of data; however, unlike an RDD, the data is organised into named columns, like a table in a relational database. If you are familiar with DataFrames in pandas, then you should have a good idea of the structure here. The difference is that in Spark, DataFrames are distributed across the cluster. For large datasets, parallel execution on all cores of multiple machines lets PySpark handle workloads that would be slow or impossible in pandas (for small data, pandas is often faster). Because a DataFrame imposes a structure (a schema) on the data, it also offers a higher-level abstraction than an RDD, and Spark can use that schema to optimise queries.

### **Creating a DataFrame**

A dataframe can be created in several ways including: <br>
- from an existing RDD
- from a list
- from an external data source

**From an existing RDD:**

[Row(_1=1, _2='Alice', _3='Math', _4=85),
 Row(_1=2, _2='Bob', _3='Math', _4=78),
 Row(_1=3, _2='Charlie', _3='Math', _4=92),
 Row(_1=4, _2='Alice', _3='Science', _4=88),
 Row(_1=5, _2='Bob', _3='Science', _4=74),
 Row(_1=6, _2='Charlie', _3='Science', _4=95),
 Row(_1=7, _2='Alice', _3='History', _4=90),
 Row(_1=8, _2='Bob', _3='History', _4=80),
 Row(_1=9, _2='Charlie', _3='History', _4=85)]

**From a list:**

In [None]:
data_list = [('John','','Doe','1998-04-08','M',2000),
  ('Michael','Williams','','2002-05-24','M',3000),
  ('James','','Jones','1981-09-09','M',4000),
  ('Maria','Anne','Jones','1973-12-08','F',4000),
  ('Mary','Beth','Doe','1985-02-13','F',-1)]
columns = ['firstname', 'middlename', 'lastname', 'dob', 'gender', 'salary']
data_list, columns


([('John', '', 'Doe', '1998-04-08', 'M', 2000),
  ('Michael', 'Williams', '', '2002-05-24', 'M', 3000),
  ('James', '', 'Jones', '1981-09-09', 'M', 4000),
  ('Maria', 'Anne', 'Jones', '1973-12-08', 'F', 4000),
  ('Mary', 'Beth', 'Doe', '1985-02-13', 'F', -1)],
 ['firstname', 'middlename', 'lastname', 'dob', 'gender', 'salary'])

DataFrame[firstname: string, middlename: string, lastname: string, dob: string, gender: string, salary: bigint]

[Row(firstname='John', middlename='', lastname='Doe', dob='1998-04-08', gender='M', salary=2000),
 Row(firstname='Michael', middlename='Williams', lastname='', dob='2002-05-24', gender='M', salary=3000),
 Row(firstname='James', middlename='', lastname='Jones', dob='1981-09-09', gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1973-12-08', gender='F', salary=4000),
 Row(firstname='Mary', middlename='Beth', lastname='Doe', dob='1985-02-13', gender='F', salary=-1)]

**Reading External Data:**

**Double check that the `people.json` file is available at the path used below (`../DataSources/people.json`)**; if it is not, download it again (see the Prerequisites at the start of the notebook).

In [None]:
##External data:
df_from_json = spark.read.json('../DataSources/people.json')


[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

We will cover more ways to read in external data later.

### **Basic Functions**

**Showing the data:**

+---+-------+-------+---+
| _1|     _2|     _3| _4|
+---+-------+-------+---+
|  1|  Alice|   Math| 85|
|  2|    Bob|   Math| 78|
|  3|Charlie|   Math| 92|
|  4|  Alice|Science| 88|
|  5|    Bob|Science| 74|
|  6|Charlie|Science| 95|
|  7|  Alice|History| 90|
|  8|    Bob|History| 80|
|  9|Charlie|History| 85|
+---+-------+-------+---+



+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|     John|          |     Doe|1998-04-08|     M|  2000|
|  Michael|  Williams|        |2002-05-24|     M|  3000|
|    James|          |   Jones|1981-09-09|     M|  4000|
|    Maria|      Anne|   Jones|1973-12-08|     F|  4000|
|     Mary|      Beth|     Doe|1985-02-13|     F|    -1|
+---------+----------+--------+----------+------+------+



**Looking at data architecture:**

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



<div class="alert alert-block alert-info">
    <b>Note:</b> printSchema() is similar to .info() in pandas.
</div>

**Getting the column names with `.columns`:**

['_1', '_2', '_3', '_4']

['firstname', 'middlename', 'lastname', 'dob', 'gender', 'salary']

['age', 'name']

**Get summary statistics with `.describe()`** (it returns a new DataFrame, so call `.show()` on it to display the values):

DataFrame[summary: string, age: string, name: string]

DataFrame[summary: string, firstname: string, middlename: string, lastname: string, dob: string, gender: string, salary: string]

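Notice how in the `describe()` output every column, including `age`, is of type string: summary statistics are returned as strings, even though `age` in `df_from_json` itself was inferred as `long` (see `printSchema()` above). Some readers can infer types for you (the csv reader, for example, has an `inferSchema` option, which we will show later). <br> However, you often need to set the schema yourself, for instance when the inferred types are not the ones you want (here we would like `age` as a 32-bit integer rather than a 64-bit long). This can be handled easily, as Spark has all the tools you need; it just requires a very specific structure: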

Reference: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

**StructField:**

In [33]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

When defining a schema for a PySpark DataFrame, we use `StructType` and `StructField` to specify the structure of the data. Each column in the DataFrame is represented by a `StructField`.

Each `StructField` takes the following parameters:
- `name`: string, name of the field.
- `dataType`: class, DataType of the field.
- `nullable`: boolean, whether the field can be null (None) or not.

In [34]:
json_data_schema = [StructField("age", IntegerType(), True),
                    StructField("name", StringType(), True)]
json_data_schema ##List with the values we need

[StructField('age', IntegerType(), True),
 StructField('name', StringType(), True)]

In [35]:
final_struc = StructType(fields=json_data_schema)
final_struc

StructType([StructField('age', IntegerType(), True), StructField('name', StringType(), True)])

Now, we can read in the json file with the defined schema.

In [38]:
# Read the people.json file with the new schema
df_from_json_schemed = spark.read.json('../DataSources/people.json', schema=final_struc)

In [39]:
df_from_json_schemed

DataFrame[age: int, name: string]

**Creating a copy of `df_from_json_schemed` as `df`:**

DataFrame[age: int, name: string]

**Ensuring that the copy is independent and not an alias:**

(1829892560144, 1829892871296, False)

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [None]:
#grabbing a column object


Column<'age'>

In [None]:
# Validating the object type


pyspark.sql.column.Column

DataFrame[age: int]

**NOTE**: Notice how using select returns a dataframe

pyspark.sql.dataframe.DataFrame

**`show()` vs `collect()` vs `take()`**

Key Differences:
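- `show()`: For quick viewing; prints the first rows (20 by default) as a table and returns `None`.
- `collect()`: Returns the entire dataset to the driver as a list of `Row` objects (be careful with large datasets).
- `take(n)`: Returns only the first `n` rows as a list of `Row` objects.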

In [None]:
## Displays the DataFrame in tabular form

+----+
| age|
+----+
|NULL|
|  30|
|  19|
+----+



In [None]:
## Returns all rows as a list of Row objects

[Row(age=None), Row(age=30), Row(age=19)]

In [None]:
## Returns only a limited number of rows as a list of Row objects

[Row(age=None), Row(age=30)]

Return a list of Row objects with `head()` and `tail()`

In [None]:
# List of Row objects from the top of the DataFrame


[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [None]:
# List of Row objects from the bottom of the DataFrame


[Row(age=30, name='Andy'), Row(age=19, name='Justin')]

**Selecting Multiple Columns:**

Reference: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.select.html

DataFrame[age: int, name: string]

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**Creating New Columns**

Reference: https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html

In [None]:
# Simple Rename


+-----------+-------+
|supernewage|   name|
+-----------+-------+
|       NULL|Michael|
|         30|   Andy|
|         19| Justin|
+-----------+-------+



+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**Note:** These changes aren't permanent unless you assign the result to a variable: DataFrames are immutable, so each method returns a new DataFrame.

In [None]:
# Adding a new column with a simple copy


+----+-------+------+
| age|   name|newage|
+----+-------+------+
|NULL|Michael|  NULL|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [None]:
#New column with some transformation: double the age


+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|NULL|Michael|     NULL|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+



In [58]:
#***Task***
# Add a new column with some transformation: age + 1

#-----------------------------------------------------------------#


#-----------------------------------------------------------------#

In [59]:
#***Task***
# Add a new column with some transformation: age/2

#---------------------------------------------------------------#


#---------------------------------------------------------------#

+----+-------+--------+
| age|   name|half_age|
+----+-------+--------+
|NULL|Michael|    NULL|
|  30|   Andy|    15.0|
|  19| Justin|     9.5|
+----+-------+--------+



(None, 1829892559520, 1829892560144)

<a name="anchorConclusion" style="position:absolute;"></a>
<hr style="border:0.5px solid gray">

# 4. Conclusion
<hr style="border-top:1px dashed">

- PySpark is a powerful tool for Data Science Analysts to learn, because it enables scalable data wrangling and analysis beyond the limitations of pandas.
- If you already know Python, Pandas and SQL, this should be a great jump into big data.
- If you are interested in Data Engineering, [Azure Databricks](https://docs.microsoft.com/en-us/azure/databricks/languages/python) uses PySpark as one of the primary languages to interact with data.
- For more information read the [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).

## Citations
<a name="KDNuggets" style="position:absolute;"></a>
7 Steps to Mastering Apache Spark 2.0. KDnuggets, 2016. [Link](https://www.kdnuggets.com/2016/09/7-steps-mastering-apache-spark.html)
<br>