In [None]:
# Import Necessary Libraries
import glob
import os
from urllib.parse import quote

import psycopg2
import pyspark
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from sqlalchemy import create_engine

In [None]:
# Set Java Environment for the Spark JVM (the fallback path presumably points at a local Java 8 install).
# setdefault keeps a JAVA_HOME that is already configured on this machine and only falls
# back to C:/java8 when none is set, so the notebook is not tied to one workstation.
# This must run before the SparkSession is created, because that is when the JVM starts.
os.environ.setdefault('JAVA_HOME', "C:/java8")

In [None]:
# Initialize Spark Session.
# Tunable Spark settings live in one mapping so they are easy to find and adjust;
# they are applied to the builder in the order listed.
SPARK_SETTINGS = {
    "spark.sql.shuffle.partitions": "50",
    "spark.driver.memory": "4g",
    "spark.executor.memory": "8g",
}

spark_builder = SparkSession.builder.appName("PayrollETL")
for setting_key, setting_value in SPARK_SETTINGS.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.getOrCreate()

#### Data Extraction

In [None]:
# Load Master Data (employees, agencies, job titles).
# Paths are built with os.path.join so they resolve on Windows, macOS and Linux alike.
# A literal backslash is only a path separator on Windows.
RAW_DATA_DIR = os.path.join("dataset", "raw")

employee_df = spark.read.csv(os.path.join(RAW_DATA_DIR, "EmpMaster.csv"), header=True, inferSchema=True)
agency_df = spark.read.csv(os.path.join(RAW_DATA_DIR, "AgencyMaster.csv"), header=True, inferSchema=True)
jobtitle_df = spark.read.csv(os.path.join(RAW_DATA_DIR, "TitleMaster.csv"), header=True, inferSchema=True)

In [None]:
# Dynamically locate all payroll data files

# Directory containing the payroll CSV files (portable path, no hard-coded separator)
payroll_dir = os.path.join("dataset", "payroll_data")

# Find all payroll CSV files. glob's ordering depends on the filesystem, so sort the
# result to make the load order reproducible from run to run.
payroll_files = sorted(glob.glob(os.path.join(payroll_dir, "nycpayroll_*.csv")))

# Fail fast: nothing downstream is meaningful without payroll data
if not payroll_files:
    raise ValueError("No payroll files found in the directory!")

# Load and merge payroll data
def load_payroll_data(files):
    """Load every payroll CSV in ``files`` and merge them into one de-duplicated DataFrame.

    Each file is read with a header row and an inferred schema. The frames are combined
    by column name, and one row is kept per (EmployeeID, FiscalYear) pair.

    Args:
        files: non-empty sequence of CSV paths.

    Returns:
        The merged, de-duplicated Spark DataFrame.

    Raises:
        ValueError: if ``files`` is empty.
    """
    from functools import reduce

    if not files:
        raise ValueError("load_payroll_data() requires at least one payroll file")

    dataframes = [spark.read.csv(file, header=True, inferSchema=True) for file in files]

    # unionByName matches columns by name instead of position. Files whose headers list
    # the same columns in a different order therefore line up correctly. A file whose
    # column names differ fails loudly rather than silently mixing columns.
    merged_df = reduce(lambda left, right: left.unionByName(right), dataframes)

    # NOTE(review): dropDuplicates keeps an arbitrary row per key. Confirm that repeated
    # (EmployeeID, FiscalYear) rows across files are true duplicates.
    return merged_df.dropDuplicates(["EmployeeID", "FiscalYear"])

# Read, merge and de-duplicate every discovered payroll file into one DataFrame
payroll_df = load_payroll_data(payroll_files)

# Validation: inspect the inferred schema, then a 20-row sample
payroll_df.printSchema()
payroll_df.show(n=20)

#### Data Transformation

