In [1]:
import pandas as pd

In [2]:
import boto3
from dotenv import load_dotenv
import logging

import os
from io import StringIO

from helpers.metrics import start_metrics_server, files_extracted, rows_extracted, rows_transformed, rows_validated, missing_values_detected, rows_staged, fact_table_created, rows_processed, rows_cleaned, data_quality_issues 
from helpers.logging_utils import setup_logging

setup_logging()
load_dotenv(override=True)

start_metrics_server(port=8001)

OSError: [Errno 48] Address already in use

# EXTRACTION STAGE

In [3]:
import os

# File definitions
master_files = ['EmpMaster.csv', 'TitleMaster.csv', 'AgencyMaster.csv']
payroll_files = ['nycpayroll_2020.csv', 'nycpayroll_2021.csv']
master_table_names = ['DimEmployee', 'DimTitle', 'DimAgency']
dim_columns = [
    ['EmployeeID', 'LastName', 'FirstName', 'LeaveStatusasofJune30'],
    ['TitleCode', 'TitleDescription'],
    ['AgencyID', 'AgencyName', 'AgencyStartDate']
]

# AWS and S3 configuration
s3_bucket = os.getenv("s3_bucket")
s3_prefix = os.getenv("s3_prefix")
aws_region = os.getenv("aws_region")
aws_access_key_id = os.getenv("aws_access_key_id")
aws_secret_access_key = os.getenv("aws_secret_access_key")

def initialize_s3_client(aws_region, aws_access_key_id, aws_secret_access_key):
    return boto3.client(
        's3',
        region_name=aws_region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

s3_client = initialize_s3_client(aws_region, aws_access_key_id, aws_secret_access_key)


def extract_from_s3(s3_client, s3_bucket, s3_prefix, file_name):
    try:
        logging.info(f"Extracting {file_name} from S3")
        obj = s3_client.get_object(Bucket=s3_bucket, Key=s3_prefix + file_name)
        df = pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))

        # Update metrics
        files_extracted.inc()  # Increment the count of files extracted

        return df
    except Exception as e:
        logging.error(f"Failed to extract {file_name}: {str(e)}")
        raise
    
def extract_data(file_name):
    return extract_from_s3(s3_client, s3_bucket, s3_prefix, file_name)



In [4]:
df_test = extract_data('EmpMaster.csv')
df_test.head(3)

2024-08-20 12:15:32,938 - root - INFO - Extracting EmpMaster.csv from S3


Unnamed: 0,EmployeeID,LastName,FirstName
0,100001,AACHEN,DAVID
1,100002,AACHEN,MONICA
2,100003,AADAMS,LAMMELL


# TRANSFORM AND VALIDATE

In [5]:
from helpers.db_utils import read_table
from helpers.alert_utils import send_urgent_email
from helpers.metrics import rows_transformed, rows_validated, missing_values_detected

def validate_and_clean_data(df, dim_col):
    logging.info(f"Validating and cleaning data")

    total_rows = len(df)
    rows_validated.set(total_rows)  # Set the number of rows being validated

    for col in dim_col:
        if col not in df.columns:
            df[col] = None

    # Check for missing values
    missing = df.isnull().sum()
    missing_values_detected.set(missing.sum())  # Set the total number of missing values detected

    missing_percentage = (missing / total_rows) * 100

    # Log changes
    changes_log = []

    # Handling based on missing value percentages
    for col, pct in missing_percentage.items():
        if pct <= 5:
            df.dropna(subset=[col], inplace=True)
            changes_log.append(f"Dropped rows with missing values in {col} as it was <= 5%")
        elif 5 < pct <= 10:
            if df[col].dtype == 'object':  # Replace with 'UNKNOWN' for strings
                df[col].fillna('UNKNOWN', inplace=True)
                changes_log.append(f"Replaced missing string values in {col} with 'UNKNOWN'")
            else:
                mean_value = df[col].mean()
                df[col].fillna(mean_value, inplace=True)
                changes_log.append(f"Replaced missing numeric values in {col} with mean: {mean_value}")
        else:
            logging.error(f"Missing values in {col} exceed 10%. Manual intervention required.")
            send_urgent_email(
                subject=f"Data Quality Issue Detected in {col}",
                body=f"High percentage of missing values in {col}: {pct}%. Immediate attention required.",
                to_email="data.engineer@example.com"
            )
            raise ValueError(f"High percentage of missing values in {col}: {pct}%")

    # Anomaly detection
    for col in df.select_dtypes(include=['number']).columns:
        # Replace negative values
        if (df[col] < 0).any():
            df.loc[df[col] < 0, col] = df[col].mean()
            changes_log.append(f"Replaced negative values in {col} with mean.")

        # Replace values greater than 2 * standard deviation
        upper_bound = df[col].mean() + 2 * df[col].std()
        if (df[col] > upper_bound).any():
            df.loc[df[col] > upper_bound, col] = df[col].mean()
            changes_log.append(f"Replaced outliers in {col} (>{2} * SD) with mean.")

    # Log all changes made to a table or a file
    logging.info("Data Cleaning Summary: " + "; ".join(changes_log))

    df.drop_duplicates(inplace=True)
    rows_transformed.set(len(df))  # Set the number of rows transformed

    return df

