<a href="https://colab.research.google.com/github/pratikagithub/DS-Case-Studies/blob/main/Building_an_ETL_Pipeline_using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

An ETL (Extract, Transform, Load) pipeline extracts data from one or more sources, transforms it, and loads it into a storage system, producing clean, consistently formatted data for analysis. PySpark is well suited to building ETL pipelines for large-scale data processing: it distributes work across a cluster and handles both structured and unstructured data.

The dataset we will be using for building an ETL Pipeline contains temperature-related data for various countries from 1961 to 2022. The columns include identifiers like ObjectId, Country, ISO2, and ISO3, along with year-wise temperature data such as F1961, F1962, etc., as floating-point values. Some columns contain missing values.

We’ll build an ETL pipeline in PySpark that processes this dataset in three stages:

Extract: Load the dataset from the CSV file.

Transform: Clean the data, handle missing values, and reshape the year-wise temperature columns into rows for analysis.

Load: Save the processed data into a new storage format (e.g., Parquet or a database).

**Step 1: Setting Up the Environment & Initializing a PySpark Session**

Ensure that PySpark is installed and set up. Run the following command to install PySpark if it’s not already installed:

In [1]:
!pip install pyspark

Initialize a PySpark session to enable interaction with the Spark framework:

In [2]:
from pyspark.sql import SparkSession

# initialize SparkSession
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()

**Step 2: Extract – Load the Dataset**

The next step is to load the dataset into a PySpark DataFrame:

In [3]:
# upload temperature.csv from the local machine (Colab only)
from google.colab import files
uploaded = files.upload()

# load the CSV file into a Spark DataFrame
file_path = "/content/temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# display the schema and preview the data
df.printSchema()
df.show(5)

Saving temperature.csv to temperature.csv
root
 |-- ObjectId: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- ISO2: string (nullable = true)
 |-- ISO3: string (nullable = true)
 |-- F1961: double (nullable = true)
 |-- F1962: double (nullable = true)
 |-- F1963: double (nullable = true)
 |-- F1964: double (nullable = true)
 |-- F1965: double (nullable = true)
 |-- F1966: double (nullable = true)
 |-- F1967: double (nullable = true)
 |-- F1968: double (nullable = true)
 |-- F1969: double (nullable = true)
 |-- F1970: double (nullable = true)
 |-- F1971: double (nullable = true)
 |-- F1972: double (nullable = true)
 |-- F1973: double (nullable = true)
 |-- F1974: double (nullable = true)
 |-- F1975: double (nullable = true)
 |-- F1976: double (nullable = true)
 |-- F1977: double (nullable = true)
 |-- F1978: double (nullable = true)
 |-- F1979: double (nullable = true)
 |-- F1980: double (nullable = true)
 |-- F1981: double (nullable = true)
 |-- F1982: double (null

In PySpark, we load the CSV file into a distributed DataFrame, much as pandas.read_csv() loads data into a Pandas DataFrame. Unlike Pandas, which holds the data in the memory of a single machine, PySpark can partition large datasets across a cluster. The methods df.printSchema() and df.show(5) display the schema and a preview of the data, comparable to df.info() and df.head() in Pandas, but designed for data exploration on big data workloads.
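Schema inference requires an extra pass over the file, so for larger inputs it can help to state the schema up front. The sketch below builds a DDL-formatted schema string from the column layout shown in the schema output. The helper name `build_temperature_schema` is hypothetical, and the code assumes the year columns run contiguously from F1961 to F2022 with no other columns in the file.

```python
def build_temperature_schema(first_year=1961, last_year=2022):
    """Return a DDL schema string for the temperature CSV.

    Assumes four identifier columns followed by one double column per year.
    """
    id_columns = ["ObjectId INT", "Country STRING", "ISO2 STRING", "ISO3 STRING"]
    year_columns = [f"F{year} DOUBLE" for year in range(first_year, last_year + 1)]
    return ", ".join(id_columns + year_columns)


schema_ddl = build_temperature_schema()
print(schema_ddl[:60])  # preview the start of the generated string

# With an active SparkSession, the string can replace inferSchema=True:
# df = spark.read.csv(file_path, header=True, schema=schema_ddl)
```

An explicit schema also makes the pipeline fail or produce nulls visibly when the input layout changes, rather than silently inferring a different type.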

**Step 3: Transform – Clean and Process the Data**

Every dataset needs its own cleaning and processing steps. Here, we replace missing values in the ISO2 column with a placeholder and drop rows in which every temperature value is missing:

In [4]:
# fill missing values for country codes
df = df.fillna({"ISO2": "Unknown"})

# drop rows where all temperature values are null
temperature_columns = [col for col in df.columns if col.startswith('F')]
df = df.dropna(subset=temperature_columns, how="all")
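Before filling or dropping anything, it can be useful to see how many values are actually missing in each column. A minimal sketch: the hypothetical helper `null_count_exprs` generates one Spark SQL expression per column, which can then be passed to `selectExpr` to produce a single row of null counts.

```python
def null_count_exprs(columns):
    """Build Spark SQL expressions that count nulls in each given column."""
    return [
        f"sum(case when `{name}` is null then 1 else 0 end) as `{name}`"
        for name in columns
    ]


# Example with two columns; in the notebook this could be df.columns.
exprs = null_count_exprs(["ISO2", "F1961"])
for e in exprs:
    print(e)

# With the DataFrame from Step 2:
# df.selectExpr(*null_count_exprs(df.columns)).show()
```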

Next, we reshape the dataset from wide to long format, so that each row holds one country, one year, and that year’s temperature value:

In [5]:
from pyspark.sql.functions import expr

# reshape temperature data to have 'Year' and 'Temperature' columns
df_pivot = df.selectExpr(
    "ObjectId", "Country", "ISO3",
    "stack(62, " +
    ",".join([f"'F{1961 + i}', F{1961 + i}" for i in range(62)]) +
    ") as (Year, Temperature)"
)

# strip the leading 'F' and convert 'Year' to an integer
df_pivot = df_pivot.withColumn("Year", expr("int(substring(Year, 2, 4))"))
df_pivot.show(5)

+--------+--------------------+----+----+-----------+
|ObjectId|             Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
|       1|Afghanistan, Isla...| AFG|1961|     -0.113|
|       1|Afghanistan, Isla...| AFG|1962|     -0.164|
|       1|Afghanistan, Isla...| AFG|1963|      0.847|
|       1|Afghanistan, Isla...| AFG|1964|     -0.764|
|       1|Afghanistan, Isla...| AFG|1965|     -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows
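The `stack` call in the cell above hardcodes both the column count (62) and the start year (1961). As a sketch of a more general form, the hypothetical helper `build_stack_expr` derives the expression from whichever value columns are passed in, so the count and the column list cannot drift apart.

```python
def build_stack_expr(value_columns, key_name="Year", value_name="Temperature"):
    """Build a Spark SQL stack() expression that unpivots value_columns."""
    if not value_columns:
        raise ValueError("value_columns must not be empty")
    # Each pair is a string literal (the label) followed by the column itself.
    pairs = ", ".join(f"'{name}', `{name}`" for name in value_columns)
    return f"stack({len(value_columns)}, {pairs}) as ({key_name}, {value_name})"


print(build_stack_expr(["F1961", "F1962"]))
# prints: stack(2, 'F1961', `F1961`, 'F1962', `F1962`) as (Year, Temperature)

# In the notebook, assuming temperature_columns from the cleaning step:
# df_pivot = df.selectExpr("ObjectId", "Country", "ISO3",
#                          build_stack_expr(temperature_columns))
```

On Spark 3.4 and later, `DataFrame.unpivot` (also available as `melt`) performs the same reshaping without assembling a SQL string, which may be preferable where that version is available.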



**Step 4: Load – Save the Processed Data**

After completing all the processing steps, we save the transformed data to a Parquet file for efficient storage and querying:

In [6]:
output_path = "/processed_temperature.parquet"
df_pivot.write.mode("overwrite").parquet(output_path)

This operation writes the transformed DataFrame in Parquet, a columnar format that is compact to store and efficient to query in a distributed environment. Spark writes the output as a directory of part files rather than a single file.
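If downstream queries usually filter by year, partitioning the output on that column is one option to consider. The sketch below assumes the `df_pivot` DataFrame from Step 3; the function name `write_partitioned` and the example path are illustrative and not part of the original notebook.

```python
def write_partitioned(df, path, partition_columns=("Year",)):
    """Write df as Parquet, one sub-directory per distinct partition value."""
    (
        df.write
        .mode("overwrite")                 # replace any previous output
        .partitionBy(*partition_columns)   # e.g. <path>/Year=1961/part-*.parquet
        .parquet(path)
    )


# In the notebook (hypothetical output path):
# write_partitioned(df_pivot, "/processed_temperature_by_year.parquet")
```

For a dataset this small the extra directories bring little benefit and add many small files, so partitioning is mainly worth it when each partition holds a substantial amount of data.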

We can load the saved Parquet file to ensure the data was correctly saved:

In [7]:
# load the saved parquet file
processed_df = spark.read.parquet(output_path)
processed_df.show(5)

+--------+--------------------+----+----+-----------+
|ObjectId|             Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
|       1|Afghanistan, Isla...| AFG|1961|     -0.113|
|       1|Afghanistan, Isla...| AFG|1962|     -0.164|
|       1|Afghanistan, Isla...| AFG|1963|      0.847|
|       1|Afghanistan, Isla...| AFG|1964|     -0.764|
|       1|Afghanistan, Isla...| AFG|1965|     -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows
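Previewing five rows confirms that the file is readable, but not that it is complete. A slightly stronger check, sketched below, compares column names and row counts between the DataFrame that was written and the one read back. The helper name `check_round_trip` is hypothetical.

```python
def check_round_trip(written_df, reloaded_df):
    """Compare column names and row counts; return the row count if they match."""
    written_cols = sorted(written_df.columns)
    reloaded_cols = sorted(reloaded_df.columns)
    assert written_cols == reloaded_cols, (
        f"column mismatch: {written_cols} vs {reloaded_cols}"
    )
    written_rows = written_df.count()
    reloaded_rows = reloaded_df.count()
    assert written_rows == reloaded_rows, (
        f"row count mismatch: {written_rows} vs {reloaded_rows}"
    )
    return reloaded_rows


# In the notebook:
# check_round_trip(df_pivot, processed_df)
```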

