## Importing Necessary Libraries

In [None]:
import os
from urllib.parse import quote_plus

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count, monotonically_increasing_id, row_number, when
from sqlalchemy import create_engine

In [None]:
# Initialize (or reuse an already running) Spark session for this ETL job.
# For an explicit local run, chain .master("local[*]") before .getOrCreate().
spark = (
    SparkSession.builder
    .appName('kumbaConstructionETL')
    .getOrCreate()
)

spark

In [None]:
# Read the raw CSV into Spark.
# The path uses '/' so it resolves on both Windows and POSIX hosts.  A
# backslash is only a separator on Windows; elsewhere Spark would look for a
# file literally named "dataset\kumba_construction_data.csv".  This matches the
# dataset/... output paths used for the parquet files.
# multiLine lets quoted fields span several physical lines.
kumba_construction_df = (
    spark.read
    .option("multiLine", "true")
    .option("delimiter", ",")
    .csv('dataset/kumba_construction_data.csv', header=True, inferSchema=True)
)

In [None]:
# Preview the first 100 rows of the raw DataFrame.
kumba_construction_df.show(n=100)

In [None]:
# Show the schema of the data: column names, types and nullability as
# inferred by the CSV reader (inferSchema=True in the read cell).
kumba_construction_df.printSchema()

In [None]:
# Show columns: the column names as a plain Python list.
list(kumba_construction_df.columns)

In [None]:
#No of rows: count() runs a Spark job over the whole DataFrame.
num_rows = kumba_construction_df.count()

# Bare name as the cell's last expression so the notebook displays it.
num_rows

In [None]:
# Number of columns, taken from the fields of the DataFrame's schema.
num_columns = len(kumba_construction_df.schema.fields)
num_columns

In [None]:
# Checking for Null values.
# A single aggregation computes every column's null count in one pass over the
# data.  The previous version ran a separate filter+count Spark job per column.
# count(when(cond, 1)) counts only the rows where cond holds, because when()
# yields null otherwise.
null_counts = kumba_construction_df.select(
    [count(when(kumba_construction_df[c].isNull(), 1)) for c in kumba_construction_df.columns]
).first()

# The global aggregate returns exactly one row, aligned positionally with the columns.
for column, n_nulls in zip(kumba_construction_df.columns, null_counts):
    print(column, 'Nulls', n_nulls)

In [None]:
# Fill up missing values.
# Nulls in each of the cost, count, delay-day and score columns listed here are
# replaced with 0.0.  fillna casts the value to each column's type.  Columns
# not listed keep their nulls.
zero_fill_columns = [
    'MaterialsCost',
    'LaborCost',
    'EquipmentCost',
    'PermitFees',
    'InspectionFees',
    'ChangeOrderCount',
    'SafetyIncidentsCount',
    'ProjectDelayDays',
    'WeatherDelayDays',
    'ClientFeedbackScore',
    'QualityAuditScore',
    'EnvironmentalImpactScore',
    'EnergyEfficiencyScore',
    'InnovationScore',
    'CommunityImpactScore',
    'ROI',
]
kumba_construction_df_clean = kumba_construction_df.fillna(
    {column_name: 0.0 for column_name in zero_fill_columns}
)

In [None]:
# Checking for Null values after the fill.
# One aggregation pass counts nulls in every column, instead of one
# filter+count Spark job per column.  count(when(cond, 1)) counts only the rows
# where cond holds.
null_counts = kumba_construction_df_clean.select(
    [count(when(kumba_construction_df_clean[c].isNull(), 1)) for c in kumba_construction_df_clean.columns]
).first()

# The global aggregate returns exactly one row, aligned positionally with the columns.
for column, n_nulls in zip(kumba_construction_df_clean.columns, null_counts):
    print(column, 'Nulls', n_nulls)

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the cleaned data.
kumba_construction_df_clean.describe().show(n=20, truncate=True)

In [None]:
# Column names of the cleaned DataFrame, as a plain list.
list(kumba_construction_df_clean.columns)

