In [1]:
import pyspark

In [2]:
pyspark.__version__

'3.5.5'

In [5]:
import pandas as pd
pd.read_csv('statsdata.csv')

Unnamed: 0,REF_DATE,GEO,DGUID,Labour force characteristics,Gender,Age group,UOM,UOM_ID,SCALAR_FACTOR,SCALAR_ID,VECTOR,COORDINATE,VALUE,STATUS,SYMBOL,TERMINATED,DECIMALS
0,2024-11,Canada,2021A000011124,Employment,Total - Gender,15 years and over,Persons in thousands,428,thousands,3,v2091072,1.3.1.1,20828.6,,,,1
1,2024-12,Canada,2021A000011124,Employment,Total - Gender,15 years and over,Persons in thousands,428,thousands,3,v2091072,1.3.1.1,20843.6,,,,1
2,2025-01,Canada,2021A000011124,Employment,Total - Gender,15 years and over,Persons in thousands,428,thousands,3,v2091072,1.3.1.1,20670.1,,,,1
3,2025-02,Canada,2021A000011124,Employment,Total - Gender,15 years and over,Persons in thousands,428,thousands,3,v2091072,1.3.1.1,20768.6,,,,1
4,2025-03,Canada,2021A000011124,Employment,Total - Gender,15 years and over,Persons in thousands,428,thousands,3,v2091072,1.3.1.1,20758.0,,,,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
325,2024-11,Canada,2021A000011124,Employment,Women+,70 years and over,Persons in thousands,428,thousands,3,v2091512,1.3.3.21,148.7,,,,1
326,2024-12,Canada,2021A000011124,Employment,Women+,70 years and over,Persons in thousands,428,thousands,3,v2091512,1.3.3.21,156.8,,,,1
327,2025-01,Canada,2021A000011124,Employment,Women+,70 years and over,Persons in thousands,428,thousands,3,v2091512,1.3.3.21,153.1,,,,1
328,2025-02,Canada,2021A000011124,Employment,Women+,70 years and over,Persons in thousands,428,thousands,3,v2091512,1.3.3.21,156.6,,,,1


**Step 1: Creating a SparkSession**
A SparkSession is an entry point into all functionality in Spark, and is required if you want to build a dataframe in PySpark.

spark = SparkSession.builder.appName("Pyspark ETL project").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","10g").getOrCreate()
Sure! Here's a concise and informative explanation:

SparkSession Configuration Breakdown
  1. **SparkSession.builder**: Initializes the builder for creating a SparkSession.
  2. **.appName("Pyspark ETL project")**: Sets the name of the Spark application.
  3. **.config("spark.memory.offHeap.enabled", "true")**: Enables off-heap memory to reduce JVM garbage collection overhead and improve performance for memory-intensive applications.
  4. **.config("spark.memory.offHeap.size", "10g")**: Allocates 10 gigabytes of off-heap memory.
  5. **.getOrCreate()**: Creates a new SparkSession if none exists, or retrieves the existing one.

PySpark interacts with Spark’s core, which runs on the JVM (Java Virtual Machine) and is primarily written in Scala. When a SparkSession is created, it initializes the JVM environment for executing Spark operations. The JVM manages heap memory and performs garbage collection, which reclaims unused memory but can cause performance bottlenecks. To reduce this overhead, Spark supports off-heap memory—memory managed outside the JVM heap—minimizing garbage collection pauses and boosting performance. Off-heap memory can be enabled via configuration settings during SparkSession initialization, making Spark applications more memory-efficient.

In [6]:
from pyspark.sql import SparkSession

In [47]:
spark = SparkSession.builder.appName("Pyspark ETL project").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","10g").getOrCreate()


In [12]:
spark

In [48]:
df = spark.read.csv('statsdata.csv',header=True)

In [49]:
#df.show()
df.show(5,0) #same as df.head(5)

