In [None]:
#Import Necessary Libraries
import pandas as pd
import os
import glob
from dotenv import load_dotenv
import psycopg2

#### Data Extraction

In [35]:
#Load Master Data
employee_df = pd.read_csv('dataset/raw/EmpMaster.csv')
agency_df = pd.read_csv('dataset/raw/AgencyMaster.csv')
jobtitle_df = pd.read_csv('dataset/raw/TitleMaster.csv')

In [38]:
employee_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   EmployeeID  1000 non-null   int64 
 1   LastName    1000 non-null   object
 2   FirstName   1000 non-null   object
dtypes: int64(1), object(2)
memory usage: 23.6+ KB


In [28]:
# Define directory containing payroll CSV files
payroll_dir = "dataset/payroll_data"

# Find all payroll CSV files dynamically
payroll_files = glob.glob(os.path.join(payroll_dir, "nycpayroll_*.csv"))

# Check if any files were found
if not payroll_files:
    raise ValueError("No payroll files found in the directory!")

# Function to Load and Merge Payroll Data
def load_payroll_data(files):
    dataframes = [pd.read_csv(file) for file in files]  # Read all CSV files into Pandas DataFrames
    merged_df = pd.concat(dataframes, ignore_index=True)  # Concatenate all dataframes
    merged_df.drop_duplicates(subset=["EmployeeID", "FiscalYear"], inplace=True)  # Drop duplicate rows
    return merged_df

# Load and process payroll data
payroll_df = load_payroll_data(payroll_files)

# Show first few rows for validation
# display(payroll_df.show(205))
#print(payroll_df.info())  # Display DataFrame structure

In [29]:
display(payroll_df)

Unnamed: 0,FiscalYear,PayrollNumber,AgencyID,AgencyName,EmployeeID,LastName,FirstName,AgencyStartDate,WorkLocationBorough,TitleCode,TitleDescription,LeaveStatusasofJune30,BaseSalary,PayBasis,RegularHours,RegularGrossPaid,OTHours,TotalOTPaid,TotalOtherPay
0,2020,17,2120,OFFICE OF EMERGENCY MANAGEMENT,10001,GEAGER,VERONICA,9/12/2016,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.00,0.00,0.00
1,2020,17,2120,OFFICE OF EMERGENCY MANAGEMENT,149612,ROTTA,JONATHAN,9/16/2013,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.00,0.00,0.00
2,2020,17,2120,OFFICE OF EMERGENCY MANAGEMENT,206583,WILSON II,ROBERT,4/30/2018,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,84698.21,0.00,0.00,0.00
3,2020,17,2120,OFFICE OF EMERGENCY MANAGEMENT,199874,WASHINGTON,MORIAH,3/18/2019,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,87900.95,0.00,0.00,-3202.74
4,2020,17,2120,OFFICE OF EMERGENCY MANAGEMENT,58036,KRAWCZYK,AMANDA,5/15/2017,BROOKLYN,40447,EMERGENCY PREPAREDNESS MANAGER,ACTIVE,86005.0,per Annum,1820.0,83976.54,0.00,0.00,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
196,2021,868,2141,DEPT OF CITYWIDE ADMIN SVCS,52830,HARDING,ARCHIBALD,5/31/2016,MANHATTAN,40782,STATIONARY ENGINEER,ACTIVE,508.8,per Day,2080.0,132288.00,901.25,90004.07,36369.10
197,2021,57,2012,FIRE DEPARTMENT,174178,MCGEARY,JOSEPH,8/19/1990,BROOKLYN,40207,CAPTAIN,ACTIVE,135511.0,per Annum,2080.0,146527.13,833.63,77912.32,34206.86
198,2021,56,2010,POLICE DEPARTMENT,302174,CHENG,CHRISTOPHER,6/30/1998,MANHATTAN,40579,LIEUTENANT D/A SPECIAL ASSIGNMENT,ACTIVE,149068.0,per Annum,2080.0,147066.63,950.08,90551.25,20495.26
199,2021,56,2010,POLICE DEPARTMENT,229552,ORTEGA,MANUEL,2/12/2018,MANHATTAN,40782,STATIONARY ENGINEER,ACTIVE,508.8,per Day,2080.0,131779.20,1085.25,102184.53,24136.67


#### Data Transformation

In [None]:
# Check for Null Values 
for column in payroll_df.columns:
    null_count = payroll_df[column].isna().sum()
    print(f"{column}: {null_count} null values")

FiscalYear: 0 null values
PayrollNumber: 0 null values
AgencyID: 0 null values
AgencyName: 0 null values
EmployeeID: 0 null values
LastName: 0 null values
FirstName: 0 null values
AgencyStartDate: 0 null values
WorkLocationBorough: 0 null values
TitleCode: 0 null values
TitleDescription: 0 null values
LeaveStatusasofJune30: 0 null values
BaseSalary: 0 null values
PayBasis: 0 null values
RegularHours: 0 null values
RegularGrossPaid: 0 null values
OTHours: 0 null values
TotalOTPaid: 0 null values
TotalOtherPay: 0 null values


