# Preprocessing


In [12]:
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("Starting preprocessing with proper target creation...")


Starting preprocessing with proper target creation...


In [13]:
def detect_environment():
    try:
        import google.colab
        from google.colab import drive
        drive.mount('/content/drive/')
        return 'colab', '/content/drive/MyDrive/fcst'
    except ImportError:
        return 'local', '..'

environment, base_path = detect_environment()
print(f"Environment: {environment}")
print(f"Base path: {base_path}")


Environment: local
Base path: ..


In [14]:
print("\nChecking data formats...")

def convert_to_parquet_if_needed(csv_path, parquet_path):
    """Convert CSV to parquet if parquet doesn't exist"""
    if os.path.exists(parquet_path):
        print(f"  ✓ Parquet already exists: {os.path.basename(parquet_path)}")
        return False
    
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV file not found: {csv_path}")
    
    print(f"  Converting {os.path.basename(csv_path)} to parquet...")
    start = datetime.now()
    
    # Read CSV with optimized dtypes
    if 'transactions' in csv_path:
        # Read in chunks for large transaction file
        chunk_size = 1_000_000
        chunks = []
        for i, chunk in enumerate(pd.read_csv(csv_path, chunksize=chunk_size)):
            chunk['amount'] = chunk['amount'].str.replace('$', '').astype('float32')
            chunk['client_id'] = chunk['client_id'].astype('int32')
            chunk['mcc'] = chunk['mcc'].astype('category')
            chunks.append(chunk)
            print(f"    Processed {(i+1) * chunk_size:,} rows...", end='\r')
        df = pd.concat(chunks, ignore_index=True)
    else:
        # Regular read for smaller files
        df = pd.read_csv(csv_path)
        # Optimize dtypes for users data
        for col in ['per_capita_income', 'yearly_income', 'total_debt']:
            if col in df.columns:
                df[col] = df[col].astype(str).str.replace('$', '').str.replace(',', '').astype('float32')
    
    # Save as parquet
    df.to_parquet(parquet_path, compression='snappy')
    
    # Report compression
    csv_size = os.path.getsize(csv_path) / (1024**2)
    parquet_size = os.path.getsize(parquet_path) / (1024**2)
    
    print(f"\n  ✓ Converted: {csv_size:.1f} MB → {parquet_size:.1f} MB ({csv_size/parquet_size:.1f}x compression)")
    print(f"  Time: {(datetime.now() - start).total_seconds():.1f} seconds")
    return True

# Convert data files
transactions_csv = f'{base_path}/data/transactions_data.csv'
transactions_parquet = f'{base_path}/data/transactions_data.parquet'
users_csv = f'{base_path}/data/users_data.csv'
users_parquet = f'{base_path}/data/users_data.parquet'

convert_to_parquet_if_needed(transactions_csv, transactions_parquet)
convert_to_parquet_if_needed(users_csv, users_parquet)



Checking data formats...
  ✓ Parquet already exists: transactions_data.parquet
  ✓ Parquet already exists: users_data.parquet


False

In [15]:
print("\nLoading data from parquet format...")
start = datetime.now()

df = pd.read_parquet(transactions_parquet)
users_df = pd.read_parquet(users_parquet)

with open(f'{base_path}/data/mcc_mapping.json', 'r') as f:
    mcc_mapping = json.load(f)

print(f"Data loaded in {(datetime.now() - start).total_seconds():.1f} seconds")
print(f"Transactions: {len(df):,}")
print(f"Users: {len(users_df):,}")



Loading data from parquet format...
Data loaded in 2.1 seconds
Transactions: 13,305,915
Users: 2,000


In [16]:
print("\nBasic preprocessing...")
df['date'] = pd.to_datetime(df['date'])
df['amount'] = df['amount'].abs()

# Data types should already be optimized from parquet
# Just ensure mcc is string for mapping
df['mcc'] = df['mcc'].astype(str)

# Clean user data
for col in ['per_capita_income', 'yearly_income', 'total_debt']:
    if col in users_df.columns:
        users_df[col] = users_df[col].astype(str).str.replace('$', '').str.replace(',', '').astype(float)

# Map MCC to categories
mcc_to_category = {}
for category, info in mcc_mapping['categories'].items():
    for mcc_code in info['mcc_codes']:
        mcc_to_category[str(mcc_code)] = category

df['category'] = df['mcc'].map(mcc_to_category).fillna('other')

# Get top 5 categories
top_5_categories = df[df['category'] != 'other']['category'].value_counts().head(5).index.tolist()
df = df[df['category'].isin(top_5_categories)]

print(f"Top 5 categories: {top_5_categories}")



Basic preprocessing...
Top 5 categories: ['food', 'transport', 'retail', 'services', 'specialty']


In [17]:
print("\nCreating weekly aggregates...")
df['week'] = df['date'].dt.to_period('W')
weekly_data = df.groupby(['client_id', 'category', 'week'])['amount'].sum().reset_index()

