# Performing ETL Operation

In [1]:
# importing all neccasary libraries for reading and manipulating data
import pandas as pd
from pandas_profiling import ProfileReport      # for profiling the data
import numpy as np
import psycopg2                                 # for extracting/reading data from Postgres database
from psycopg2 import Error
import mysql.connector
from mysql.connector import Error
import sys, os
import credentials as creds                     # .py file storing all credential user id/password security purpose
import logging
from SQL_query import *                         # .py file which stores the SQL queries for connecting to MYSQL
import pandas.io.sql as psql
import pymysql
pymysql.install_as_MySQLdb()
from sqlalchemy import create_engine            # to load dataframe data into MYSQl tables and connect to MYSQL

## logging file has been created to create a log of all errors/info while running the ETL, useful for debugging purpose

In [2]:
# define connection database schema and table for reading dataset from Postgres connection
schema='"Accident_pipe"'
table='"Accidents"'

# define the logging file 
logging_file="ETLlogfile.log"
logging.basicConfig(filename=logging_file, level=logging.INFO, filemode='w',
                    format='%(asctime)s %(levelname)s %(name)s %(message)s')
logger=logging.getLogger(__name__)

# define the profile file
profile_file="Oil_Pipeline_Accident.html"

## Code for setting up connection to Postrgres server for extracting (reading) of the data

In [3]:

def load_data(schema, table):
 try: 
    connection_string = "host="+ creds.PGHOST +" port="+ "5432" +\
                        " dbname="+ creds.PGDATABASE +" user=" +\
                        creds.PGUSER +" password="+ creds.PGPASSWORD
    
    # Connect to database and Create a cursor object   
    conn=psycopg2.connect(connection_string)
    cursor = conn.cursor()
    
    logging.info("Connected Database Sucessfully! \n")
    
    # Reading the database schema and table     
    sql_command3=f'select * from {schema}.{table}'
    data = pd.read_sql(sql_command3, conn)
    
    logging.info('Step 1 -  Data loaded successfully \n')
    logging.info(' Dataframe head is - \n {} \n'.format(data.head()))
    
 except Exception as e:   
    logging.error(e)    
 finally:   
    if (conn):
        cursor.close()
        conn.close()
        logging.info("PostgreSQL connection is closed \n")
    return (data)



## Defining code for generating Profile Report for dataset in .html format

In [4]:
def create_profile_report(data):
    # Generating the profile report 
  try:  
    prof = ProfileReport(data,minimal=True)
    print(repr(prof.report))
    print(prof.report.content)
    prof.to_file(output_file=profile_file)
    
    rows,columns=data.shape[0],data.shape[1]
    logging.info('Step 2 - Profile report generated and Initial data on load has {} rows and {} columns \n'.format(rows,columns))
    
    print(f'Data has {data.shape[0]} rows and {data.shape[1]} columns \n')     
  except Exception as e:   
    logging.error( e) 

## Dropping constant value columns as columns with constant values, will not help in making decisions or ML questions

In [5]:

def remove_constant_value_columns(data):
  try :  
    drop_col=[e for e in data.columns if data[e].nunique() == 1]
    data[drop_col].to_csv('Constant_value_column.csv',index=False,sep=';',header=True)
    data.drop(drop_col,axis=1,inplace=True)
    
    logging.info(' Step 3 -  Constant values columns successfully  and stored in Constant_value_column.csv ') 
    logging.info(f'The constant columns dropped are {drop_col} \n') 
    rows,columns=data.shape[0],data.shape[1]   
    logging.info(' After removing constant value columns , data has now {} rows and {} columns \n'.format(rows,columns))
  except Exception as e:   
    logging.error(e)
  finally:
    return data  
    

## Applied Uniqueness Dimension ,code to drop duplicate rows from dataset and also keeps a copy of dupliacted to seperate .csv file for ETL testing purpose

In [6]:
# to remove duplicate rows
def removing_duplicated_rows(data):

 try:  
     
     duplicated_rows=data[data.duplicated(keep='first')]   
    
    # keep a copy of dupliacted rows in separate csv file for ETL testing purpose
     duplicated_rows.to_csv('Duplicated_rows.csv', index=False,sep=';',header=True)
        
     data.drop_duplicates(inplace=True)   # Drop the rows where the whole row is duplicated   
     data.drop_duplicates(inplace=True,subset="Report Number",keep="first") # to remove rows where the primary key Report number is duplicated 
        
     """ removing any duplicated columns (If any column values are same) as Report Number and Report are
     duplicate columns with same values ( as seen in profiling report )""" 
    
     logging.info(' Step 4 - Duplicated rows deleted successfully and stored in Duplicated_rows.csv \n')   
     rows,columns=data.shape[0],data.shape[1]
     logging.info('After removing duplicated rows, data has now {} rows and {} columns \n'.format(rows,columns))
    
 except Exception as e:   
      logging.error(e)
 finally:
      return data
 

## Uniqueness Dimension , code to drops duplicated columns , Columns names may be different names but there values are same (In dataset, Report_Number and Report are duplicated columns)

