In [1]:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import logging
import os
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [10]:
# ...existing code...
NUMERICAL_COLS = ['Amount', 'Value', 'CountryCode', 'PricingStrategy']
CATEGORICAL_COLS = ['CurrencyCode', 'ProductCategory', 'ChannelId', 'ProviderId']  # Removed 'ProductId'
REMAINDER_COLS = ['TransactionId', 'BatchId', 'AccountId', 'SubscriptionId', 'CustomerId']
IMPUTE_CATEGORICAL_COLS = ['CurrencyCode', 'ProductCategory', 'ChannelId', 'ProviderId']  # Exclude ProductId
# ...existing code...

In [11]:
class AggregateFeatures:
    """Custom transformer to compute aggregate features per CustomerId."""
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        logging.info("Computing aggregate features...")
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if 'CustomerId' not in X.columns or 'Amount' not in X.columns:
            raise ValueError("Required columns 'CustomerId' and 'Amount' not found.")
        agg_df = X.groupby('CustomerId').agg({
            'Amount': ['sum', 'mean', 'count', 'std'],
        }).reset_index()
        agg_df.columns = ['CustomerId', 'TotalAmount', 'AvgAmount', 'TransactionCount', 'StdAmount']
        agg_df['StdAmount'] = agg_df['StdAmount'].fillna(0)
        X = X.merge(agg_df, on='CustomerId', how='left')
        logging.info(f"Columns after AggregateFeatures: {list(X.columns)}")
        return X

In [13]:
class DatetimeFeatures:
    """Custom transformer to extract datetime features from TransactionStartTime."""
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        logging.info("Extracting datetime features...")
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if 'TransactionStartTime' not in X.columns:
            raise ValueError("Required column 'TransactionStartTime' not found.")
        X = X.copy()
        X['TransactionStartTime'] = pd.to_datetime(X['TransactionStartTime'], errors='coerce')
        X['TransactionHour'] = X['TransactionStartTime'].dt.hour
        X['TransactionDay'] = X['TransactionStartTime'].dt.day
        X['TransactionMonth'] = X['TransactionStartTime'].dt.month
        X['TransactionYear'] = X['TransactionStartTime'].dt.year
        logging.info(f"Columns after DatetimeFeatures: {list(X.columns)}")
        return X