+--------+------+--------------+----------------------------+--------------+-----------------+--------------------+------+-------------+---------+--------+----------+-------+------+------+----------+--------+
|REF_DATE|GEO   |DGUID         |Labour force characteristics|Gender        |Age group        |UOM                 |UOM_ID|SCALAR_FACTOR|SCALAR_ID|VECTOR  |COORDINATE|VALUE  |STATUS|SYMBOL|TERMINATED|DECIMALS|
+--------+------+--------------+----------------------------+--------------+-----------------+--------------------+------+-------------+---------+--------+----------+-------+------+------+----------+--------+
|2024-11 |Canada|2021A000011124|Employment                  |Total - Gender|15 years and over|Persons in thousands|428   |thousands    |3        |v2091072|1.3.1.1   |20828.6|NULL  |NULL  |NULL      |1       |
|2024-12 |Canada|2021A000011124|Employment                  |Total - Gender|15 years and over|Persons in thousands|428   |thousands    |3        |v2091072|1.3.1.1  

**Step 2:  Data Wrangling / Data Preprocessing**


2.1 Inspecting Data inclues Viewing column names, data types, and summary statistics.

In [50]:
type(df)

In [51]:
### Check the schema
df.printSchema()   #same as df.info() in pandas

root
 |-- REF_DATE: string (nullable = true)
 |-- GEO: string (nullable = true)
 |-- DGUID: string (nullable = true)
 |-- Labour force characteristics: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age group: string (nullable = true)
 |-- UOM: string (nullable = true)
 |-- UOM_ID: string (nullable = true)
 |-- SCALAR_FACTOR: string (nullable = true)
 |-- SCALAR_ID: string (nullable = true)
 |-- VECTOR: string (nullable = true)
 |-- COORDINATE: string (nullable = true)
 |-- VALUE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- SYMBOL: string (nullable = true)
 |-- TERMINATED: string (nullable = true)
 |-- DECIMALS: string (nullable = true)



In [52]:
df.count()  #no. of rows in the dataframe

330

In [53]:
df.columns  #column names in the dataframe

['REF_DATE',
 'GEO',
 'DGUID',
 'Labour force characteristics',
 'Gender',
 'Age group',
 'UOM',
 'UOM_ID',
 'SCALAR_FACTOR',
 'SCALAR_ID',
 'VECTOR',
 'COORDINATE',
 'VALUE',
 'STATUS',
 'SYMBOL',
 'TERMINATED',
 'DECIMALS']

To view some columns: use Select and show

In [54]:
df.select(['REF_DATE','Gender', 'Age group','VALUE',]).show(5)

+--------+--------------+-----------------+-------+
|REF_DATE|        Gender|        Age group|  VALUE|
+--------+--------------+-----------------+-------+
| 2024-11|Total - Gender|15 years and over|20828.6|
| 2024-12|Total - Gender|15 years and over|20843.6|
| 2025-01|Total - Gender|15 years and over|20670.1|
| 2025-02|Total - Gender|15 years and over|20768.6|
| 2025-03|Total - Gender|15 years and over|20758.0|
+--------+--------------+-----------------+-------+
only showing top 5 rows



In [55]:
df.dtypes #datatype of each column

[('REF_DATE', 'string'),
 ('GEO', 'string'),
 ('DGUID', 'string'),
 ('Labour force characteristics', 'string'),
 ('Gender', 'string'),
 ('Age group', 'string'),
 ('UOM', 'string'),
 ('UOM_ID', 'string'),
 ('SCALAR_FACTOR', 'string'),
 ('SCALAR_ID', 'string'),
 ('VECTOR', 'string'),
 ('COORDINATE', 'string'),
 ('VALUE', 'string'),
 ('STATUS', 'string'),
 ('SYMBOL', 'string'),
 ('TERMINATED', 'string'),
 ('DECIMALS', 'string')]

In [56]:
df.describe().show() #summary of all columns

+-------+--------+------+--------------+----------------------------+------+-----------------+--------------------+------+-------------+---------+---------+----------+-----------------+------+------+----------+--------+
|summary|REF_DATE|   GEO|         DGUID|Labour force characteristics|Gender|        Age group|                 UOM|UOM_ID|SCALAR_FACTOR|SCALAR_ID|   VECTOR|COORDINATE|            VALUE|STATUS|SYMBOL|TERMINATED|DECIMALS|
+-------+--------+------+--------------+----------------------------+------+-----------------+--------------------+------+-------------+---------+---------+----------+-----------------+------+------+----------+--------+
|  count|     330|   330|           330|                         330|   330|              330|                 330|   330|          330|      330|      330|       330|              330|     0|     0|         0|     330|
|   mean|    NULL|  NULL|          NULL|                        NULL|  NULL|             NULL|                NULL| 428.

2.2 Transforming Data: Adding, removing, or renaming columns; encoding categorical variables; normalizing data.