In [None]:
#Client Table: one row per distinct client name, plus a surrogate key.
# .distinct() is required (as in the other dimension tables).  Without it the
# table holds one row per project, so a ClientName maps to several client_ids
# and the ClientName join that builds the fact table repeats each project row
# once per matching client row.
# Keys come from row_number() over a fixed ordering instead of
# monotonically_increasing_id().  The latter depends on partition layout, and
# this DataFrame is recomputed by every action (show, parquet write, fact-table
# join, toPandas), so one client could receive different ids in different
# outputs.  The cast keeps client_id a 64-bit (bigint) column.
client = (
    kumba_construction_df_clean
    .select('ClientName')
    .distinct()
    .withColumn('client_id', row_number().over(Window.orderBy('ClientName')).cast('long'))
    .select('client_id', 'ClientName')
)

In [None]:
# Preview the client dimension (default 20 rows, truncated cells).
client.show(n=20, truncate=True)

In [None]:
# Contractor table: one row per distinct (ContractorName, NumberOfSubcontractors)
# pair, plus a surrogate key.
# Keys come from row_number() over a fixed ordering instead of
# monotonically_increasing_id().  The latter depends on partition layout, and
# this DataFrame is recomputed by every action (show, parquet write, fact-table
# join, toPandas), so the same contractor could receive different ids in
# different outputs.  The cast keeps contractor_id a 64-bit (bigint) column.
contractor = (
    kumba_construction_df_clean
    .select('ContractorName', 'NumberOfSubcontractors')
    .distinct()
    .withColumn(
        'contractor_id',
        row_number().over(Window.orderBy('ContractorName', 'NumberOfSubcontractors')).cast('long'),
    )
    .select('contractor_id', 'ContractorName', 'NumberOfSubcontractors')
)

In [None]:
# Preview the contractor dimension (default 20 rows, truncated cells).
contractor.show(n=20, truncate=True)

In [None]:
# Manager table: one row per distinct manager name, plus a surrogate key.
# Keys come from row_number() over a fixed ordering instead of
# monotonically_increasing_id().  The latter depends on partition layout, and
# this DataFrame is recomputed by every action (show, parquet write, fact-table
# join, toPandas), so the same manager could receive different ids in
# different outputs.  The cast keeps manager_id a 64-bit (bigint) column.
manager = (
    kumba_construction_df_clean
    .select('ManagerName')
    .distinct()
    .withColumn('manager_id', row_number().over(Window.orderBy('ManagerName')).cast('long'))
    .select('manager_id', 'ManagerName')
)

In [None]:
# Preview the manager dimension (default 20 rows, truncated cells).
manager.show(n=20, truncate=True)

In [None]:
# ProjectType table: one row per distinct project type, plus a surrogate key.
# Keys come from row_number() over a fixed ordering instead of
# monotonically_increasing_id().  The latter depends on partition layout, and
# this DataFrame is recomputed by every action (show, parquet write, fact-table
# join, toPandas), so the same type could receive different ids in different
# outputs.  The cast keeps projecttype_id a 64-bit (bigint) column.
projecttype = (
    kumba_construction_df_clean
    .select('ProjectType')
    .distinct()
    .withColumn('projecttype_id', row_number().over(Window.orderBy('ProjectType')).cast('long'))
    .select('projecttype_id', 'ProjectType')
)

In [None]:
# Preview the project-type dimension (default 20 rows, truncated cells).
projecttype.show(n=20, truncate=True)

In [None]:
# Location Table: one row per distinct location, plus a surrogate key.
# Keys come from row_number() over a fixed ordering instead of
# monotonically_increasing_id().  The latter depends on partition layout, and
# this DataFrame is recomputed by every action (show, parquet write, fact-table
# join, toPandas), so the same location could receive different ids in
# different outputs.  The cast keeps location_id a 64-bit (bigint) column.
location = (
    kumba_construction_df_clean
    .select('Location')
    .distinct()
    .withColumn('location_id', row_number().over(Window.orderBy('Location')).cast('long'))
    .select('location_id', 'Location')
)

In [None]:
# Preview the location dimension (default 20 rows, truncated cells).
location.show(n=20, truncate=True)

In [None]:
# Fact Table: the cleaned records with each descriptive attribute swapped for
# its dimension's surrogate key.  Left joins keep every cleaned record; a
# record whose attribute finds no dimension match gets a null key.
dimension_joins = [
    (client, ['ClientName']),
    (projecttype, ['ProjectType']),
    (contractor, ['ContractorName', 'NumberOfSubcontractors']),
    (location, ['Location']),
    (manager, ['ManagerName']),
]