In [32]:
# Fill null values with defaults
for col_name in payroll_df.columns:
    if payroll_df[col_name].dtype == "object":  # Strings
        payroll_df[col_name].fillna("Unknown", inplace=True)
    elif payroll_df[col_name].dtype in ["float64", "float32"]:  # Float types
        payroll_df[col_name].fillna(0.0, inplace=True)
    elif payroll_df[col_name].dtype in ["int64", "int32"]:  # Integer types
        payroll_df[col_name].fillna(0, inplace=True)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  payroll_df[col_name].fillna(0, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  payroll_df[col_name].fillna("Unknown", inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting va

In [34]:
employee_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   EmployeeID  1000 non-null   object
 1   LastName    1000 non-null   object
 2   FirstName   1000 non-null   object
dtypes: object(3)
memory usage: 23.6+ KB


In [39]:
# Merge all data together
merged_data = payroll_df \
    .merge(employee_df, left_on=["EmployeeID", "LastName", "FirstName"], right_on=["EmployeeID", "LastName", "FirstName"], how="left") \
    .merge(agency_df, left_on=["AgencyID", "AgencyName"], right_on=["AgencyID", "AgencyName"], how="left") \
    .merge(jobtitle_df, left_on=["TitleCode", "TitleDescription"], right_on=["TitleCode", "TitleDescription"], how="left")

In [42]:
merged_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 201 entries, 0 to 200
Data columns (total 19 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   FiscalYear             201 non-null    int64  
 1   PayrollNumber          201 non-null    int64  
 2   AgencyID               201 non-null    int64  
 3   AgencyName             201 non-null    object 
 4   EmployeeID             201 non-null    int64  
 5   LastName               201 non-null    object 
 6   FirstName              201 non-null    object 
 7   AgencyStartDate        201 non-null    object 
 8   WorkLocationBorough    201 non-null    object 
 9   TitleCode              201 non-null    int64  
 10  TitleDescription       201 non-null    object 
 11  LeaveStatusasofJune30  201 non-null    object 
 12  BaseSalary             201 non-null    float64
 13  PayBasis               201 non-null    object 
 14  RegularHours           201 non-null    float64
 15  Regula

In [44]:
# Convert to datetime format
merged_data["AgencyStartDate"] = pd.to_datetime(merged_data["AgencyStartDate"], format="%m/%d/%Y", errors="coerce")

In [46]:
merged_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 201 entries, 0 to 200
Data columns (total 19 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   FiscalYear             201 non-null    int64         
 1   PayrollNumber          201 non-null    int64         
 2   AgencyID               201 non-null    int64         
 3   AgencyName             201 non-null    object        
 4   EmployeeID             201 non-null    int64         
 5   LastName               201 non-null    object        
 6   FirstName              201 non-null    object        
 7   AgencyStartDate        201 non-null    datetime64[ns]
 8   WorkLocationBorough    201 non-null    object        
 9   TitleCode              201 non-null    int64         
 10  TitleDescription       201 non-null    object        
 11  LeaveStatusasofJune30  201 non-null    object        
 12  BaseSalary             201 non-null    float64       
 13  PayBa

In [None]:
# Create Employee Dimension Table
employee_dim = merged_data[['EmployeeID', 'LastName', 'FirstName', 'WorkLocationBorough', 'LeaveStatusasofJune30']].copy().drop_duplicates(['EmployeeID']).reset_index(drop=True)
display(employee_dim)



Unnamed: 0,EmployeeID,LastName,FirstName,WorkLocationBorough,LeaveStatusasofJune30
0,10001,GEAGER,VERONICA,BROOKLYN,ACTIVE
1,149612,ROTTA,JONATHAN,BROOKLYN,ACTIVE
2,206583,WILSON II,ROBERT,BROOKLYN,ACTIVE
3,199874,WASHINGTON,MORIAH,BROOKLYN,ACTIVE
4,58036,KRAWCZYK,AMANDA,BROOKLYN,ACTIVE
...,...,...,...,...,...
195,164144,MASSEY-COVINGTO,DARLENE,QUEENS,ACTIVE
196,52830,HARDING,ARCHIBALD,MANHATTAN,ACTIVE
197,174178,MCGEARY,JOSEPH,BROOKLYN,ACTIVE
198,302174,CHENG,CHRISTOPHER,MANHATTAN,ACTIVE


In [52]:
#Create Agency Dimension Table
agency_dim = merged_data[['AgencyID', 'AgencyName']].copy().drop_duplicates().reset_index(drop=True)
display(agency_dim)

Unnamed: 0,AgencyID,AgencyName
0,2120,OFFICE OF EMERGENCY MANAGEMENT
1,2122,OFFICE OF MANAGEMENT & BUDGET
2,2153,NYC HOUSING AUTHORITY
3,2129,DEPT OF HEALTH/MENTAL HYGIENE
4,2092,GUTTMAN COMMUNITY COLLEGE
5,2017,DEPARTMENT OF CORRECTION
6,2131,DEPT OF ENVIRONMENT PROTECTION
7,2044,OFFICE OF THE ACTUARY
8,2132,DEPARTMENT OF SANITATION
9,2051,OFFICE OF THE COMPTROLLER


In [54]:
#Create Job_Title Dimension Table
jobtitle_dim = merged_data[['TitleCode', 'TitleDescription']].copy().drop_duplicates().reset_index(drop=True)
display(jobtitle_dim)

Unnamed: 0,TitleCode,TitleDescription
0,40447,EMERGENCY PREPAREDNESS MANAGER
1,40285,COMMISSIONER OF EMERGENCY MANAGEMENT
2,40448,EMERGENCY PREPAREDNESS SPECIALIST
3,40291,COMMUNITY ASSOCIATE
4,40362,DEPUTY COMMISSIONER
5,40088,AGENCY ATTORNEY
6,40494,FIRST DEPUTY COMMISSIONER
7,40199,BUDGET ANALYST
8,40409,DIRECTOR OF MANAGEMENT & BUDGET
9,40081,ADMINISTRATIVE STAFF ANALYST


In [55]:
#Create Time Dimension Table
time_dim = merged_data[['FiscalYear']].copy().drop_duplicates().reset_index(drop=True)
time_dim['TimeID'] = range(1, len(time_dim) + 1)
time_dim = time_dim[['TimeID', 'FiscalYear']]

display(time_dim)

Unnamed: 0,TimeID,FiscalYear
0,1,2020
1,2,1998
2,3,2021
3,4,1999


In [None]:
# Create Payroll Fact Table using pandas
payroll_fact_tbl = merged_data \
    .merge(employee_dim, on=['LastName', 'FirstName', 'LeaveStatusasofJune30', 'WorkLocationBorough'], how='left', suffixes=('', '_emp')) \
    .merge(agency_dim, on=['AgencyName'], how='left', suffixes=('', '_agency')) \
    .merge(jobtitle_dim, on=['TitleDescription'], how='left', suffixes=('', '_job')) \
    .merge(time_dim, on=['FiscalYear'], how='left', suffixes=('', '_time'))

# Select specific columns
payroll_fact_tbl['PayrollID'] = range(1, len(payroll_fact_tbl) + 1)
payroll_fact_tbl = payroll_fact_tbl[[
    'PayrollID', 'EmployeeID', 'AgencyID', 'TitleCode', 'TimeID',
    'PayrollNumber', 'BaseSalary', 'PayBasis', 'AgencyStartDate',
    'RegularHours', 'RegularGrossPaid', 'OTHours', 'TotalOTPaid', 'TotalOtherPay'
]]

# Verify the output
display(payroll_fact_tbl)


Unnamed: 0,PayrollID,EmployeeID,AgencyID,TitleCode,TimeID,PayrollNumber,BaseSalary,PayBasis,AgencyStartDate,RegularHours,RegularGrossPaid,OTHours,TotalOTPaid,TotalOtherPay
0,1,10001,2120,40447,1,17,86005.0,per Annum,2016-09-12,1820.0,84698.21,0.00,0.00,0.00
1,2,149612,2120,40447,1,17,86005.0,per Annum,2013-09-16,1820.0,84698.21,0.00,0.00,0.00
2,3,206583,2120,40447,1,17,86005.0,per Annum,2018-04-30,1820.0,84698.21,0.00,0.00,0.00
3,4,199874,2120,40447,1,17,86005.0,per Annum,2019-03-18,1820.0,87900.95,0.00,0.00,-3202.74
4,5,58036,2120,40447,1,17,86005.0,per Annum,2017-05-15,1820.0,83976.54,0.00,0.00,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
196,197,52830,2141,40782,3,868,508.8,per Day,2016-05-31,2080.0,132288.00,901.25,90004.07,36369.10
197,198,174178,2012,40207,3,57,135511.0,per Annum,1990-08-19,2080.0,146527.13,833.63,77912.32,34206.86
198,199,302174,2010,40579,3,56,149068.0,per Annum,1998-06-30,2080.0,147066.63,950.08,90551.25,20495.26
199,200,229552,2010,40782,3,56,508.8,per Day,2018-02-12,2080.0,131779.20,1085.25,102184.53,24136.67


In [59]:
# Save transformed data
employee_dim.to_csv('dataset/cleaned_data/employee.csv', index=False)
agency_dim.to_csv('dataset/cleaned_data/agency.csv', index=False)
jobtitle_dim.to_csv('dataset/cleaned_data/jobtitle.csv', index=False)
time_dim.to_csv('dataset/cleaned_data/time.csv', index=False)
payroll_fact_tbl.to_csv('dataset/cleaned_data/payroll_fact_tbl.csv', index=False)