In [None]:
# Parse AgencyStartDate from month/day/year text (pattern "M/d/yyyy") into a date column
agency_start_as_date = pyspark.sql.functions.to_date(payroll_df["AgencyStartDate"], "M/d/yyyy")
payroll_df = payroll_df.withColumn("AgencyStartDate", agency_start_as_date)

payroll_df.show(n=300)

In [None]:
# Check for Null values.
# All per-column null counts are computed in ONE aggregation (a single Spark job)
# instead of launching a separate filter().count() job for every column.
# F.when(...) yields 1 for null cells and null otherwise; count() only counts non-nulls.
null_count_row = payroll_df.select([
    F.count(F.when(payroll_df[column].isNull(), 1)).alias(column)
    for column in payroll_df.columns
]).first()

for column, null_count in zip(payroll_df.columns, null_count_row):
    print(column, 'Nulls: ', null_count)

In [None]:
# Fill null values with type-appropriate defaults.
# Build one {column: default} mapping and apply it with a single fillna() call
# instead of one fillna() per column, which kept stacking projections on the plan.
# Columns of any type not listed here (e.g. dates, decimals, booleans) keep their nulls.
FILL_DEFAULT_BY_DTYPE = {
    "string": "Unknown",
    "double": 0.0,
    "float": 0.0,
    "int": 0,
    "bigint": 0,
    "smallint": 0,
    "tinyint": 0,
}

null_fill_values = {
    col_name: FILL_DEFAULT_BY_DTYPE[dtype]
    for col_name, dtype in payroll_df.dtypes
    if dtype in FILL_DEFAULT_BY_DTYPE
}
if null_fill_values:
    payroll_df = payroll_df.fillna(null_fill_values)

In [None]:
# Enrich payroll records with employee, agency and job-title master attributes.
# Left joins keep every payroll row even when no master record matches its keys.
merged_data = (
    payroll_df
    .join(employee_df, on=["EmployeeID", "LastName", "FirstName"], how="left")
    .join(agency_df, on=["AgencyID", "AgencyName"], how="left")
    .join(jobtitle_df, on=["TitleCode", "TitleDescription"], how="left")
)


In [None]:

# Build the star-schema dimension and fact tables from merged_data.
#
# Surrogate keys use row_number() over an explicit ordering instead of
# monotonically_increasing_id(), for two reasons:
#   * monotonically_increasing_id() stores the partition ID in the upper bits, so any
#     row outside partition 0 gets a value beyond the INT range of the target tables.
#   * It is non-deterministic. time_dim is evaluated once for its own write and again
#     inside the fact-table plan, so the two TimeID sets could disagree and break the FK.
# row_number() without partitionBy runs in a single partition (Spark logs a warning).
# That is acceptable here because every table is later collected with toPandas() anyway.

# Create Employee Dimension Table: one row per EmployeeID (the table's primary key)
employee_dim = merged_data.select(
    'EmployeeID', 'LastName', 'FirstName', 'WorkLocationBorough', 'LeaveStatusasofJune30'
).dropDuplicates(['EmployeeID'])

# Create Agency Dimension Table: one row per AgencyID, so an ID that appears with
# differently spelled names cannot violate the primary key on load.
agency_dim = merged_data.select('AgencyID', 'AgencyName').dropDuplicates(['AgencyID'])

# Create Job_Title Dimension Table: one row per TitleCode (primary key), same reasoning
jobtitle_dim = merged_data.select('TitleCode', 'TitleDescription').dropDuplicates(['TitleCode'])

# Create Time Dimension Table: dense, deterministic TimeID 1..n in FiscalYear order
time_dim = (
    merged_data.select('FiscalYear').dropDuplicates()
    .withColumn('TimeID', F.row_number().over(Window.orderBy('FiscalYear')))
    .select('TimeID', 'FiscalYear')
)