In [7]:
  
def removing_duplicated_columns(data):
  try:
    # Create an empty set
    duplicateColumnNames = set()  
    
    # Iterate through all the columns of dataframe
    for x in range(data.shape[1]): 
        
        # Take column at xth index.
        col = data.iloc[:, x]     
        
        # Iterate through all the columns in DataFrame from (x + 1)th index to last index
        for y in range(x + 1, data.shape[1]): 
            
            # Take column at yth index.
            otherCol = data.iloc[:, y] 
            
            # Check if two columns at x & y ,index are equal or not,if equal then adding to the set
            if col.equals(otherCol):
                duplicateColumnNames.add(data.columns.values[y])
     
    duplicateColumnNames=list(duplicateColumnNames)
    # keep a copy of dupliacted columns in separate csv file for ETL testing purpose
    data[duplicateColumnNames].to_csv("Duplicated_columns.csv",index=False,sep=';',header=True)
    # Dropping duplicate columns
    data.drop(columns = duplicateColumnNames,axis=1,inplace=True)            
   
    logging.info(' Step 5 - Duplicated columns deleted successfully and file is stored in Duplicated_columns.csv \n')   
    logging.info('Names of duplicated columns deleted are "{}" \n'.format(" , ".join(duplicateColumnNames)   ) )  
    rows,columns=data.shape[0],data.shape[1]
    logging.info(' After removing duplicate columns , data has now {} rows and {} columns \n'.format(rows,columns))   
    
  except error as e:
    logging.info(e)
  finally:    
     return data  

## Ensure consistency dimension and changing columns name ( space is replaced by "_" underscore)

In [8]:
# Consistency Dimension
def columns_name_change(data):
  try:  
    data.columns=data.columns.str.replace(" ","_")
    
    logging.info(' Step 6 - Column Name changed successfully \n')
    logging.info('New column names after name changed are {} \n'.format(" , ".join(data.columns.values)))
  except Exception as e:   
    logging.error(e)
  finally:
    return data

## Ensure consistency dimension and ensure all inconsistent values are replaced with consistent values

In [9]:
# Consistency Dimension
def replace_inconsistent_values(data):
  try : 
    
    data['Liquid_Ignition']=data['Liquid_Ignition'].replace(to_replace="0",value='NO')
    data['Liquid_Ignition']=data['Liquid_Ignition'].replace(to_replace="1",value='YES')
    
    data['Liquid_Explosion']=data['Liquid_Explosion'].replace(to_replace="1",value='YES')
    data['Liquid_Explosion']=data['Liquid_Explosion'].replace(to_replace="0",value='NO')
    
    data['Pipeline_Type']=data['Pipeline_Type'].replace(to_replace="ABOVEGROUND TYPE",value='ABOVEGROUND')
    data['Pipeline_Type']=data['Pipeline_Type'].replace(to_replace="UNDERGROUND TYPE",value='UNDERGROUND')
    
    data['Pipeline_Location']=data['Pipeline_Location'].replace(to_replace="ONSHORE LOCATION",value='ONSHORE')
    data['Pipeline_Location']=data['Pipeline_Location'].replace(to_replace="OFFSHORE LOCATION",value='OFFSHORE')
    
    
    logging.info(' Step 7 - Inconsistent values in columns changed successfully \n')   
    logging.info('Various values in  column Liquid_Ignition now are {} \n'.format(" , ".join(list(data['Liquid_Ignition'].unique())))) 
    logging.info('Various values in  column Liquid_Explosion now are {} \n'.format(" , ".join(list(data['Liquid_Explosion'].unique()))))
    logging.info('Various values in  column Pipeline_Type now are {} \n'.format(" , ".join([ x for x in data.Pipeline_Type.unique() if x is not None ])))
    logging.info('Various values in  column Pipeline_Location now are {} \n'.format(" , ".join(data.Pipeline_Location.unique())))
  except Exception as e:   
    logging.error(e)
  finally:
    return data
    
    

## Changing Data types of columns

In [10]:
def changing_data_types(data):
  try:  
    data['Accident_Date/Time']= pd.to_datetime(data['Accident_Date/Time'].astype(str))
    convert_type={"Report_Number":int,"Supplemental_Number":int,"Pipeline_Shutdown":bool,"Operator_ID":int}
    data=data.astype(convert_type)
    
    logging.info(' Step 8 - Data types of columns changed successfully \n')
  except Exception as e:   
    logging.error(e)
  finally:
    return data   


## Trimming white spaces before and after columns values to ensure consistency in values

In [11]:
# to remove white spaces before and after any value in a column
def remove_white_spaces(data):
  try:  
    cols = data.select_dtypes(['object']).columns
    data[cols] = data[cols].apply(lambda x: x.str.strip())
    
    logging.info(' Step 9- White space before and after column values changed successfully \n')
  except Exception as e:   
    logging.error(e)
  finally:
    return data

## Code to create star schema  ( 1 fact table , 7 Dimension table )

