![spark logo](https://datascientest.com/en/files/2021/02/illu_pyspark_blog-84.png)

# <b><center>PySpark Tutorial - Part 1: PySpark DataFrames</center></b>

*PySpark is the Python API for Apache Spark, the open-source engine for large-scale data processing and analytics. It exposes Spark's distributed DataFrame and SQL functionality through a high-level Python interface, so it integrates with the existing Python ecosystem.*

*This notebook is based on the following materials:*
* [freeCodeCamp - PySpark Tutorial](https://www.youtube.com/watch?v=_C8kWso4ne4)
* [PySpark 3.5 Tutorial For Beginners with Examples](https://sparkbyexamples.com/pyspark-tutorial/)

## PySpark Architecture

*PySpark's architecture consists of a driver program that coordinates the work and asks a cluster manager (e.g. Spark Standalone, YARN, or Kubernetes) for resources. The driver splits each job into tasks and schedules them on executors, JVM processes running on the worker nodes. SparkContext represents the driver's connection to the cluster and manages the execution environment, while SparkSession, which wraps a SparkContext, is the unified entry point for DataFrame and SQL functionality. The DataFrame API provides a high-level abstraction for data manipulation, and because tasks run in parallel across executors, large datasets can be processed efficiently.*

![spark architecture](https://i0.wp.com/sparkbyexamples.com/wp-content/uploads/2020/02/spark-cluster-overview.png?w=596&ssl=1)

### What will we cover?
* PySpark DataFrames
* Reading Datasets
* Checking Schema Datatypes
* Selecting & Indexing Columns
* Describing a DataFrame (the PySpark alternative to pandas `describe`)
* Adding Columns
* Dropping Columns
* Renaming Columns
* Dropping Rows
* Parameters of the Row-Dropping Functions
* Handling and Filling Missing Values
  

## Part 1: PySpark DataFrames - Reading & Schema Datatypes

Install Notebook Dependencies

In [1]:
!pip install pyspark python-dotenv pandas



## Import libraries

In [2]:
import pyspark
from pyspark.sql import SparkSession

import dotenv

dotenv.load_dotenv()


True

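First, let's create a SparkSession, the entry point for working with PySpark DataFrames. The `spark.sql.debug.maxToStringFields` setting raises the number of fields Spark prints in debug strings (such as query plans) before truncating them.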

In [3]:
# spark = SparkSession.builder.appName('Practice').getOrCreate()
spark = SparkSession.builder \
    .config('spark.sql.debug.maxToStringFields', 2000) \
    .getOrCreate()

24/10/26 10:55:04 WARN Utils: Your hostname, Roys-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.11 instead (on interface en0)
24/10/26 10:55:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/26 10:55:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/26 10:55:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark

In [5]:
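# read the dataset with default options: the header row is treated as data and every column is a string
df_pyspark = spark.read.csv('data/gym_members_exercise_tracking.csv')
# keep only the first 3 rows (limit returns a new DataFrame)
df_pyspark = df_pyspark.limit(3)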

In [6]:
# use the first row as the column names (header)
df_pyspark = spark.read.option('header', 'true').csv(
    'data/gym_members_exercise_tracking.csv')

In [7]:
# dataframe dtype
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [8]:
# schema datatypes
df_pyspark.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Weight (kg): string (nullable = true)
 |-- Height (m): string (nullable = true)
 |-- Max_BPM: string (nullable = true)
 |-- Avg_BPM: string (nullable = true)
 |-- Resting_BPM: string (nullable = true)
 |-- Session_Duration (hours): string (nullable = true)
 |-- Calories_Burned: string (nullable = true)
 |-- Workout_Type: string (nullable = true)
 |-- Fat_Percentage: string (nullable = true)
 |-- Water_Intake (liters): string (nullable = true)
 |-- Workout_Frequency (days/week): string (nullable = true)
 |-- Experience_Level: string (nullable = true)
 |-- BMI: string (nullable = true)



### Why were all columns set to string?
By default, PySpark's CSV reader does not infer column types, so every column is read as a string. To have Spark detect numeric types, set the `inferSchema` parameter to `True` (this requires one extra pass over the data).

In [9]:
df_pyspark = spark.read.option('header', 'true').csv(
    'data/gym_members_exercise_tracking.csv', inferSchema=True)

df_pyspark.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Weight (kg): double (nullable = true)
 |-- Height (m): double (nullable = true)
 |-- Max_BPM: integer (nullable = true)
 |-- Avg_BPM: integer (nullable = true)
 |-- Resting_BPM: integer (nullable = true)
 |-- Session_Duration (hours): double (nullable = true)
 |-- Calories_Burned: double (nullable = true)
 |-- Workout_Type: string (nullable = true)
 |-- Fat_Percentage: double (nullable = true)
 |-- Water_Intake (liters): double (nullable = true)
 |-- Workout_Frequency (days/week): integer (nullable = true)
 |-- Experience_Level: integer (nullable = true)
 |-- BMI: double (nullable = true)



## Part 2: Selecting & Indexing Columns

Select a single column:

In [10]:
df_pyspark.select('Gender').show(3)

+------+
|Gender|
+------+
|  Male|
|Female|
|Female|
+------+
only showing top 3 rows



Select multiple columns:

In [11]:
df_pyspark.select(['Gender','Age']).show(3)

+------+---+
|Gender|Age|
+------+---+
|  Male| 56|
|Female| 46|
|Female| 32|
+------+---+
only showing top 3 rows



## Part 3: Describing a DataFrame

In [30]:
df_pyspark.describe().show(2)

+-------+-----------------+------+-----------------+------------------+------------------+------------------+------------------+------------------------+-----------------+------------+------------------+---------------------+-----------------------------+------------------+------------------+
|summary|              Age|Gender|      Weight (kg)|        Height (m)|           Max_BPM|           Avg_BPM|       Resting_BPM|Session_Duration (hours)|  Calories_Burned|Workout_Type|    Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|  Experience_Level|               BMI|
+-------+-----------------+------+-----------------+------------------+------------------+------------------+------------------+------------------------+-----------------+------------+------------------+---------------------+-----------------------------+------------------+------------------+
|  count|              973|   973|              973|               973|               973|               973|         

## Part 4: Adding, Removing & Renaming Columns


In [13]:
# Add a boolean column that is True when Gender is 'Male'
df_pyspark = df_pyspark.withColumn('Is_Male', df_pyspark['Gender'] == 'Male')

In [29]:
df_pyspark.show(2)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level| BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3|30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|                  

In [15]:
# Drop the Is_Male column (drop returns a new DataFrame, so reassign it)
df_pyspark = df_pyspark.drop('Is_Male')

In [28]:
df_pyspark.show(2)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level| BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3|30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|                  

In [27]:
# Rename the Age column to New_Age. The result is only displayed: df_pyspark keeps the old name because it is not reassigned
df_pyspark.withColumnRenamed('Age', 'New_Age').show(2)

+-------+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|New_Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level| BMI|
+-------+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|     56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3|30.2|
|     46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.

## Part 5: Dropping Null Rows


In [26]:
df_pyspark.na.drop().show(3)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level|  BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3| 30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|              

`na.drop()` takes a `how` parameter that controls which rows containing nulls are dropped:
* `any` (the default) - a row with at least one null value is dropped.
* `all` - a row is dropped only if all of its values are null.

In [25]:
# how='any' (the default) drops rows with at least one null; how='all' drops only fully-null rows
# df_pyspark.na.drop(how="all").show()
df_pyspark.na.drop(how="any").show(3)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level|  BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3| 30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|              

In [24]:
# keep only rows with at least 2 non-null values (when thresh is set, it overrides how)
df_pyspark.na.drop(how="any", thresh=2).show(3)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level|  BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3| 30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|              

In [32]:
# subset: only nulls in the listed columns decide whether a row is dropped
df_pyspark.na.drop(how="any", subset=['Gender']).show(3)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level|  BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3| 30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|              

## Part 6: Filling Null Values


In [37]:
# Fill nulls with the string 'Missing Values' (only string columns are affected)
df_pyspark.na.fill('Missing Values').show(2)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level| BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3|30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|                  

In [38]:
# Fill nulls with 'Missing Values' in a specific column only.
# The value's type must match the column's type: a string fill on the integer Age column would be silently ignored
df_pyspark.na.fill('Missing Values', ['Gender']).show(2)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level| BMI|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+----+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3|30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|                     1.3|          883.0|        HIIT|          33.9|                  2.1|                  

In [42]:
# Fill nulls in numeric columns with the column mean using pyspark.ml's Imputer
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age', 'BMI'],
    outputCols=[f"{c}_imputed" for c in ['Age', 'BMI']],
).setStrategy("mean")

In [44]:
# fit computes each column's mean; transform returns a new DataFrame with the *_imputed columns appended
imputer.fit(df_pyspark).transform(df_pyspark).show(3)

+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+-----------+-----------+
|Age|Gender|Weight (kg)|Height (m)|Max_BPM|Avg_BPM|Resting_BPM|Session_Duration (hours)|Calories_Burned|Workout_Type|Fat_Percentage|Water_Intake (liters)|Workout_Frequency (days/week)|Experience_Level|  BMI|Age_imputed|BMI_imputed|
+---+------+-----------+----------+-------+-------+-----------+------------------------+---------------+------------+--------------+---------------------+-----------------------------+----------------+-----+-----------+-----------+
| 56|  Male|       88.3|      1.71|    180|    157|         60|                    1.69|         1313.0|        Yoga|          12.6|                  3.5|                            4|               3| 30.2|         56|       30.2|
| 46|Female|       74.9|      1.53|    179|    151|         66|         