# Create Payroll Fact Table.
# merged_data already carries EmployeeID, AgencyID and TitleCode, so only the TimeID
# lookup needs a join. Joining the dimensions back on descriptive attributes (names,
# borough, leave status, agency/title names) instead of their keys did two things:
# it dropped rows whose attributes differed from the single row kept per employee, and
# it duplicated rows (with the wrong EmployeeID) for employees sharing those attributes.
payroll_fact_tbl = (
    merged_data
    .join(time_dim, ['FiscalYear'], 'inner')
    .withColumn('PayrollID', F.row_number().over(Window.orderBy('EmployeeID', 'FiscalYear')))
    .select('PayrollID', 'EmployeeID', 'AgencyID', 'TitleCode', 'TimeID', 'PayrollNumber',
            'BaseSalary', 'PayBasis', 'AgencyStartDate', 'RegularHours', 'RegularGrossPaid',
            'OTHours', 'TotalOTPaid', 'TotalOtherPay')
)

In [None]:
# Save tables to the cleaned_data folder as Parquet, overwriting earlier runs.
# Paths are joined portably (a literal backslash only works as a separator on Windows).
CLEANED_DATA_DIR = os.path.join("dataset", "cleaned_data")

for output_name, output_df in [
    ("employee_dim", employee_dim),
    ("agency_dim", agency_dim),
    ("jobtitle_dim", jobtitle_dim),
    ("time_dim", time_dim),
    ("payroll_fact_table", payroll_fact_tbl),
]:
    output_df.write.mode("overwrite").parquet(os.path.join(CLEANED_DATA_DIR, output_name))

In [None]:
# Helper: lower-case every column name of a DataFrame
def rename_columns_to_lowercase(df):
    """Return ``df`` with every column name converted to lower case.

    Presumably used so the names match the Postgres tables created later. Their
    identifiers are unquoted, and Postgres folds unquoted identifiers to lower case.

    Args:
        df: a DataFrame exposing ``columns`` and ``toDF(*names)``.

    Returns:
        The result of ``df.toDF`` called with the lower-cased names, in original order.
    """
    lowered_names = [name.lower() for name in df.columns]
    return df.toDF(*lowered_names)

# Lower-case the column names of every dimension table and of the fact table
(employee_dim, agency_dim, jobtitle_dim, time_dim, payroll_fact_tbl) = [
    rename_columns_to_lowercase(table)
    for table in (employee_dim, agency_dim, jobtitle_dim, time_dim, payroll_fact_tbl)
]

In [None]:
# Preview the first 205 rows of the (lower-cased) fact table
payroll_fact_tbl.show(n=205)

#### Data Loading

In [None]:
# Read the database connection settings from the environment (.env loaded by python-dotenv)
load_dotenv()

DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PORT = os.getenv("DB_PORT")
DB_HOST = os.getenv("DB_HOST")

# Read the password once and expose it under both names the connection code below uses.
# This guarantees the psycopg2 and SQLAlchemy connections authenticate with the same secret.
# DB_PASSWORD is the preferred env var; DB_PASS is accepted as a fallback.
DB_PASSWORD = os.getenv("DB_PASSWORD") or os.getenv("DB_PASS")
DB_PASS = DB_PASSWORD

def get_db_connection():
    """Open and return a new psycopg2 connection to the payroll database.

    Connection parameters come from the module-level DB_* settings. The session's
    search_path is set to the nyc_payroll schema. The caller owns the returned
    connection and must close it.
    """
    connect_kwargs = dict(
        host=DB_HOST,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
        port=DB_PORT,
        options="-c search_path=nyc_payroll",
    )
    return psycopg2.connect(**connect_kwargs)

# Open a connection to our database (closed at the end of the loading step)
conn = get_db_connection()

