# Spark ETL Pipeline Implementation and Failure Detection

# Extract

We begin by reading the four datasets for this project from CSV files: the `Pipeline data table`, `Maintenance data table`, `Sensor data table`, and `Inspection data table`.

In [1]:
import findspark
findspark.init('/Users/Mubashir/spark')


In [2]:
#Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.sql.functions import *

from pyspark.sql import SQLContext

In [3]:
# Setting the path for the Postgres JDBC driver; it is needed in the Load section.
# PySpark reads PYSPARK_SUBMIT_ARGS, so it must be set before the SparkSession is created.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /Users/Mubashir/postgresql-42.6.0.jar pyspark-shell'

In [4]:
# Creating a SparkSession, which serves as the entry point for interacting with Spark
spark = SparkSession.builder.appName("ETL Data Pipeline").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/21 12:48:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
#Reading the csv files
pipeline_df = spark.read.csv('pipeline_data_table.csv', header = True, inferSchema = True)
maintenance_df = spark.read.csv('maintenance_data_table.csv', header = True, inferSchema = True)
sensor_df = spark.read.csv('sensor_data_table.csv', header = True, inferSchema = True)
inspection_df = spark.read.csv('inspection_data_table.csv', header = True, inferSchema = True)

In [6]:
#Checking the head of our data
pipeline_df.show(5)

+---+-----------+-----------------+------------------+---------+--------+------------+
|_c0|pipeline_id|        lenght_km|     diamteters_mm|age_years|material|    location|
+---+-----------+-----------------+------------------+---------+--------+------------+
|  0|       PL-1|3.163867170339976|205.96414132846047|       39|   steel|       lagos|
|  1|       PL-2|15.12457355971772| 655.0772854593554|       27| plastic|portharcourt|
|  2|       PL-3|43.00378167829604|237.04409028519854|       32|   steel|portharcourt|
|  3|       PL-4|78.16478032119788| 611.5345293219675|       19|concrete|       lagos|
|  4|       PL-5|76.14520496673097| 603.4796154760029|       41|concrete|       lagos|
+---+-----------+-----------------+------------------+---------+--------+------------+
only showing top 5 rows



23/06/21 12:48:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , pipeline_id, lenght_km, diamteters_mm, age_years, material, location
 Schema: _c0, pipeline_id, lenght_km, diamteters_mm, age_years, material, location
Expected: _c0 but found: 
CSV file: file:///Users/Judith/pipeline_data_table.csv


In [7]:
maintenance_df.show(5)

+---+-----------+-----------+---------------------+-----------------+
|_c0|pipeline_id|repair_type|repair_duration_hours|  cost_in_dollars|
+---+-----------+-----------+---------------------+-----------------+
|  0|       PL-1| preventive|                    8|486.9355980554817|
|  1|       PL-2|    routine|                    2|            313.0|
|  2|       PL-3|    routine|                   11|            252.0|
|  3|       PL-4| preventive|                   23|784.6322648503146|
|  4|       PL-5| corrective|                    9|11137.27992806314|
+---+-----------+-----------+---------------------+-----------------+
only showing top 5 rows



23/06/21 12:48:44 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , pipeline_id, repair_type, repair_duration_hours, cost_in_dollars
 Schema: _c0, pipeline_id, repair_type, repair_duration_hours, cost_in_dollars
Expected: _c0 but found: 
CSV file: file:///Users/Judith/maintenance_data_table.csv


In [8]:
sensor_df.show(5)

+---+---------+-----------+----------+------------------+------------------+------------------+
|_c0|sensor_id|pipeline_id|      date|          pressure|       temperature|         flow_rate|
+---+---------+-----------+----------+------------------+------------------+------------------+
|  0| sensor-2|       PL-1|2016-01-23| 89.16509363947836|26.487407250471655| 8.841277524541173|
|  1| sensor-4|       PL-2|2021-05-06|51.422621452961415|22.123033999440032|2.2398605213687572|
|  2| sensor-1|       PL-3|2014-08-03| 89.93827002882762|24.376792602298494| 8.798215574407859|
|  3| sensor-2|       PL-4|2019-03-26|  91.6988260369984| 24.12896689486083|4.4345016457065976|
|  4| sensor-3|       PL-5|2015-01-19| 77.33094021729232|23.286447126274545| 4.723534211586986|
+---+---------+-----------+----------+------------------+------------------+------------------+
only showing top 5 rows



23/06/21 12:48:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , sensor_id, pipeline_id, date, pressure, temperature, flow_rate
 Schema: _c0, sensor_id, pipeline_id, date, pressure, temperature, flow_rate
Expected: _c0 but found: 
CSV file: file:///Users/Judith/sensor_data_table.csv


In [9]:
inspection_df.show(5)

+---+-----------+---------------+-----------------+--------------+
|_c0|pipeline_id|corrosion_level|deformation_level|leak_detection|
+---+-----------+---------------+-----------------+--------------+
|  0|       PL-1|              4|                2|           Yes|
|  1|       PL-2|              3|                3|            No|
|  2|       PL-3|              1|                0|            No|
|  3|       PL-4|              2|                4|           Yes|
|  4|       PL-5|              2|                3|            No|
+---+-----------+---------------+-----------------+--------------+
only showing top 5 rows



23/06/21 12:48:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , pipeline_id, corrosion_level, deformation_level, leak_detection
 Schema: _c0, pipeline_id, corrosion_level, deformation_level, leak_detection
Expected: _c0 but found: 
CSV file: file:///Users/Judith/inspection_data_table.csv


# Transform

The main goal of this step is to select the relevant columns from each dataset and combine them into a single dataset.

We will also add a new column called `pipeline_status`, which holds a binary value: 1 represents a pipeline failure and 0 represents no failure. The values are derived from conditions on existing columns, which lets us flag failing pipelines in the combined dataset.

In [10]:
# Selecting important columns from pipeline_df dataset
pipeline_columns = ['pipeline_id','lenght_km', 'diamteters_mm', 'age_years', 'material', 'location']
pipeline_selected_df = pipeline_df.select(pipeline_columns)

# Selecting important columns from maintenance dataset
maintenance_columns = ['pipeline_id', 'repair_type', 'repair_duration_hours']
maintenance_selected_df = maintenance_df.select(maintenance_columns)

# Selecting important columns from sensor dataset
sensor_columns = ['pipeline_id', 'pressure', 'temperature', 'flow_rate']
sensor_selected_df = sensor_df.select(sensor_columns)

# Selecting important columns from inspection dataset
inspection_columns = ['pipeline_id','corrosion_level', 'deformation_level', 'leak_detection']
inspection_selected_df = inspection_df.select(inspection_columns)

In [11]:
# Creating a new dataset from the selected columns of the existing datasets by joining them
# on their shared key ('pipeline_id')
joined_df = pipeline_selected_df \
    .join(maintenance_selected_df, "pipeline_id", "left") \
    .join(sensor_selected_df, "pipeline_id", "left") \
    .join(inspection_selected_df, "pipeline_id", "left")
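A point worth checking before relying on this join: a left join keeps every row of the left table, but it also emits one output row per matching row on the right. If a pipeline has more than one maintenance record or sensor reading, that pipeline appears several times in `joined_df`. The sketch below illustrates the effect in plain Python rather than Spark, so it runs without a cluster; the `left_join` helper and the sample readings are hypothetical and exist only for illustration.

```python
def left_join(left_rows, right_rows, key):
    """Plain-Python left join over lists of dicts (illustration only, not Spark)."""
    # Group the right-hand rows by join key.
    by_key = {}
    for row in right_rows:
        by_key.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        matches = by_key.get(left[key])
        if not matches:
            # Unmatched left rows are kept; Spark would fill the right-hand columns with null.
            joined.append(dict(left))
        else:
            # One output row per matching right-hand row.
            for right in matches:
                joined.append({**left, **right})
    return joined


pipelines = [{"pipeline_id": "PL-1"}, {"pipeline_id": "PL-2"}]
# Hypothetical data: two sensor readings for PL-1, none for PL-2.
sensors = [
    {"pipeline_id": "PL-1", "pressure": 89.2},
    {"pipeline_id": "PL-1", "pressure": 90.1},
]

result = left_join(pipelines, sensors, "pipeline_id")
print(len(result))  # 3: PL-1 appears twice, PL-2 once
```

In the notebook itself, something like `sensor_df.groupBy("pipeline_id").count().filter("count > 1").show()` should reveal whether any table has repeated keys; whether that is the case for these CSV files is not something the output shown here confirms.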


In [12]:
# Checking the resulting DataFrame
joined_df.show(5)

+-----------+-----------------+------------------+---------+--------+------------+-----------+---------------------+------------------+------------------+------------------+---------------+-----------------+--------------+
|pipeline_id|        lenght_km|     diamteters_mm|age_years|material|    location|repair_type|repair_duration_hours|          pressure|       temperature|         flow_rate|corrosion_level|deformation_level|leak_detection|
+-----------+-----------------+------------------+---------+--------+------------+-----------+---------------------+------------------+------------------+------------------+---------------+-----------------+--------------+
|       PL-1|3.163867170339976|205.96414132846047|       39|   steel|       lagos| preventive|                    8| 89.16509363947836|26.487407250471655| 8.841277524541173|              4|                2|           Yes|
|       PL-2|15.12457355971772| 655.0772854593554|       27| plastic|portharcourt|    routine|              

> ### We will create a new column called `pipeline_status`, derived from conditions on existing columns in the unified `joined_df` dataset

The columns used to determine the values of the `pipeline_status` column are `corrosion_level`, `deformation_level`, `leak_detection`, `age_years`, and `repair_type`. These columns were selected because they are the most likely to affect the wear and tear of the pipeline.

Each value in these columns is assigned a rank, and the highest possible rank sum is 20 (5 + 5 + 2 + 5 + 3). If the rank sum for a row exceeds 10, the `pipeline_status` value for that row is set to 1, indicating pipeline failure. If the rank sum is less than or equal to 10, the `pipeline_status` value is set to 0, indicating no pipeline failure.
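As a worked example of this rule, the following plain-Python sketch scores two of the pipelines shown earlier (PL-1 and PL-3), using the rank mappings defined in the cells that follow. The names `MAX_RANKS`, `THRESHOLD`, and `pipeline_status` are chosen for this illustration and are not part of the notebook's Spark code.

```python
# Maximum rank each column can contribute, per the mappings used in this notebook.
MAX_RANKS = {"corrosion": 5, "deformation": 5, "leak_detection": 2,
             "age_years": 5, "repair_type": 3}
THRESHOLD = 10


def pipeline_status(ranks):
    """Return 1 (failure) if the rank sum exceeds THRESHOLD, else 0."""
    return 1 if sum(ranks.values()) > THRESHOLD else 0


# PL-1: corrosion 4, deformation 2, leak 'Yes' -> 2, age 39 -> 4, 'preventive' -> 2
pl1 = {"corrosion": 4, "deformation": 2, "leak_detection": 2,
       "age_years": 4, "repair_type": 2}
# PL-3: corrosion 1, deformation 0, leak 'No' -> 1, age 32 -> 4, 'routine' -> 1
pl3 = {"corrosion": 1, "deformation": 0, "leak_detection": 1,
       "age_years": 4, "repair_type": 1}

print(sum(MAX_RANKS.values()))                  # 20
print(sum(pl1.values()), pipeline_status(pl1))  # 14 1
print(sum(pl3.values()), pipeline_status(pl3))  # 7 0
```

A row whose rank sum is exactly 10 is labelled 0, because the comparison is strictly greater than 10.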

### 1. Creating new columns that rank each of the selected columns

In [13]:
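# Map each selected column to a numeric rank (a higher rank contributes more towards a failure label)
joined_df = joined_df.withColumn('corrosion_rank', 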
                                 when(col('corrosion_level') == 0, 0).
                                 when(col('corrosion_level') == 1, 1).
                                 when(col('corrosion_level') == 2, 2).
                                 when(col('corrosion_level') == 3, 3).
                                 when(col('corrosion_level') == 4, 4).
                                 when(col('corrosion_level') == 5, 5))

joined_df = joined_df.withColumn('deformation_rank', 
                                 when(col('deformation_level') == 0, 0).
                                 when(col('deformation_level') == 1, 1).
                                 when(col('deformation_level') == 2, 2).
                                 when(col('deformation_level') == 3, 3).
                                 when(col('deformation_level') == 4, 4).
                                 when(col('deformation_level') == 5, 5))

joined_df = joined_df.withColumn('leak_detection_rank', 
                                 when(col('leak_detection') == 'Yes', 2).
                                 when(col('leak_detection') == 'No', 1))
                                 
joined_df = joined_df.withColumn('age_years_rank', 
                                 when(col('age_years') <= 10, 1).
                                 when(col('age_years') <= 20, 2).
                                 when(col('age_years') <= 30, 3).
                                 when(col('age_years') <= 40, 4).
                                 when(col('age_years') <= 50, 5))

joined_df = joined_df.withColumn('repair_type_rank', 
                                 when(col('repair_type') == 'routine', 1).
                                 when(col('repair_type') == 'preventive', 2).
                                 when(col('repair_type') == 'corrective', 3))
                                 

In [14]:
# Testing our rank columns to see if the mapping worked properly

joined_df.select("age_years", "age_years_rank").show(5)

+---------+--------------+
|age_years|age_years_rank|
+---------+--------------+
|       39|             4|
|       27|             3|
|       32|             4|
|       19|             2|
|       41|             5|
+---------+--------------+
only showing top 5 rows
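The spot check above covers ages up to 41, but the `when` chains have no `otherwise` branch, so a value outside the listed cases (for example `age_years` above 50) produces a null rank in Spark. A null rank makes the later sum null, and `when(col('summed_ranks') > 10, 1).otherwise(0)` then falls through to 0. The sketch below models that behaviour in plain Python, with `None` standing in for Spark's null; the helper names and the 55-year-old pipeline are hypothetical.

```python
AGE_BOUNDS = ((10, 1), (20, 2), (30, 3), (40, 4), (50, 5))


def age_rank(age_years):
    """Mirror the age_years_rank when-chain; None stands in for Spark's null."""
    if age_years is None:
        return None
    for upper, rank in AGE_BOUNDS:
        if age_years <= upper:
            return rank
    return None  # no branch matched, e.g. age_years > 50


def summed_ranks(ranks):
    """Null-propagating sum: any missing rank makes the total missing."""
    return None if any(r is None for r in ranks) else sum(ranks)


def status(total):
    """Mirror when(total > 10, 1).otherwise(0): a missing total falls to 0."""
    return 1 if total is not None and total > 10 else 0


print([age_rank(a) for a in (39, 27, 32, 19, 41)])       # [4, 3, 4, 2, 5]
print(age_rank(55))                                      # None
# Worst-case ranks everywhere else, but the missing age rank hides the failure.
print(status(summed_ranks([5, 5, 2, age_rank(55), 3])))  # 0
```

If the data can contain such values, one option is to end the age chain with an `.otherwise(5)` branch so that older pipelines still receive the top rank; whether that is the right choice depends on the intended scoring.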



### 2. Creating a column that holds the sum of all the rank columns

In [15]:
joined_df = joined_df.withColumn('summed_ranks', col('corrosion_rank') + col('deformation_rank') 
                                 + col('leak_detection_rank') + col('age_years_rank') + col('repair_type_rank'))
                                 

In [16]:
# show the result
joined_df.select('corrosion_rank','deformation_rank','leak_detection_rank',
                 'age_years_rank','repair_type_rank','summed_ranks').show(5)

+--------------+----------------+-------------------+--------------+----------------+------------+
|corrosion_rank|deformation_rank|leak_detection_rank|age_years_rank|repair_type_rank|summed_ranks|
+--------------+----------------+-------------------+--------------+----------------+------------+
|             4|               2|                  2|             4|               2|          14|
|             3|               3|                  1|             3|               1|          11|
|             1|               0|                  1|             4|               1|           7|
|             2|               4|                  2|             2|               2|          12|
|             2|               3|                  1|             5|               3|          14|
+--------------+----------------+-------------------+--------------+----------------+------------+
only showing top 5 rows



### 3. Creating a `pipeline_status` column that is 1 if `summed_ranks` is greater than 10 and 0 if `summed_ranks` is less than or equal to 10

In the `pipeline_status` column:

- 1 indicates pipeline failure
- 0 indicates no pipeline failure

In [17]:
joined_df = joined_df.withColumn('pipeline_status', when(col('summed_ranks')>10, 1).otherwise(0))


In [18]:
#show the result
joined_df.select('pipeline_status').show(5)

+---------------+
|pipeline_status|
+---------------+
|              1|
|              1|
|              0|
|              1|
|              1|
+---------------+
only showing top 5 rows



In [19]:
#Dropping the intermediate rank columns, which are no longer needed
columns_to_drop = ['corrosion_rank', 'deformation_rank', 'leak_detection_rank', 'age_years_rank',
                   'repair_type_rank', 'summed_ranks']

joined_df = joined_df.drop(*columns_to_drop)

In [20]:
# show the result
joined_df.show(5)

+-----------+-----------------+------------------+---------+--------+------------+-----------+---------------------+------------------+------------------+------------------+---------------+-----------------+--------------+---------------+
|pipeline_id|        lenght_km|     diamteters_mm|age_years|material|    location|repair_type|repair_duration_hours|          pressure|       temperature|         flow_rate|corrosion_level|deformation_level|leak_detection|pipeline_status|
+-----------+-----------------+------------------+---------+--------+------------+-----------+---------------------+------------------+------------------+------------------+---------------+-----------------+--------------+---------------+
|       PL-1|3.163867170339976|205.96414132846047|       39|   steel|       lagos| preventive|                    8| 89.16509363947836|26.487407250471655| 8.841277524541173|              4|                2|           Yes|              1|
|       PL-2|15.12457355971772| 655.07728545

# Load

Our finalised dataset will be loaded into a Postgres database.

Storing the processed data in Postgres keeps it in one place and makes it easier to use for later analysis, reporting, and decision-making.

In [21]:
# fill in postgres connection details
postgres_url = "jdbc:postgresql://localhost:5432/new_database"
properties = {
    "user": "myuser",
    "password": "reallyStrongPwd123",
    "driver": "org.postgresql.Driver"
}
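Hard-coding the password in the notebook means it travels with every copy of the file. A common alternative is to read the credentials from environment variables, as in this minimal sketch. The variable names `PG_USER` and `PG_PASSWORD` and the helper `postgres_properties` are assumptions made for this example, not something the notebook defines.

```python
import os


def postgres_properties(env=os.environ):
    """Build the JDBC properties dict from environment variables.

    PG_USER and PG_PASSWORD are hypothetical variable names chosen for this sketch.
    """
    missing = [name for name in ("PG_USER", "PG_PASSWORD") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "user": env["PG_USER"],
        "password": env["PG_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
```

With the variables exported in the shell that starts the notebook, `properties = postgres_properties()` would produce a dict of the same shape as the one above, and the write call in the next cell could stay unchanged.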


In [22]:
joined_df.write \
    .jdbc(url=postgres_url, table="pipeline_failure", mode="overwrite", properties=properties)


After successfully loading our dataset into Postgres, we can now query it and carry out additional analysis there.