In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any


In [2]:
# dict for data validation rules
VALIDATION_RULES = {
    
    'type': ['CASH-IN', 'CASH-OUT', 'DEBIT', 'PAYMENT', 'TRANSFER'],
    'isFraud': [0, 1],
    'isFlaggedFraud': [0,1]
}


In [3]:
def load_data(file_path):
    try:
        df = pd.read_csv(file_path)
        print(f"Loaded dataset with {len(df)} rows")
        return df
    except Exception as e:
        print(f"Error loading dataset: {e}")
        raise

# function to drop whitespaces
def strip_whitespace(df):
    if 'type' in df.columns:
        df['type'] = df['type'].str.strip()
    return df

# validation function
def validate_data(df):
    df = strip_whitespace(df)
    
    # Handle missing values
    if df.isnull().values.any():
        print(f"Missing values detected:\n{df.isnull().sum()}")
        df = df.dropna()
        print(f"Dropped rows with missing values. Remaining rows: {len(df)}")
    
    # Drop duplicates
    duplicates = df.duplicated(subset=['step', 'amount', 'type'], keep=False)
    if duplicates.sum() > 0:
        print(f"Found {duplicates.sum()} duplicate rows")
        df = df[~duplicates]
        print(f"Removed duplicates. Remaining rows: {len(df)}")
    

    # Check for negative transaction amounts and drop them
    negative_amounts = df[df['amount'] < 0]
    if not negative_amounts.empty:
        print(f"Negative transaction amounts found")
        df = df[df['amount'] > 0 ]
        print(f"Droppeed rows with negative amount. Remaining rows: {len(df)}")


    # Validate `isFlaggedFraud` consistency
    is_flagged_valid = (df['amount'] > 200000) == df['isFlaggedFraud']
    # Check if all rows satisfy the condition
    if is_flagged_valid.all():
        print("All flagged fraud rows are valid. Dropping 'isFlaggedFraud' column.")
    else:
        inconsistent_rows = df[~is_flagged_valid]
        df = df[is_flagged_valid]
        print("Inconsistent rows found in 'isFlaggedFraud':")
        print(f"Inconsistent rows found in 'isFlaggedFraud':")
        print(len(inconsistent_rows))
        print(f"Droppeed rows with inconsistent isFlaggedFraud. Remaining rows: {len(df)}")

    # Validate if the balance is column is correct
    # Balance column checks
    is_balance_valid = ((df['newbalanceOrig'] == df['oldbalanceOrg'] - df['amount']) | 
                          (df['oldbalanceOrg'] == 0) & (df['newbalanceOrig'] == 0))
    
    
    # Validate balance consistency to make sure the 
    is_balance_valid = ((df['newbalanceOrig'] == df['oldbalanceOrg'] - df['amount']) | 
                        ((df['oldbalanceOrg'] == 0) & (df['newbalanceOrig'] == 0)))

    # Check if all rows satisfy the balance condition
    if is_balance_valid.all():
        print("All balance rows are valid.")
    else:
        # Log or inspect rows with inconsistencies
        inconsistent_rows = df[~is_balance_valid]
        print("Inconsistent rows found in balance check:")
        print(inconsistent_rows)
        
        df = df[is_balance_valid]
        # Drop inconsistent rows
        #df = df[is_balance_valid].reset_index(drop=True)
        print(f"Dropped {len(inconsistent_rows)} inconsistent rows. Remaining rows: {len(df)}")


    # Ensure correct Data type
    try:

        df['step'] = df['step'].astype(int)
        df['isFraud'] = df['isFraud'].astype(int)
        df['isFlaggedFraud'] = df['isFlaggedFraud'].astype(int)
        
        df['amount'] = df['amount'].astype(float)
        
        df['oldbalanceOrg'] = df['oldbalanceOrg'].astype(float)
        df['newbalanceOrig'] = df['newbalanceOrig'].astype(float)
        df['oldbalanceDest'] = df['oldbalanceDest'].astype(float)
        df['newbalanceDest'] = df['newbalanceDest'].astype(float)

        df['timestamp'] = pd.to_datetime(df['timestamp'])

    except Exception as e:
        print(f"Data type conversion error: {e}")

    # Validate columns
    valid_types = df['type'].isin(VALIDATION_RULES['type'])
    df = df[valid_types]
    
    valid_fraud = df['isFraud'].isin(VALIDATION_RULES['isFraud'])
    df = df[valid_fraud]

    valid_is_flagged_fraud = df['isFlaggedFraud'].isin(VALIDATION_RULES['isFlaggedFraud'])
    df = df[valid_is_flagged_fraud]

    
    return df