def transform_master_data(df, required_columns):
    return validate_and_clean_data(df, required_columns)

def transform_transactional_data(df, engine):
    df = validate_and_clean_data(df, ['EmployeeID', 'AgencyID', 'TitleCode'])

    dim_employee = read_table(engine, 'DimEmployee')
    df = pd.merge(df, dim_employee[['EmployeeID']], on='EmployeeID', how='left')

    dim_agency = read_table(engine, 'DimAgency')
    df = pd.merge(df, dim_agency[['AgencyID']], on='AgencyID', how='left')

    dim_title = read_table(engine, 'DimTitle')
    df = pd.merge(df, dim_title[['TitleCode']], on='TitleCode', how='left')

    return df


# LOADING AND INGESTION STAGE

In [9]:
from helpers.db_utils import redshift_engine
redshift_engine()

2024-08-20 12:23:30,482 - root - INFO - Successfully created Redshift engine.


Engine(redshift+psycopg2://ridwanclouds:***@payroll-workgroup.637423632863.eu-west-2.redshift-serverless.amazonaws.com:5439/payrolldb)

In [13]:
from helpers.db_utils import redshift_engine, stage_data

engine = redshift_engine()
df = df_test
table_name = 'test'
stage_data(engine, df, table_name)


2024-08-20 13:03:50,413 - root - INFO - Successfully created Redshift engine.
2024-08-20 13:03:50,482 - root - ERROR - Failed to stage data to staging_test: (psycopg2.OperationalError) could not translate host name "payroll-workgroup.637423632863.eu-west-2.redshift-serverless.amazonaws.com" to address: nodename nor servname provided, or not known

(Background on this error at: https://sqlalche.me/e/14/e3q8)


In [16]:
from helpers.redshift_utils import redshift_engine2, stage_data

engine = redshift_engine2()
df = df_test
table_name = 'test'
stage_data(engine, df, table_name)


2024-08-20 13:13:00,494 - root - INFO - Successfully created Redshift engine.
2024-08-20 13:13:00,548 - root - ERROR - Failed to stage data to staging_test: (psycopg2.OperationalError) could not translate host name "payroll-workgroup.637423632863.eu-west-2.redshift-serverless.amazonaws.com" to address: nodename nor servname provided, or not known

(Background on this error at: https://sqlalche.me/e/14/e3q8)


In [8]:
from helpers.db_utils import redshift_engine, stage_data, create_fact_table
from helpers.metrics import rows_staged,fact_table_created
from sqlalchemy import MetaData, Table, Column, Integer, String
import logging

metadata = MetaData()


def load_master_data(df, table_name, engine):
    logging.info(f"Loading master data into {table_name}")
    stage_data(engine, df, table_name)
    rows_staged.set(len(df))

def load_transactional_data(df, engine):
    logging.info(f"Loading transactional data into FactPayroll")
    create_fact_table(engine, engine.metadata)
    fact_table_created.set(1)
    stage_data(engine, df, 'FactPayroll')
    rows_staged.set(len(df))
