In [89]:
# Importing the necessary packages
!pip install pyarrow
!pip install azure-storage-blob





[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [90]:
# import necessary libraries
import pandas as pd
import os 
import io
from azure.storage.blob import BlobServiceClient, BlobClient


In [91]:
# Read the five raw source files, one DataFrame per file (order matches the names on the left)
agency_df, employemnt_df, payroll_2020_df, payroll_2021_df, title_df = (
    pd.read_csv(raw_path)
    for raw_path in (
        r'nyc_payroll_capstone_project\raw_files\AgencyMaster.csv',
        r'nyc_payroll_capstone_project\raw_files\EmpMaster.csv',
        r'nyc_payroll_capstone_project\raw_files\nycpayroll_2020.csv',
        r'nyc_payroll_capstone_project\raw_files\nycpayroll_2021.csv',
        r'nyc_payroll_capstone_project\raw_files\TitleMaster.csv',
    )
)

In [92]:
# Replace missing title descriptions with the placeholder 'unknown'
title_df = title_df.fillna(value={'TitleDescription': 'unknown'})

In [93]:
# Agency dimension: the distinct (AgencyID, AgencyName) pairs, re-indexed from 0
agency_dim = (
    agency_df.loc[:, ['AgencyID', 'AgencyName']]
    .drop_duplicates()
    .reset_index(drop=True)
)

agency_dim.head()

Unnamed: 0,AgencyID,AgencyName
0,2001,ADMIN FOR CHILDREN'S SVCS
1,2002,ADMIN TRIALS AND HEARINGS
2,2003,BOARD OF CORRECTION
3,2004,BOARD OF ELECTION
4,2005,BOARD OF ELECTION POLL WORKERS


In [94]:
# Employee dimension: the distinct (EmployeeID, LastName, FirstName) combinations, re-indexed from 0
employee_dim = (
    employemnt_df.loc[:, ['EmployeeID', 'LastName', 'FirstName']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [95]:
# Title dimension: the distinct (TitleCode, TitleDescription) pairs, re-indexed from 0
title_dim = (
    title_df.loc[:, ['TitleCode', 'TitleDescription']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [96]:
# Stack the 2020 and 2021 payroll extracts into one frame with a fresh 0..n-1 index
payroll_combined = pd.concat(
    objs=(payroll_2020_df, payroll_2021_df),
    ignore_index=True,
)

# Preview the first rows of the stacked frame
payroll_combined.head()


Unnamed: 0,FiscalYear,PayrollNumber,AgencyID,AgencyName,EmployeeID,LastName,FirstName,AgencyStartDate,WorkLocationBorough,TitleCode,TitleDescription,LeaveStatusasofJune30,BaseSalary,PayBasis,RegularHours,RegularGrossPaid,OTHours,TotalOTPaid,TotalOtherPay,AgencyCode
0,2020,17,2120.0,OFFICE OF EMERGENCY MANAGEMENT,100001,GEAGER,VERONICA,09-12-16,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.0,0.0,0.0,
1,2020,17,2120.0,OFFICE OF EMERGENCY MANAGEMENT,149612,ROTTA,JONATHAN,9/16/2013,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.0,0.0,0.0,
2,2020,17,2120.0,OFFICE OF EMERGENCY MANAGEMENT,206583,WILSON II,ROBERT,4/30/2018,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.0,0.0,0.0,
3,2020,17,2120.0,OFFICE OF EMERGENCY MANAGEMENT,199874,WASHINGTON,MORIAH,3/18/2019,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,87900.95,0.0,0.0,-3202.74,
4,2020,17,2120.0,OFFICE OF EMERGENCY MANAGEMENT,58036,KRAWCZYK,AMANDA,5/15/2017,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,83976.54,0.0,0.0,0.0,


In [97]:
# Summarise the combined payroll frame: column dtypes and non-null counts
payroll_combined.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 201 entries, 0 to 200
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   FiscalYear             201 non-null    int64  
 1   PayrollNumber          201 non-null    int64  
 2   AgencyID               100 non-null    float64
 3   AgencyName             201 non-null    object 
 4   EmployeeID             201 non-null    int64  
 5   LastName               201 non-null    object 
 6   FirstName              201 non-null    object 
 7   AgencyStartDate        201 non-null    object 
 8   WorkLocationBorough    201 non-null    object 
 9   TitleCode              201 non-null    int64  
 10  TitleDescription       201 non-null    object 
 11  LeaveStatusasofJune30  201 non-null    object 
 12  BaseSalary             201 non-null    float64
 13  PayBasis               201 non-null    object 
 14  RegularHours           201 non-null    float64
 15  Regula

In [98]:
# Fill missing agency identifiers in payroll_combined with the sentinel value 0.
# NOTE(review): the .info() summary shows AgencyID populated for only 100 of the
# 201 rows, so about half of the rows end up with AgencyID 0. Confirm whether
# AgencyCode carries the agency key for those rows and should be used instead.
payroll_combined = payroll_combined.fillna(value={'AgencyID': 0, 'AgencyCode': 0})


In [99]:
# Cast the two agency key columns to integer dtype in a single call
payroll_combined = payroll_combined.astype({'AgencyID': int, 'AgencyCode': int})

In [100]:
# Make sure EmployeeID is a plain 64-bit integer column.
# Zero-padding the value to six characters and converting it back with int()
# leaves the number unchanged (int() drops leading zeros), so no padding is
# attempted here. The IDs stay integers on purpose: EmployeeID is a merge key
# below and is stored in INT columns in PostgreSQL.
payroll_combined['EmployeeID'] = payroll_combined['EmployeeID'].astype('int64')

# Nothing is written to disk in this cell, so the message only reports the cast.
print("EmployeeID values in payroll_combined have been normalised to integers.")


EmployeeID values in payroll_combined.csv have been updated successfully.


In [101]:
# Build the payroll fact table: left-join each dimension on the columns it shares
# with the payroll data, then keep only the fact-level columns.
# Every dimension column is part of its join key, so the joins add no new columns.
payroll_facts_table = (
    payroll_combined
    .merge(employee_dim, how='left', on=['EmployeeID', 'LastName', 'FirstName'])
    .merge(title_dim, how='left', on=['TitleCode', 'TitleDescription'])
    .merge(agency_dim, how='left', on=['AgencyID', 'AgencyName'])
    .loc[:, [
        'PayrollNumber', 'EmployeeID', 'AgencyID', 'TitleCode', 'FiscalYear',
        'AgencyStartDate', 'WorkLocationBorough', 'AgencyCode',
        'LeaveStatusasofJune30', 'BaseSalary', 'PayBasis', 'RegularHours',
        'RegularGrossPaid', 'OTHours', 'TotalOTPaid', 'TotalOtherPay',
    ]]
)

In [102]:
# Write each table to its own CSV file (without the DataFrame index)
for _table, _csv_target in (
    (employee_dim, r'nyc_payroll_capstone_project\dataset\employee_dim.csv'),
    (title_dim, r'nyc_payroll_capstone_project\dataset\title_dim.csv'),
    (agency_dim, r'nyc_payroll_capstone_project\dataset\agency_dim.csv'),
    (payroll_facts_table, r'nyc_payroll_capstone_project\dataset\payroll_facts_table.csv'),
):
    _table.to_csv(_csv_target, index=False)

In [103]:
# Connect to Azure Blob Storage.
# The connection string embeds the storage account key, so it is read from the
# AZURE_STORAGE_CONNECTION_STRING environment variable instead of being stored
# in the notebook.
# NOTE(review): the account key that used to be hardcoded in this cell should be
# treated as compromised and rotated.
connect_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING')
if not connect_str:
    raise RuntimeError(
        'Set the AZURE_STORAGE_CONNECTION_STRING environment variable before running this cell.'
    )
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Client scoped to the container that receives the uploaded files
container_name = 'nycpayrollcontainer'
container_client = blob_service_client.get_container_client(container_name)

In [104]:
# loading data into Azure blob storage as a parquet file
def upload_df_to_blob_as_parquet(df, container_client,blob_name):
    """Serialise ``df`` to Parquet in memory and upload it as a block blob.

    Args:
        df: Object exposing ``to_parquet(buffer, index=False)`` (a DataFrame).
        container_client: Container client used to obtain the blob client.
        blob_name: Name of the destination blob; an existing blob with the
            same name is overwritten.
    """
    # Write the Parquet bytes into an in-memory stream, then rewind it for reading
    parquet_stream = io.BytesIO()
    df.to_parquet(parquet_stream, index=False)
    parquet_stream.seek(0)

    container_client.get_blob_client(blob_name).upload_blob(
        parquet_stream,
        blob_type="BlockBlob",
        overwrite=True,
    )
    print(f'{blob_name} uploaded to Blob storage successfully')

In [105]:
# Upload every table as a Parquet blob under the 'rawdata/' prefix
for _table, _blob_path in (
    (agency_dim, 'rawdata/agency_dim.parquet'),
    (employee_dim, 'rawdata/employee_dim.parquet'),
    (payroll_facts_table, 'rawdata/payroll_facts_table.parquet'),
    (title_dim, 'rawdata/title_dim.parquet'),
):
    upload_df_to_blob_as_parquet(_table, container_client, _blob_path)


rawdata/agency_dim.parquet uploaded to Blob storage successfully
rawdata/employee_dim.parquet uploaded to Blob storage successfully
rawdata/payroll_facts_table.parquet uploaded to Blob storage successfully
rawdata/title_dim.parquet uploaded to Blob storage successfully


#### Loading data into RDBMS using psycopg2

In [106]:
!pip install psycopg2




[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [107]:
# import necessary package
import psycopg2

# Loading data to postgresql
def get_db_connection():
    """Open a new psycopg2 connection to the payroll PostgreSQL database.

    Connection settings are read from environment variables so that no
    credential is stored in the notebook:

    * ``NYC_PAYROLL_DB_HOST``     - defaults to ``'localhost'``
    * ``NYC_PAYROLL_DB_NAME``     - defaults to ``'nyc_payroll'``
    * ``NYC_PAYROLL_DB_USER``     - defaults to ``'postgres'``
    * ``NYC_PAYROLL_DB_PASSWORD`` - required, no default

    Returns:
        The connection object returned by ``psycopg2.connect``; the caller is
        responsible for closing it.

    Raises:
        RuntimeError: if ``NYC_PAYROLL_DB_PASSWORD`` is unset or empty.
    """
    # NOTE(review): the password that used to be hardcoded here should be changed.
    password = os.environ.get('NYC_PAYROLL_DB_PASSWORD')
    if not password:
        raise RuntimeError(
            'Set the NYC_PAYROLL_DB_PASSWORD environment variable before connecting.'
        )
    return psycopg2.connect(
        host=os.environ.get('NYC_PAYROLL_DB_HOST', 'localhost'),
        database=os.environ.get('NYC_PAYROLL_DB_NAME', 'nyc_payroll'),
        user=os.environ.get('NYC_PAYROLL_DB_USER', 'postgres'),
        password=password,
    )


In [108]:
# Create a function to create tables
def create_table():
    """Drop and recreate the four warehouse tables in PostgreSQL.

    Executes one SQL script that drops ``payroll_facts_table``, ``agency_dim``,
    ``employee_dim`` and ``title_dim`` if they exist and then creates them
    again, so rows already stored in those tables are lost. The script is
    followed by a single ``commit()``.

    The cursor and the connection are closed even when the script fails; in
    that case nothing is committed and the exception propagates to the caller.

    NOTE(review): ``PayrollNumber`` is declared as the primary key of
    ``payroll_facts_table``, yet the payroll rows previewed in this notebook all
    share the same PayrollNumber - confirm that it really identifies a row.
    """
    create_table_query = '''
                         -- Drop tables if they exist
                         DROP TABLE IF EXISTS payroll_facts_table;
                         DROP TABLE IF EXISTS agency_dim;
                         DROP TABLE IF EXISTS employee_dim;
                         DROP TABLE IF EXISTS title_dim;

                         -- Create agency_dim table
                         CREATE TABLE IF NOT EXISTS agency_dim (
                             AgencyID INT PRIMARY KEY,
                             AgencyName VARCHAR(255)  -- Using a more reasonable length
                         );

                         -- Create employee_dim table
                         CREATE TABLE IF NOT EXISTS employee_dim (
                             EmployeeID INT PRIMARY KEY,
                             LastName VARCHAR(255),
                             FirstName VARCHAR(255)
                         );

                         -- Create title_dim table
                         CREATE TABLE IF NOT EXISTS title_dim (
                             TitleCode INT PRIMARY KEY,
                             TitleDescription VARCHAR(255)
                         );

                         CREATE TABLE IF NOT EXISTS payroll_facts_table(
                             PayrollNumber INT PRIMARY KEY,
                             EmployeeID INT,
                             AgencyID INT,
                             TitleCode INT,
                             FiscalYear INT,
                             AgencyStartDate VARCHAR(100),
                             WorkLocationBorough VARCHAR(255),
                             AgencyCode INT,
                             LeaveStatusasofJune30 VARCHAR(255),
                             BaseSalary FLOAT,
                             PayBasis VARCHAR(255),
                             RegularHours FLOAT,
                             RegularGrossPaid FLOAT,
                             OTHours FLOAT,
                             TotalOTPaid FLOAT,
                             TotalOtherPay FLOAT,
                             FOREIGN KEY (EmployeeID) REFERENCES employee_dim(EmployeeID),
                             FOREIGN KEY (AgencyID) REFERENCES agency_dim (AgencyID),
                             FOREIGN KEY (TitleCode) REFERENCES title_dim(TitleCode)
                         );  
                         '''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(create_table_query)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, including when the script raised
        conn.close()



In [109]:
# (Re)create the schema. The script drops the four tables first, so any rows already loaded are removed.
create_table()

In [110]:
# Loading the agency_dim.csv data
import csv
def load_data_from_csv(csv_path):
    """Insert every data row of a CSV file into the ``agency_dim`` table.

    The first line of the file is treated as a header and skipped; blank rows
    are ignored. Each remaining row is passed positionally to the INSERT
    statement, so it must hold ``AgencyID`` and ``AgencyName`` in that order.
    All inserts are committed together after the whole file has been read.

    The file is read as UTF-8 (the default encoding of pandas' ``to_csv``) with
    ``newline=''`` as the csv module recommends. The cursor and connection are
    closed even if an insert fails; in that case nothing is committed and the
    exception propagates to the caller.

    Args:
        csv_path: Path of the CSV file to load.
    """
    insert_sql = '''INSERT INTO agency_dim(AgencyID, AgencyName)
                   VALUES (%s, %s);'''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            with open(csv_path, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                next(reader, None)  # skip the header; tolerate an empty file
                for row in reader:
                    if not row:
                        continue
                    cursor.execute(insert_sql, row)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    
# Path of the agency_dim CSV exported earlier in this notebook
csv_file_path = r'nyc_payroll_capstone_project\dataset\agency_dim.csv'

# Run the load; the message is only printed if no exception was raised
load_data_from_csv(csv_file_path)
print('agency_dim data loaded successfully')


agency_dim data loaded successfully


In [111]:
# loading employee_dim.csv data
def load_data_from_csv(csv_path):
    """Insert every data row of a CSV file into the ``employee_dim`` table.

    The first line of the file is treated as a header and skipped; blank rows
    are ignored. Each remaining row is passed positionally to the INSERT
    statement, so it must hold ``EmployeeID``, ``LastName`` and ``FirstName``
    in that order. All inserts are committed together after the whole file has
    been read.

    The file is read as UTF-8 (the default encoding of pandas' ``to_csv``) with
    ``newline=''`` as the csv module recommends. The cursor and connection are
    closed even if an insert fails; in that case nothing is committed and the
    exception propagates to the caller.

    Args:
        csv_path: Path of the CSV file to load.
    """
    insert_sql = '''INSERT INTO employee_dim(EmployeeID, LastName, FirstName)
                   VALUES (%s, %s, %s);'''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            with open(csv_path, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                next(reader, None)  # skip the header; tolerate an empty file
                for row in reader:
                    if not row:
                        continue
                    cursor.execute(insert_sql, row)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    
# Path of the employee_dim CSV exported earlier in this notebook
csv_file_path = r'nyc_payroll_capstone_project\dataset\employee_dim.csv'

# Run the load; the message is only printed if no exception was raised
load_data_from_csv(csv_file_path)
print('employee_dim data loaded successfully')

employee_dim data loaded successfully


In [112]:
# loading title_dim.csv data
def load_data_from_csv(csv_path):
    """Insert every data row of a CSV file into the ``title_dim`` table.

    The first line of the file is treated as a header and skipped; blank rows
    are ignored. Each remaining row is passed positionally to the INSERT
    statement, so it must hold ``TitleCode`` and ``TitleDescription`` in that
    order. All inserts are committed together after the whole file has been
    read.

    The file is read as UTF-8 (the default encoding of pandas' ``to_csv``) with
    ``newline=''`` as the csv module recommends. The cursor and connection are
    closed even if an insert fails; in that case nothing is committed and the
    exception propagates to the caller.

    Args:
        csv_path: Path of the CSV file to load.
    """
    insert_sql = '''INSERT INTO title_dim(TitleCode, TitleDescription)
                   VALUES (%s, %s);'''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            with open(csv_path, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                next(reader, None)  # skip the header; tolerate an empty file
                for row in reader:
                    if not row:
                        continue
                    cursor.execute(insert_sql, row)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    
# Path of the title_dim CSV exported earlier in this notebook
csv_file_path = r'nyc_payroll_capstone_project\dataset\title_dim.csv'

# Run the load; the message is only printed if no exception was raised
load_data_from_csv(csv_file_path)
print('title_dim data loaded successfully')

title_dim data loaded successfully


In [None]:
# loading payroll_facts_table.csv data
def load_data_from_csv(csv_path):
    """Insert the data rows of a CSV file into ``payroll_facts_table``.

    The first line of the file is treated as a header and skipped; blank rows
    are ignored. For every other row the first five values (PayrollNumber,
    EmployeeID, AgencyID, TitleCode, FiscalYear) are converted to ``int`` via
    ``float`` so that values such as ``'2120.0'`` are accepted. A row whose
    conversion fails is reported with ``print`` and skipped; the remaining
    values are passed to the database unchanged. All inserts are committed
    together after the whole file has been read.

    The file is read as UTF-8 (the default encoding of pandas' ``to_csv``) with
    ``newline=''`` as the csv module recommends. The cursor and connection are
    closed even if an insert fails; in that case nothing is committed and the
    exception propagates to the caller.

    NOTE(review): the INSERT uses ``ON CONFLICT (PayrollNumber) DO NOTHING``, so
    a row whose PayrollNumber is already stored is silently dropped. The
    payroll rows previewed in this notebook all share one PayrollNumber -
    confirm that skipping them is intended.

    Args:
        csv_path: Path of the CSV file to load.
    """
    insert_sql = '''INSERT INTO payroll_facts_table(
                       PayrollNumber, EmployeeID, AgencyID, TitleCode, FiscalYear,
                       AgencyStartDate, WorkLocationBorough, AgencyCode, LeaveStatusasofJune30, BaseSalary,
                       PayBasis, RegularHours, RegularGrossPaid, OTHours, TotalOTPaid, TotalOtherPay)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                   ON CONFLICT (PayrollNumber) DO NOTHING;'''
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            with open(csv_path, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                next(reader, None)  # skip the header; tolerate an empty file
                for row in reader:
                    if not row:
                        continue
                    try:
                        # Convert all five keys before touching the row, so a
                        # failing row is reported exactly as it was read.
                        key_values = [int(float(value)) for value in row[:5]]
                    except ValueError as e:
                        print(f"Error converting row: {row}")
                        print(e)
                        continue
                    cursor.execute(insert_sql, tuple(key_values + row[5:]))
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    
# Path of the payroll_facts_table CSV exported earlier in this notebook
csv_file_path = r'nyc_payroll_capstone_project\dataset\payroll_facts_table.csv'

# Run the load; the message is only printed if no exception was raised
load_data_from_csv(csv_file_path)
print('payroll_facts_table data loaded successfully')

In [63]:
# Show the fact table's column order (the fact loader passes CSV values to the INSERT by position)
payroll_facts_table.columns

Index(['PayrollNumber', 'EmployeeID', 'AgencyID', 'TitleCode', 'FiscalYear',
       'AgencyStartDate', 'WorkLocationBorough', 'AgencyCode',
       'LeaveStatusasofJune30', 'BaseSalary', 'PayBasis', 'RegularHours',
       'RegularGrossPaid', 'OTHours', 'TotalOTPaid', 'TotalOtherPay'],
      dtype='object')