In [None]:
# Create a function to (re)create the nyc_payroll schema and its star-schema tables
def create_tables():
    """Drop and recreate the nyc_payroll dimension and fact tables.

    Uses its own connection from get_db_connection(). The DDL runs in one
    transaction: it is committed on success and rolled back on error. The
    connection is always closed, even when the DDL fails.
    """
    create_table_query = '''

                        CREATE SCHEMA IF NOT EXISTS nyc_payroll;

                        DROP TABLE IF EXISTS nyc_payroll.employee CASCADE;
                        DROP TABLE IF EXISTS nyc_payroll.agency CASCADE;
                        DROP TABLE IF EXISTS nyc_payroll.jobtitle CASCADE;
                        DROP TABLE IF EXISTS nyc_payroll.time CASCADE;
                        DROP TABLE IF EXISTS nyc_payroll.fact_table CASCADE;


                        CREATE TABLE IF NOT EXISTS nyc_payroll.employee(
                            EmployeeID INT PRIMARY KEY,
                            LastName VARCHAR(1000),
                            FirstName VARCHAR(1000),
                            WorkLocationBorough VARCHAR(1000),
                            LeaveStatusasofJune30 VARCHAR(1000)
                        );

                        CREATE TABLE IF NOT EXISTS nyc_payroll.agency(
                            AgencyID INT PRIMARY KEY,
                            AgencyName VARCHAR(1000)
                        );

                        CREATE TABLE IF NOT EXISTS nyc_payroll.jobtitle(
                            TitleCode INT PRIMARY KEY,
                            TitleDescription VARCHAR(1000)
                        );

                        CREATE TABLE IF NOT EXISTS nyc_payroll.time(
                            TimeID INT PRIMARY KEY,
                            FiscalYear INT
                        );

                        CREATE TABLE IF NOT EXISTS nyc_payroll.fact_table(
                            PayrollID INT PRIMARY KEY,
                            EmployeeID INT,
                            AgencyID INT,
                            TitleCode INT,
                            TimeID INT,
                            PayrollNumber INT,
                            BaseSalary DECIMAL(10,2),
                            PayBasis VARCHAR(1000),
                            AgencyStartDate DATE,
                            RegularHours DECIMAL(10,2),
                            RegularGrossPaid DECIMAL(10,2),
                            OTHours DECIMAL(10,2),
                            TotalOTPaid DECIMAL(10,2),
                            TotalOtherPay DECIMAL(10,2),
                            FOREIGN KEY (EmployeeID) REFERENCES nyc_payroll.employee(EmployeeID),
                            FOREIGN KEY (AgencyID) REFERENCES nyc_payroll.agency(AgencyID),
                            FOREIGN KEY (TitleCode) REFERENCES nyc_payroll.jobtitle(TitleCode),
                            FOREIGN KEY (TimeID) REFERENCES nyc_payroll.time(TimeID)

                        );

        '''
    conn = get_db_connection()
    try:
        # `with conn` commits on success / rolls back on error. It does NOT close the
        # connection, hence the explicit close in `finally`.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(create_table_query)
    finally:
        conn.close()

create_tables()

# Create SQLAlchemy engine.
# Credentials are percent-encoded, so a password containing '@', ':' or '/' cannot
# corrupt the URL. The same password variable as get_db_connection() is used, so both
# connections authenticate identically.
engine = create_engine(
    f"postgresql://{quote(DB_USER or '', safe='')}:{quote(DB_PASS or '', safe='')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Target table for each Spark DataFrame. Dimensions come first so the fact table's
# foreign keys resolve when it is inserted.
tables_to_load = [
    ("employee", employee_dim),
    ("agency", agency_dim),
    ("jobtitle", jobtitle_dim),
    ("time", time_dim),
    ("fact_table", payroll_fact_tbl),
]

# Convert each PySpark DF to Pandas and append it to its table inside ONE transaction.
# A failure part-way rolls everything back instead of leaving a half-loaded schema.
try:
    with engine.begin() as db_connection:
        for target_table, spark_df in tables_to_load:
            spark_df.toPandas().to_sql(
                target_table, db_connection, schema="nyc_payroll", if_exists="append", index=False
            )
    print("Data loaded successfully!")
except Exception as e:
    print("Data loading Failed!", e)
    # Re-raise so "Run All" stops here instead of silently continuing past a failed load
    raise
finally:
    conn.close()
    engine.dispose()
