# Extract, Transform and Load Using PySpark

In this notebook, I use the PySpark API to run an extract, transform, and load (ETL) pipeline on the Canadian pipeline incidents dataset (open data).

## Import Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, asc, desc

## Create Spark Session

In [2]:
spark = SparkSession.builder.appName('pipelineincidents_ETL').getOrCreate()
spark

## Define Extract Function

In [3]:
def extract():
    # Read the CSV, treating the first row as column names and letting Spark infer column types
    df_pyspark = spark.read.csv('pipeline-incidents-data.csv', header=True, inferSchema=True)
    return df_pyspark

## Extract CSV into Spark DataFrame

In [4]:
# Call extract function
incidents_df = extract()
incidents_df.printSchema()

root
 |-- Incident Number: string (nullable = true)
 |-- Incident Types: string (nullable = true)
 |-- Reported Date: string (nullable = true)
 |-- Nearest Populated Centre: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Approximate Volume Released (m3): string (nullable = true)
 |-- Substance: string (nullable = true)
 |-- Release Type: string (nullable = true)
 |-- Significant: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- What Happened: string (nullable = true)
 |-- Why It Happened: string (nullable = true)



## Define Transformation Function

In [5]:
def transform(data):
    # Select specific columns
    select_columns = ['Incident Number', 'Incident Types', 'Reported Date', 'Province', 'Company', 'Substance', 'Approximate Volume Released (m3)']
    data = data.select(select_columns)
    
    # Rename Column
    data = data.withColumnRenamed('Approximate Volume Released (m3)', 'Volume Released (m3)')
    
    # Replace the 'Not Provided' placeholder with an empty string
    data = data.withColumn('Volume Released (m3)', regexp_replace('Volume Released (m3)', 'Not Provided', ''))
    
    # Cast the column from string to float; empty strings become null
    data = data.withColumn('Volume Released (m3)', data['Volume Released (m3)'].cast('float'))
    
    return data
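
The replace-then-cast steps in `transform` turn the `Not Provided` placeholder into a missing value. The plain-Python sketch below mimics that logic for a single cell value. It assumes Spark's non-ANSI cast behavior, in which a string that cannot be parsed as a number becomes null. `to_float_or_none` is a hypothetical helper for illustration, not part of the notebook or of PySpark.

```python
def to_float_or_none(value):
    """Mimic the replace-then-cast steps for one cell value (illustrative only)."""
    if value is None:
        return None
    # Step 1: drop the placeholder text, as regexp_replace does in transform()
    cleaned = value.replace('Not Provided', '')
    # Step 2: cast to float; unparseable text becomes None
    # (assumption: this mirrors Spark's non-ANSI cast, which returns null)
    try:
        return float(cleaned)
    except ValueError:
        return None


# Hypothetical sample values, not taken from the dataset
samples = ['0.5', 'Not Provided', '1200', None]
converted = [to_float_or_none(v) for v in samples]
print(converted)
```

This is why the summary reports fewer non-null values for `Volume Released (m3)` than for the other columns.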

## Apply Transformation to DataFrame

In [6]:
incidents_df_transformed = transform(incidents_df)
incidents_df_transformed.describe().show()

+-------+---------------+--------------------+-------------+------------+--------------------+---------+--------------------+
|summary|Incident Number|      Incident Types|Reported Date|    Province|             Company|Substance|Volume Released (m3)|
+-------+---------------+--------------------+-------------+------------+--------------------+---------+--------------------+
|  count|           1485|                1485|         1485|        1485|                1485|     1485|                 493|
|   mean|           null|                null|         null|        null|                null|     null|  111555.51457974689|
| stddev|           null|                null|         null|        null|                null|     null|   978309.9793791081|
|    min|    INC2007-097|Adverse Environme...|   01/01/2016|     Alberta|2193914 Canada Li...|    Amine|              1.0E-4|
|    max|    INC2021-092|Serious Injury (C...|   12/31/2018|Saskatchewan|Westcoast Energy ...|    Water|              
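
Because `Reported Date` was read as a string, the `min` and `max` in the summary are lexicographic rather than chronological. The plain-Python sketch below shows the difference. Apart from the two values that appear in the summary, the sample dates are hypothetical, and the month/day/year layout is an assumption inferred from those two values.

```python
from datetime import datetime

# '01/01/2016' and '12/31/2018' appear in the summary; the other two are hypothetical
dates = ['01/01/2016', '12/31/2018', '03/15/2021', '11/02/2008']


def parse(text):
    # Assumed layout: month/day/year
    return datetime.strptime(text, '%m/%d/%Y')


# Comparing the raw strings orders them character by character
lexicographic_min = min(dates)
lexicographic_max = max(dates)

# Comparing the parsed dates orders them by calendar time
chronological_min = min(dates, key=parse)
chronological_max = max(dates, key=parse)

print(lexicographic_min, lexicographic_max)
print(chronological_min, chronological_max)
```

In PySpark, one option would be to convert the column with `to_date(col('Reported Date'), 'MM/dd/yyyy')` from `pyspark.sql.functions` before summarizing it.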

## Display Some Insights from the Data

### Count of Incidents by Incident Types

In [7]:
# incidents_df_transformed.groupBy('Incident Types').count().sort(col('count').desc()).show(truncate=False)

### Count of Incidents by Province

In [8]:
# incidents_df_transformed.groupBy('Province').count().sort(col('count').desc()).show(truncate=False)

### Count of Incidents by Company

In [9]:
# incidents_df_transformed.groupBy('Company').count().sort(col('count').desc()).show(truncate=False)

### Count of Incidents by Substance

In [10]:
# incidents_df_transformed.groupBy('Substance').count().sort(col('count').desc()).show(truncate=False)
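
The four aggregations above share one pattern: group by a column, count the rows in each group, and sort by that count in descending order. Here is a plain-Python sketch of the same computation using `collections.Counter`. The province values are hypothetical sample data, not taken from the dataset.

```python
from collections import Counter

# Hypothetical sample of one column's values
provinces = ['Alberta', 'Ontario', 'Alberta', 'British Columbia', 'Alberta', 'Ontario']

# Counter tallies rows per group; most_common() sorts by count, largest first
counts_desc = Counter(provinces).most_common()

for province, count in counts_desc:
    print(f'{province}: {count}')
```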

## Define Load Function

In [20]:
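def load_csv(data):
    # Collect to the driver as a pandas DataFrame, then write a single CSV file
    data.toPandas().to_csv('pipeline_incidents.csv')
    
def load_json(data):
    # Same approach, written as a single JSON file
    data.toPandas().to_json('pipeline_incidents.json')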
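
`toPandas()` collects the whole DataFrame onto the driver, which is fine for a dataset of this size. For larger data, a sketch using Spark's own writer might look like the following. `load_csv_native` and the output path are hypothetical names, and Spark writes a directory of part files rather than a single CSV file.

```python
def load_csv_native(data, path='pipeline_incidents_csv'):
    """Write a Spark DataFrame as CSV with Spark's DataFrameWriter (illustrative sketch)."""
    (
        data.coalesce(1)             # one partition, so one part file in the output directory
            .write
            .mode('overwrite')       # replace the output directory if it already exists
            .option('header', True)  # include column names as the first row
            .csv(path)
    )
```

It would be called like the existing helpers, for example `load_csv_native(incidents_df_transformed)`.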

## Load Data to CSV and JSON Files

In [21]:
load_csv(incidents_df_transformed)
load_json(incidents_df_transformed)

## Author

Taiwo Fawumi

taiwo.fawumi@yahoo.com

Published November 2021