fact_columns = [
    'ProjectID', 'ProjectName', 'ProjectStatus',
    'projecttype_id', 'manager_id', 'contractor_id', 'client_id', 'location_id',
    'StartDate', 'EndDate', 'EstimatedBudget', 'ActualCost', 'TeamSize',
    'MaterialsCost', 'LaborCost', 'EquipmentCost', 'PermitFees', 'InspectionFees',
    'ChangeOrderCount', 'SafetyIncidentsCount', 'ProjectDelayDays', 'WeatherDelayDays',
    'ClientFeedbackScore', 'QualityAuditScore', 'EnvironmentalImpactScore',
    'EnergyEfficiencyScore', 'InnovationScore', 'CommunityImpactScore', 'ROI',
]

fact_table = kumba_construction_df_clean
for dim_df, join_keys in dimension_joins:
    fact_table = fact_table.join(dim_df, join_keys, 'left')
fact_table = fact_table.select(*fact_columns)

In [None]:
# Preview the first 100 fact rows.
fact_table.show(n=100)

In [None]:
# Column names of the cleaned DataFrame, as a plain list.
list(kumba_construction_df_clean.columns)

In [None]:
# Output the transformed data to parquet: each table goes to dataset/<name>,
# replacing any previous output at that path.
for table_name, table_df in [
    ('client', client),
    ('contractor', contractor),
    ('location', location),
    ('manager', manager),
    ('projecttype', projecttype),
    ('fact_table', fact_table),
]:
    table_df.write.mode('overwrite').parquet(f'dataset/{table_name}')

In [None]:
# Convert each Spark DataFrame to pandas for the PostgreSQL load.
# toPandas() collects the full DataFrame into driver memory.
(
    client_pd_df,
    contractor_pd_df,
    manager_pd_df,
    projecttype_pd_df,
    location_pd_df,
    fact_table_pd_df,
) = (
    spark_df.toPandas()
    for spark_df in (client, contractor, manager, projecttype, location, fact_table)
)

In [None]:
# Loading the dataset into a postgresql DB

# Database connection parameters.  Non-secret values may be overridden through
# environment variables.  The password is read only from KUMBA_DB_PASSWORD, so
# no credential lives in the notebook source.
db_params = {
    'username' : os.environ.get('KUMBA_DB_USER', 'postgres'),
    'password' : os.environ.get('KUMBA_DB_PASSWORD'),
    'host' : os.environ.get('KUMBA_DB_HOST', 'localhost'),
    'port' : os.environ.get('KUMBA_DB_PORT', '5432'),
    'database' : os.environ.get('KUMBA_DB_NAME', 'kumba_construction'),
}
if db_params['password'] is None:
    raise RuntimeError('Set the KUMBA_DB_PASSWORD environment variable to the PostgreSQL password')

# Percent-encode the credentials so characters such as '@', ':' or '/' in them
# cannot break the URL.  The raw password is stored above, not a hand-encoded one.
db_url = (
    f"postgresql://{quote_plus(db_params['username'])}:{quote_plus(db_params['password'])}"
    f"@{db_params['host']}:{db_params['port']}/{db_params['database']}"
)

# Create the database engine with the db url
engine = create_engine(db_url)
try:
    # engine.begin() opens an explicit transaction.  It is committed when the
    # block exits normally and rolled back if any to_sql call raises.  This
    # avoids relying on implicit commit behaviour that differs between
    # pandas/SQLAlchemy versions when a bare connect() is used.
    with engine.begin() as connection:
        # Create tables and load the data
        client_pd_df.to_sql('client', connection, index=False, if_exists='replace')
        contractor_pd_df.to_sql('contractor', connection, index=False, if_exists='replace')
        manager_pd_df.to_sql('manager', connection, index=False, if_exists='replace')
        projecttype_pd_df.to_sql('projecttype', connection, index=False, if_exists='replace')
        location_pd_df.to_sql('location', connection, index=False, if_exists='replace')
        fact_table_pd_df.to_sql('fact_table', connection, index=False, if_exists='replace')
finally:
    # Release pooled connections, whether or not the load succeeded.
    engine.dispose()

print('Database, tables and data loaded successfully')