In [12]:
def create_schema(data):
    
    # Accident Dimension Table
    accident_details = data[['Accident_Year', 'Accident_Date/Time','Accident_County', 'Report_Number','Accident_City','Accident_State','Accident_Latitude', 'Accident_Longitude']]
    
    
    # Cost dimension Table
    cost_table = data[['Property_Damage_Costs','Lost_Commodity_Costs',
                     'Public/Private_Property_Damage_Costs',
                     'Emergency_Response_Costs', 'Environmental_Remediation_Costs',
                     'Other_Costs', 'All_Costs', 'Report_Number']]
    
    # Fatalities dimension Table
    fatalities_table = data[['Operator_Contractor_Fatalities', 'Other_Fatalities',
                            'Public_Fatalities', 'All_Fatalities','Report_Number']]
    
    # Injuries Dimesion Table
    injuries_table = data[['Operator_Employee_Injuries',
                         'Operator_Contractor_Injuries', 'Public_Injuries', 
                         'All_Injuries','Report_Number']]
    
    # pipeline Dimension Table
    pipeline_table = data[['Pipeline/Facility_Name',
                         'Pipeline_Location', 'Pipeline_Type', 'Liquid_Type','Liquid_Ignition','Liquid_Explosion','Report_Number','Liquid_Subtype','Liquid_Name','Pipeline_Shutdown', 'Shutdown_Date/Time', 'Restart_Date/Time', 'Cause_Category','Cause_Subcategory']]
    
    
    # Barrels Dimesion Table
    barrels_table = data[['Unintentional_Release_(Barrels)',
                       'Intentional_Release_(Barrels)', 'Liquid_Recovery_(Barrels)',
                       'Net_Loss_(Barrels)','Report_Number']]
    
    #Operator Dimension Table
    operator_table = data[['Operator_ID', 'Operator_Name']]
    
    # Facts Table
    fact_table = data[['Report_Number','Supplemental_Number','Operator_ID','Public_Evacuations']]
      
    logging.info(' Step 10 - Snowflake schema created successfully ')
    
    return (accident_details,cost_table,fatalities_table,injuries_table,pipeline_table,barrels_table,operator_table,fact_table)
    

## Code to save our star schema (dataframes) into MYSQL database using df.to_sql function

In [13]:
# converting pandas dataframe to SQL tables using sqlalchemy library module create_engine
def dataframe_to_sql(accident_details,cost_table,fatalities_table,injuries_table,pipeline_table,barrels_table,operator_table,fact_table):  
    engine = create_engine(f"mysql://{user}:{password}@{host}/accident")
    with engine.begin() as connection:
    
      accident_details.to_sql(name="accident_detail", con=connection, if_exists = 'replace', index=False)
      cost_table.to_sql(name="accident_costs_table", con=connection, if_exists = 'replace', index=False)
      fatalities_table.to_sql(name="fatalities", con=connection, if_exists = 'replace', index=False)
      injuries_table.to_sql(name="injuries_table", con=connection, if_exists = 'replace', index=False) 
      pipeline_table.to_sql(name="pipeline_table", con=connection, if_exists = 'replace', index=False)
      barrels_table.to_sql(name="barrel_table", con=connection, if_exists = 'replace', index=False)
      operator_table.to_sql(name="operator_table", con=connection, if_exists = 'replace', index=False)
      fact_table.to_sql(name="fact_table", con=connection, if_exists = 'replace', index=False) 
      logging.info("Loading of staging files and star schema tables has been done in MySQL database succesfully \n")
        
        

## Calling the main function here

In [14]:
def main():
    
   data=load_data(schema,table)  

   create_profile_report(data) 
 
   data=remove_constant_value_columns(data)
   
   data=removing_duplicated_rows(data)
    
   data=removing_duplicated_columns(data)

   data=columns_name_change(data)     

   data=replace_inconsistent_values(data)

   data=changing_data_types(data)

   data=remove_white_spaces(data)
  
   accident_details,cost_table,fatalities_table,injuries_table,pipeline_table,barrels_table,operator_table,fact_table=create_schema(data)
    
   connection=create_conenction(host,user,password)  # creating connection to MYSQL to save stageing files ( star flake schema file)
   
   cursor=create_cursor(connection) 

   create_db(cursor,connection)
    
   dataframe_to_sql(accident_details,cost_table,fatalities_table,injuries_table,pipeline_table,barrels_table,operator_table,fact_table )
main()    

HBox(children=(HTML(value='Summarize dataset'), FloatProgress(value=0.0, max=57.0), HTML(value='')))




HBox(children=(HTML(value='Generate report structure'), FloatProgress(value=0.0, max=1.0), HTML(value='')))


Root
{'body': Container(name=Root), 'footer': HTML, 'name': 'Root'}


HBox(children=(HTML(value='Render HTML'), FloatProgress(value=0.0, max=1.0), HTML(value='')))




HBox(children=(HTML(value='Export report to file'), FloatProgress(value=0.0, max=1.0), HTML(value='')))


Data has 2801 rows and 49 columns 

connection is successful <mysql.connector.connection.MySQLConnection object at 0x12737f2b0>
Succesfully connected to Data Warehouse MYSQL