In [57]:
### Adding Columns in data frame
df=df.withColumn('Employment_Count',df['VALUE']*1000)


In [41]:
df.show(5)

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

In [58]:
### Drop the columns
df=df.drop("GEO","DGUID","UOM","UOM_ID","SCALAR_FACTOR","SCALAR_ID","DECIMALS","Labour force characteristics")

In [40]:
df.show(5)

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

In [59]:
### Rename the columns
df.withColumnRenamed('value', "Employment_Count (in thousands)" ).show(5)

+--------+--------------+-----------------+--------+----------+-------------------------------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|Employment_Count (in thousands)|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------------------------------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|                        20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|                        20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|                        20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|                        20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years 

2.3 Cleaning Data: Handling missing values, removing duplicates, and correcting errors.

In [60]:
#df.na.drop().show()  #whereever it sees null value it will drop that row

In [61]:
### any==how
df.na.drop(how="all").show(5) #if all the values of the row are null only then delete , how="any" is the other option which is default as well

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

In [62]:
##threshold
df.na.drop(how="any",thresh=7).show(5)  #keep those rows which have atleast 7 non null values, The thresh parameter in the drop() method specifies the minimum number of non-null values required to keep a row.

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

In [63]:
##Subset
df.na.drop(how="any",subset=['value']).show(5) # since value column is important, check if contain any null value, if in value column there is null value then delete that row

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

In [64]:
### Filling the Missing Value
df.na.fill('Missing Values',['status']).show(5)

+--------+--------------+-----------------+--------+----------+-------+--------------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|        STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+--------------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|Missing Values|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|Missing Values|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|Missing Values|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|Missing Values|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|Missing Values|  NULL|      NULL|        2.0758E7|
+--------+--------------

In [65]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['value','Age group','gender','ref_date'],
    outputCols=["{}_imputed".format(c) for c in ['value','Age group','gender','ref_date']]
    ).setStrategy("median")

The Imputer is used to fill missing values in a DataFrame.
inputCols: Specifies the columns in the DataFrame that contain missing values and need imputation.outputCols: Specifies the names of the new columns that will contain the imputed values. outputcols code dynamically generates the names of the output columns by appending _imputed to each input column name.
setStrategy("median"): Sets the imputation strategy to "median". This means that missing values will be filled with the median value of the respective column.

In [67]:
df.show(5)

+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
|REF_DATE|        Gender|        Age group|  VECTOR|COORDINATE|  VALUE|STATUS|SYMBOL|TERMINATED|Employment_Count|
+--------+--------------+-----------------+--------+----------+-------+------+------+----------+----------------+
| 2024-11|Total - Gender|15 years and over|v2091072|   1.3.1.1|20828.6|  NULL|  NULL|      NULL|       2.08286E7|
| 2024-12|Total - Gender|15 years and over|v2091072|   1.3.1.1|20843.6|  NULL|  NULL|      NULL|       2.08436E7|
| 2025-01|Total - Gender|15 years and over|v2091072|   1.3.1.1|20670.1|  NULL|  NULL|      NULL|       2.06701E7|
| 2025-02|Total - Gender|15 years and over|v2091072|   1.3.1.1|20768.6|  NULL|  NULL|      NULL|       2.07686E7|
| 2025-03|Total - Gender|15 years and over|v2091072|   1.3.1.1|20758.0|  NULL|  NULL|      NULL|        2.0758E7|
+--------+--------------+-----------------+--------+----------+-------+------+------+---

Filter operations

In [70]:
df = df.withColumn('REF_DATE',to_timestamp("reference_date", 'yyyy/MM'))

NameError: name 'to_timestamp' is not defined

In [69]:
df.filter("REF_DATE">=2025-01).select(['Age group','gender','ref_date','value']).show()

SyntaxError: leading zeros in decimal integer literals are not permitted; use an 0o prefix for octal integers (<ipython-input-69-4208c1a410d8>, line 1)

In [None]:
df_pyspark.filter(df_pyspark['Salary']<=20000).show()

In [None]:
df_pyspark.filter((df_pyspark['Salary']<=20000) |
                  (df_pyspark['Salary']>=15000)).show()

In [None]:
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

Step 3: Exploratory Data Analysis

In [None]:
df.select('CustomerID').distinct().count() # Answer: 4373

In [None]:
df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).show()