# function to standardize the timestamp
def timestamp_standardization(df):
    base_time = datetime(2023, 1, 1)
    df['timestamp'] = df['step'].apply(lambda x: base_time + timedelta(hours=x))
    df['transaction_hour'] = df['timestamp'].dt.hour
    df['transaction_day'] = df['timestamp'].dt.day
    return df

def currency_conversion(df, conversion_rate = 1.5):
    
    df['amount'] = df['amount'] * conversion_rate
    return df


def aggregate_data(df, interval = 'daily'):
    aggregation_methods = {
        'amount': ['sum', 'mean', 'median'],
        'isFraud': 'sum'
    }
    
    if interval == 'hourly':
        hourly_agg = df.groupby(pd.Grouper(key='timestamp', freq='H')).agg(aggregation_methods)
        return hourly_agg 
    elif interval == 'daily':
        daily_agg = df.groupby(pd.Grouper(key='timestamp', freq='D')).agg(aggregation_methods)
        return daily_agg 
    elif interval == 'weekly':
        weekly_agg = df.groupby(pd.Grouper(key='timestamp', freq='W')).agg(aggregation_methods)
        return weekly_agg

def generate_data_profile(df):
    profile = {
        'total_transactions': len(df),
        'transaction_types_distribution': df['type'].value_counts().to_dict(),
        'fraud_distribution': df['isFraud'].value_counts().to_dict(),
        'amount_statistics': {
            'total_amount': df['amount'].sum(),
            'mean_amount': df['amount'].mean(),
            'median_amount': df['amount'].median(),
            'min_amount': df['amount'].min(),
            'max_amount': df['amount'].max()
        },
    }
    return profile

def save_to_csv(df, output_file):
    try:
        df.to_csv(output_file, index=False)
        print(f"Cleaned data saved to {output_file}")
    except Exception as e:
        print(f"Error saving data to CSV: {e}")


In [4]:

def run_etl_pipeline(input_file, output_file):
    print("Starting ETL Pipeline")
    
    # Load data
    df = load_data(input_file)
    
    # Validate data
    df = validate_data(df)
    print("Data Validation Completed")
    
    # Transform data
    df = timestamp_standardization(df)
    print("Data Transformation Completed")
    
    # Currency conversion
    df = currency_conversion(df, 1)
    print("Data Transformation Completed")

    # Aggregate data
    daily_data = aggregate_data(df, 'daily')
    hourly_data = aggregate_data(df, 'hourly')
    weekly_data = aggregate_data(df, 'weekly')
    print("Data Aggregation Completed")
    
    # Generate data profile
    profile = generate_data_profile(df)
    print("Data Profiling Completed")
    

    # Create summary statistics
    summary_stats = df.describe()
    print("Data Profiling Completed")

    # Save to CSV
    save_to_csv(df, output_file)
    
    return {
        'processed_data': df,
        'daily_aggregation': daily_data,
        'hourly_aggregation': hourly_data,
        'weekly_aggregation': weekly_data,
        'profile': profile,
        'summary_stats': summary_stats

    }


In [5]:
input_file = 'PS_20174392719_1491204439457_log.csv'
output_file = 'cleaned_transactions_final.csv'

results = run_etl_pipeline(input_file, output_file)

Starting ETL Pipeline
Loaded dataset with 6362620 rows
Found 12114 duplicate rows
Removed duplicates. Remaining rows: 6350506
Inconsistent rows found in 'isFlaggedFraud':
Inconsistent rows found in 'isFlaggedFraud':
1670317
Droppeed rows with inconsistent isFlaggedFraud. Remaining rows: 4680189
Inconsistent rows found in balance check:
         step      type      amount     nameOrig  oldbalanceOrg  \