In [14]:
def create_feature_engineering_pipeline():
    """Creates a feature engineering pipeline for transaction data."""
    logging.info("Validating input columns...")
    expected_cols = NUMERICAL_COLS + CATEGORICAL_COLS + REMAINDER_COLS + ['TransactionStartTime']
    
    numerical_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    categorical_pipeline_impute = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    categorical_pipeline_no_impute = Pipeline([
        ('onehot', OneHotEncoder(handle_unknown='error', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer([
        ('num', numerical_pipeline, NUMERICAL_COLS),
        ('cat_impute', categorical_pipeline_impute, IMPUTE_CATEGORICAL_COLS),
        ('cat_no_impute', categorical_pipeline_no_impute, ['ProductId'])
    ], remainder='passthrough')
    
    pipeline = Pipeline([
        ('aggregate', AggregateFeatures()),
        ('datetime', DatetimeFeatures()),
        ('preprocessor', preprocessor)
    ])
    
    return pipeline

In [27]:
def process_data(input_path, output_path, target_col='FraudResult'):
    """Processes raw data and saves transformed data."""
    # Validate input file
    logging.info(f"Checking for input file at {input_path}...")
    if not os.path.isfile(input_path):
        logging.warning(f"Input file {input_path} not found.")
        # Fallback to user input if file not found
        input_path = input("Please enter the correct path to data.csv: ").strip()
        if not os.path.isfile(input_path):
            logging.error(f"Provided file {input_path} still not found.")
            raise FileNotFoundError(f"File {input_path} does not exist.")
    
    logging.info(f"Loading data from {input_path}...")
    try:
        df = pd.read_csv(input_path)
    except Exception as e:
        logging.error(f"Failed to load {input_path}: {str(e)}")
        raise
    
    # Log dataset info for debugging
    logging.info(f"Dataset info:\n{df.info()}")
    logging.info(f"ProviderId unique values: {df['ProviderId'].nunique()}")
    logging.info(f"ProductId unique values: {df['ProductId'].nunique()}")
    logging.info(f"FraudResult distribution:\n{df['FraudResult'].value_counts(normalize=True)}")
    logging.info(f"First 5 rows:\n{df.head().to_string()}")
    for col in CATEGORICAL_COLS:
        logging.info(f"{col} unique values: {df[col].nunique()}")
        logging.info(f"{col} values: {df[col].unique().tolist()}")
    
    # Validate required columns
    required_cols = ['CustomerId', 'Amount', 'TransactionStartTime', 'FraudResult'] + \
                   ['Value', 'CountryCode', 'PricingStrategy', 'CurrencyCode', 
                    'ProductCategory', 'ChannelId', 'ProviderId', 'ProductId']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        logging.error(f"Missing columns: {missing_cols}")
        raise ValueError(f"Missing columns: {missing_cols}")
    
    # Clean ProductId
    df['ProductId'] = df['ProductId'].str.strip().str.lower()
    logging.info(f"Cleaned ProductId unique values: {df['ProductId'].nunique()}")
    
    # Check for unexpected ProductId values and group them
    unexpected = set(df['ProductId'].unique()) - set(EXPECTED_PRODUCT_IDS)
    if unexpected:
        logging.warning(f"Found unexpected ProductId values. Grouping into 'other'.")
        df['ProductId'] = df['ProductId'].where(df['ProductId'].isin(EXPECTED_PRODUCT_IDS), 'other')
    
    # Get ProductId categories for the encoder
    product_id_categories = sorted(df['ProductId'].unique().tolist())
    logging.info(f"ProductId categories for OneHotEncoder: {product_id_categories}")
    
    # Impute missing values for specified categorical columns
    df[IMPUTE_CATEGORICAL_COLS] = df[IMPUTE_CATEGORICAL_COLS].fillna('Unknown')
    
    # Separate features and target
    X = df.drop(columns=[target_col])
    y = df[target_col]
    
    # Create and fit the feature engineering pipeline
    logging.info("Fitting and transforming data...")
    pipeline = create_feature_engineering_pipeline(product_id_categories)
    try:
        # The custom transformers (AggregateFeatures, DatetimeFeatures) are called first
        # before the preprocessor, so X is modified in place within the pipeline.
        X_with_features = pipeline.named_steps['datetime'].transform(
            pipeline.named_steps['aggregate'].transform(X)
        )
        # Now apply the preprocessor
        X_transformed = pipeline.named_steps['preprocessor'].fit_transform(X_with_features)
    except Exception as e:
        logging.error(f"Pipeline transformation failed: {str(e)}")
        raise
    
    # Get feature names directly from the fitted preprocessor
    # This is the corrected and robust way to get feature names
    feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
    
    # Log feature names and shapes for debugging
    logging.info(f"Total feature names count: {len(feature_names)}")
    logging.info(f"Feature names: {feature_names.tolist()}")
    logging.info(f"Transformed data shape: {X_transformed.shape}")
    
    # Validate shape
    if X_transformed.shape[1] != len(feature_names):
        logging.error(f"Shape mismatch detected. Expected {len(feature_names)} columns, got {X_transformed.shape[1]}.")
        raise ValueError(f"Shape mismatch: Transformed data has {X_transformed.shape[1]} columns, but feature_names has {len(feature_names)}")
    
    # Convert transformed data to DataFrame
    X_transformed_df = pd.DataFrame(X_transformed, columns=feature_names)
    X_transformed_df[target_col] = y.reset_index(drop=True)
    
    # Save transformed data
    logging.info(f"Saving transformed data to {output_path}...")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    X_transformed_df.to_csv(output_path, index=False)
    logging.info(f"Transformed data saved to {output_path}")
    
    return X_transformed_df

In [28]:
if __name__ == "__main__":
    default_input_path = '../data/raw/data.csv'
    default_output_path = '../data/processed/transformed_transactions.csv'
    try:
        transformed_df = process_data(default_input_path, default_output_path)
    except FileNotFoundError:
        logging.info("Please ensure the data.csv file is in the correct directory or provide the correct path.")

2025-07-02 15:24:29,524 - INFO - Checking for input file at ../data/raw/data.csv...
2025-07-02 15:24:29,536 - INFO - Loading data from ../data/raw/data.csv...


2025-07-02 15:24:30,162 - INFO - Dataset info:
None
2025-07-02 15:24:30,170 - INFO - ProviderId unique values: 6
2025-07-02 15:24:30,177 - INFO - ProductId unique values: 23
2025-07-02 15:24:30,188 - INFO - FraudResult distribution:
FraudResult
0    0.997982
1    0.002018
Name: proportion, dtype: float64
2025-07-02 15:24:30,200 - INFO - First 5 rows:
         TransactionId         BatchId       AccountId       SubscriptionId       CustomerId CurrencyCode  CountryCode    ProviderId     ProductId     ProductCategory    ChannelId   Amount  Value  TransactionStartTime  PricingStrategy  FraudResult
0  TransactionId_76871   BatchId_36123  AccountId_3957   SubscriptionId_887  CustomerId_4406          UGX          256  ProviderId_6  ProductId_10             airtime  ChannelId_3   1000.0   1000  2018-11-15T02:18:49Z                2            0
1  TransactionId_73770   BatchId_15642  AccountId_4841  SubscriptionId_3829  CustomerId_4406          UGX          256  ProviderId_4   ProductId_6  fin

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 95662 entries, 0 to 95661
Data columns (total 16 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   TransactionId         95662 non-null  object 
 1   BatchId               95662 non-null  object 
 2   AccountId             95662 non-null  object 
 3   SubscriptionId        95662 non-null  object 
 4   CustomerId            95662 non-null  object 
 5   CurrencyCode          95662 non-null  object 
 6   CountryCode           95662 non-null  int64  
 7   ProviderId            95662 non-null  object 
 8   ProductId             95662 non-null  object 
 9   ProductCategory       95662 non-null  object 
 10  ChannelId             95662 non-null  object 
 11  Amount                95662 non-null  float64
 12  Value                 95662 non-null  int64  
 13  TransactionStartTime  95662 non-null  object 
 14  PricingStrategy       95662 non-null  int64  
 15  FraudResult        

2025-07-02 15:24:30,348 - INFO - ChannelId values: ['ChannelId_3', 'ChannelId_2', 'ChannelId_1', 'ChannelId_5']
2025-07-02 15:24:30,359 - INFO - ProviderId unique values: 6
2025-07-02 15:24:30,369 - INFO - ProviderId values: ['ProviderId_6', 'ProviderId_4', 'ProviderId_1', 'ProviderId_5', 'ProviderId_3', 'ProviderId_2']
2025-07-02 15:24:30,493 - INFO - Cleaned ProductId unique values: 23


NameError: name 'EXPECTED_PRODUCT_IDS' is not defined