# Filter series
series_lengths = weekly_data.groupby(['client_id', 'category']).size()
valid_series = series_lengths[series_lengths >= 104]

def calculate_nonzero_ratio(group):
    return (group['amount'] > 0).mean()

nonzero_ratios = weekly_data.set_index(['client_id', 'category']).loc[valid_series.index].groupby(['client_id', 'category']).apply(calculate_nonzero_ratio)
final_series = nonzero_ratios[nonzero_ratios >= 0.30]

weekly_data = weekly_data.set_index(['client_id', 'category']).loc[final_series.index].reset_index()
weekly_data['date'] = weekly_data['week'].dt.start_time
weekly_data = weekly_data.sort_values(['client_id', 'category', 'date'])

print(f"Final series: {len(final_series)}")



Creating weekly aggregates...
Final series: 6026


In [18]:
print("\nMerging with user data...")
weekly_data = weekly_data.merge(users_df.rename(columns={'id': 'client_id'}), on='client_id', how='left')

# Add basic transformations
weekly_data['log_amount'] = np.log1p(weekly_data['amount'])
weekly_data['sqrt_amount'] = np.sqrt(weekly_data['amount'])



Merging with user data...


In [19]:
print("\nCreating target (next week's amount)...")

def create_target(group):
    """Create target as NEXT week's amount"""
    group = group.sort_values('date')
    # Target is the NEXT week's amount
    group['target'] = group['amount'].shift(-1)
    # Also create log and sqrt targets for different model types
    group['target_log'] = group['log_amount'].shift(-1)
    group['target_sqrt'] = group['sqrt_amount'].shift(-1)
    return group

weekly_data = weekly_data.groupby(['client_id', 'category']).apply(create_target).reset_index(drop=True)

# Remove last observation per series (no target available)
weekly_data = weekly_data.dropna(subset=['target'])

print(f"Data with targets: {len(weekly_data):,} records")
print("Target columns: target (next week's amount), target_log, target_sqrt")



Creating target (next week's amount)...
Data with targets: 2,298,956 records
Target columns: target (next week's amount), target_log, target_sqrt


In [20]:
print("\nCreating train/test split...")
train_data = []
test_data = []

for (client_id, category), group in weekly_data.groupby(['client_id', 'category']):
    n = len(group)
    train_size = int(n * 0.8)
    
    train_data.append(group.iloc[:train_size])
    test_data.append(group.iloc[train_size:])

train_df = pd.concat(train_data, ignore_index=True)
test_df = pd.concat(test_data, ignore_index=True)

print(f"Train set: {len(train_df):,} records")
print(f"Test set: {len(test_df):,} records")



Creating train/test split...
Train set: 1,836,701 records
Test set: 462,255 records


In [21]:
print("\nVerifying temporal integrity...")
for (client_id, category), group in train_df.groupby(['client_id', 'category']):
    group = group.sort_values('date')
    # Check that target date is always after feature date
    for i in range(len(group)-1):
        current_date = group.iloc[i]['date']
        next_date = group.iloc[i+1]['date']
        target_amount = group.iloc[i]['target']
        next_amount = group.iloc[i+1]['amount']
        assert abs(target_amount - next_amount) < 0.01, f"Target mismatch for {client_id}-{category}"
    break  # Just check one series

print("✓ Target correctly represents next week's amount")



Verifying temporal integrity...
✓ Target correctly represents next week's amount


In [22]:
print("\nSaving preprocessed data...")
output_dir = f'{base_path}/data/preprocessed'
os.makedirs(output_dir, exist_ok=True)

# Save as CSV
train_df.to_csv(f'{output_dir}/train_with_target.csv', index=False)
test_df.to_csv(f'{output_dir}/test_with_target.csv', index=False)

# Also save as parquet for faster loading
train_df.to_parquet(f'{output_dir}/train_with_target.parquet', index=False)
test_df.to_parquet(f'{output_dir}/test_with_target.parquet', index=False)

print("\n=== PREPROCESSING SUMMARY ===")
print(f"Environment: {environment}")
print(f"Data format: Parquet (5-10x faster than CSV)")
print(f"Train data: {len(train_df):,} records")
print(f"Test data: {len(test_df):,} records")
print("\nTarget setup:")
print("- target: next week's amount (what we predict)")
print("- All features use data up to current week only")
print("- No data leakage: features at time t, target at time t+1")
print("\nFiles saved (both CSV and Parquet):")
print(f"- {output_dir}/train_with_target.csv (.parquet)")
print(f"- {output_dir}/test_with_target.csv (.parquet)") 


Saving preprocessed data...

=== PREPROCESSING SUMMARY ===
Environment: local
Data format: Parquet (5-10x faster than CSV)
Train data: 1,836,701 records
Test data: 462,255 records

Target setup:
- target: next week's amount (what we predict)
- All features use data up to current week only
- No data leakage: features at time t, target at time t+1

Files saved (both CSV and Parquet):
- ../data/preprocessed/train_with_target.csv (.parquet)
- ../data/preprocessed/test_with_target.csv (.parquet)