8           1   PAYMENT     4024.36  C1265012928        2671.00   
9           1     DEBIT     5337.77   C712410124       41720.00   
10          1     DEBIT     9644.94  C1900366749        4465.00   
13          1   PAYMENT    11633.76  C1716932897       10127.00   
16          1   PAYMENT     1563.82   C761750706         450.00   
...       ...       ...         ...          ...            ...   
6362320   718  CASH_OUT   159188.22   C691808084        3859.00   
6362321   718  CASH_OUT   186273.84   C102120699      168046.00   
6362322   718  TRANSFER    82096.45   C614459560       1349

  hourly_agg = df.groupby(pd.Grouper(key='timestamp', freq='H')).agg(aggregation_methods)


Data Aggregation Completed
Data Profiling Completed
Data Profiling Completed
Cleaned data saved to cleaned_transactions_final.csv


# Data Aggregation on different intervals

In [6]:
results['weekly_aggregation'].head()

Unnamed: 0_level_0,amount,amount,amount,isFraud
Unnamed: 0_level_1,sum,mean,median,sum
timestamp,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-01-01,2040851000.0,14180.356495,8225.34,66
2023-01-08,6853921000.0,14946.129707,8274.18,312
2023-01-15,11411050000.0,15834.589062,8741.87,311
2023-01-22,4016662000.0,16780.699468,9649.975,282
2023-01-29,979291200.0,15318.419436,7819.79,299


In [7]:
results['hourly_aggregation'].head()

Unnamed: 0_level_0,amount,amount,amount,isFraud
Unnamed: 0_level_1,sum,mean,median,sum
timestamp,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-01-01 01:00:00,11047279.97,9997.538434,4444.35,5
2023-01-01 02:00:00,4249557.31,9258.294793,4674.03,2
2023-01-01 03:00:00,2399806.0,10125.763713,6028.79,2
2023-01-01 04:00:00,1637962.93,6423.384039,2403.55,3
2023-01-01 05:00:00,2242491.31,6942.697554,3509.48,3


In [8]:
results['daily_aggregation'].head()

Unnamed: 0_level_0,amount,amount,amount,isFraud
Unnamed: 0_level_1,sum,mean,median,sum
timestamp,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
2023-01-01,2040851000.0,14180.356495,8225.34,66
2023-01-02,1729373000.0,15109.103031,7910.5,55
2023-01-03,33758260.0,10949.807862,5587.64,48
2023-01-04,98675340.0,12216.830884,6531.66,45
2023-01-05,60981590.0,10451.000809,5041.14,43


# Summary statistics 

In [9]:
results['summary_stats'].head()

Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,timestamp,transaction_hour,transaction_day
count,1628963.0,1628963.0,1628963.0,1628963.0,1628963.0,1628963.0,1628963.0,1628963.0,1628963,1628963.0,1628963.0
mean,244.2755,15555.61,64675.36,59648.53,110426.4,116134.5,0.0008324314,0.0,2023-01-11 04:16:31.653464320,15.41436,10.53588
min,1.0,0.02,0.0,0.0,0.0,0.0,0.0,0.0,2023-01-01 01:00:00,0.0,1.0
25%,156.0,3821.035,0.0,0.0,0.0,0.0,0.0,0.0,2023-01-07 12:00:00,12.0,7.0
50%,239.0,8630.6,0.0,0.0,0.0,0.0,0.0,0.0,2023-01-10 23:00:00,16.0,10.0


## Data Profile

In [10]:
results['profile']

{'total_transactions': 1628963,
 'transaction_types_distribution': {'PAYMENT': 1517432,
  'TRANSFER': 79836,
  'DEBIT': 31695},
 'fraud_distribution': {0: 1627607, 1: 1356},
 'amount_statistics': {'total_amount': np.float64(25339517428.88001),
  'mean_amount': np.float64(15555.612637536893),
  'median_amount': np.float64(8630.6),
  'min_amount': np.float64(0.02),
  'max_amount': np.float64(199995.99)}}

In [11]:
results['processed_data'].head()

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,timestamp,transaction_hour,transaction_day
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,2023-01-01 01:00:00,1,1
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,2023-01-01 01:00:00,1,1
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,2023-01-01 01:00:00,1,1
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0,2023-01-01 01:00:00,1,1
5,1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0,2023-01-01 01:00:00,1,1
