# Pipeline for Data Preprocessing

## 1. Import Required Libraries

In [50]:
import pandas as pd
import os
import shutil
import warnings
import csv
import re


warnings.filterwarnings('ignore')

raw_dir = '../../data/raw'
cleaned_dir = '../../data/cleaned'

# Create cleaned directory if not exists
os.makedirs(cleaned_dir, exist_ok=True)

print(f"Raw: {raw_dir}")
print(f"Cleaned: {cleaned_dir}")

Raw: ../../data/raw
Cleaned: ../../data/cleaned


## Scan Directory and List All CSV Files

In [51]:
# Check if directory exists
if not os.path.exists(raw_dir):
    print(f"Error: Directory {raw_dir} does not exist")
    csv_files = []
else:
    # Get all CSV files
    csv_files = [f for f in os.listdir(raw_dir) if f.endswith('.csv')]

    print(f"Found {len(csv_files)} CSV files in {raw_dir}")
    print("\nCSV Files List ⬇️")
    for i, filename in enumerate(csv_files, 1):
        file_size = os.path.getsize(os.path.join(raw_dir, filename)) / (1024 * 1024)  # MB
        print(f"  {i}. {filename} ({file_size:.2f} MB)")

Found 2 CSV files in ../../data/raw

CSV Files List ⬇️
  1. TransactionData 10-9-25.csv (601.85 MB)
  2. TransactionData10-3-25.csv (442.34 MB)


## Preprocess CSV Files
This cell performs the following preprocessing tasks:
1. Standardize headers (e.g., "AccountID" → "Account ID")
2. Fix comma issues (remove extra commas in field values)
3. Clean Amount field (remove $ and commas, convert to numeric)
4. Fill missing values (Amount→0, others→"Unknown", "null"→empty)
5. Rename files based on date range (MM-DD-YYYY-MM-DD-YYYY.csv)

Output: Cleaned and renamed CSV files saved to ../../data/cleaned/


In [52]:
ENABLE_RENAMING = True

STANDARD_HEADERS = [
    'Account ID', 'Member ID', 'Account Type', 'Account Open Date',
    'Member Age', 'Product ID', 'Post Date', 'Post Time', 'Amount',
    'Action Type', 'Source Type', 'Transaction Description',
    'Fraud Adjustment Indicator'
]

for i, filename in enumerate(csv_files, 1):
    file_path = os.path.join(raw_dir, filename)
    print(f"[{i}/{len(csv_files)}] {filename}...", end=' ')

    # Step 1: Pre-process raw lines to clean Amount field
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    processed_lines = []
    fixed_amount = 0

    for idx, line in enumerate(lines):
        if idx == 0:  # Keep header as-is
            processed_lines.append(line)
            continue

        # Find and clean amount patterns like "$1,000.00" or "$500"
        # Pattern: optional quote + $ + digits with optional commas + optional .decimals + optional quote
        def clean_amount(match):
            amount_text = match.group(0)
            # Remove quotes, $, commas
            cleaned = amount_text.replace('"', '').replace('$', '').replace(',', '').strip()
            return cleaned

        # Match quoted or unquoted amounts
        new_line = re.sub(r'"\$[\d,]+\.?\d*\s*"|\$[\d,]+\.?\d*', clean_amount, line)

        if new_line != line:
            fixed_amount += 1

        processed_lines.append(new_line)

    # Step 2: Parse with CSV module
    from io import StringIO
    csv_text = ''.join(processed_lines)
    reader = csv.reader(StringIO(csv_text))
    all_rows = list(reader)

    if not all_rows:
        print("EMPTY")
        continue

    # Standardize headers
    header_fields = all_rows[0]
    normalized_header = []
    for field in header_fields:
        field_clean = field.strip().replace(' ', '')
        matched = False
        for std in STANDARD_HEADERS:
            if field_clean.lower() == std.replace(' ', '').lower():
                normalized_header.append(std)
                matched = True
                break
        if not matched:
            normalized_header.append(field.strip())

    # Get column indices
    try:
        idx_map = {h: normalized_header.index(h) for h in ['Amount', 'Product ID', 'Action Type',
                   'Transaction Description', 'Account Type', 'Post Date']}
    except ValueError:
        idx_map = {}

    cleaned_rows = [normalized_header]
    fixed_comma = fixed_missing = 0
    first_date = last_date = None

    # Process data rows
    for row in all_rows[1:]:
        fields = list(row)

        # Remove empty trailing field (from trailing comma)
        while len(fields) > 13 and not fields[-1].strip():
            fields.pop()

        # Merge extra fields from end
        while len(fields) > 13:
            fields[-3] = fields[-3] + ' ' + fields[-2]
            fields.pop(-2)
            fixed_comma += 1

        # Pad if needed
        while len(fields) < 13:
            fields.append('')

        # Extract dates
        if 'Post Date' in idx_map:
            try:
                date_val = pd.to_datetime(fields[idx_map['Post Date']])
                if first_date is None:
                    first_date = date_val
                last_date = date_val
            except:
                pass

        # Fill missing values
        fill_rules = [
            ('Amount', '0'),
            ('Product ID', 'Unknown'),
            ('Action Type', 'Unknown'),
            ('Transaction Description', 'Unknown'),
            ('Account Type', 'Unknown')
        ]

        for col, fill_val in fill_rules:
            if col in idx_map:
                idx = idx_map[col]
                if 0 <= idx < len(fields):
                    val = fields[idx].strip()
                    if not val or val.lower() == 'null':
                        fields[idx] = fill_val
                        fixed_missing += 1

        cleaned_rows.append(fields)

    # Determine output filename
    output_filename = filename
    if ENABLE_RENAMING and first_date and last_date:
        output_filename = f"{first_date.strftime('%m-%d-%Y')}-{last_date.strftime('%m-%d-%Y')}.csv"

    # Save
    output_path = os.path.join(cleaned_dir, output_filename)
    with open(output_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(cleaned_rows)

    # Output summary
    msg = []
    if fixed_comma > 0:
        msg.append(f"Fields:{fixed_comma}")
    if fixed_amount > 0:
        msg.append(f"Amount:{fixed_amount}")
    if fixed_missing > 0:
        msg.append(f"Missing:{fixed_missing}")
    if ENABLE_RENAMING and output_filename != filename:
        msg.append(f"→{output_filename}")
    print(', '.join(msg) if msg else "OK")

[1/2] TransactionData 10-9-25.csv... Fields:221, Amount:95, Missing:113579, →01-01-2025-12-31-2024.csv
[2/2] TransactionData10-3-25.csv... Amount:238, Missing:71518, →08-01-2025-09-01-2025.csv


## Analyze Each CSV File

In [None]:
results = []

for i, filename in enumerate(csv_files, 1):
    cleaned_path = os.path.join(cleaned_dir, filename)
    print(f"\n[{i}/{len(csv_files)}] {filename}")

    try:
        df = pd.read_csv(cleaned_path, low_memory=False)

        # Get first and last date (data is sorted by Post Date)
        first_date = pd.to_datetime(df['Post Date'].iloc[0])
        last_date = pd.to_datetime(df['Post Date'].iloc[-1])

        total = len(df)
        fraud = df['Fraud Adjustment Indicator'].notna().sum()
        pct = (fraud / total * 100) if total > 0 else 0

        new_name = f"{first_date.strftime('%m-%d-%Y')}-{last_date.strftime('%m-%d-%Y')}.csv"

        results.append({
            'original': filename,
            'new_name': new_name,
            'first_date': first_date,
            'last_date': last_date,
            'total': total,
            'fraud': fraud,
            'pct': round(pct, 4)
        })

        print(f"  Date: {first_date.strftime('%m/%d/%Y')} - {last_date.strftime('%m/%d/%Y')}")
        print(f"  Total: {total:,} | Fraud: {fraud:,} ({pct:.4f}%)")
        print(f"  → {new_name}")

    except Exception as e:
        print(f"  ERROR: {str(e)}")

## Rename Files

In [None]:
ENABLE_RENAMING = False  # Set to True to rename

if ENABLE_RENAMING:
    for result in results:
        old_path = os.path.join(raw_dir, result['original'])
        new_path = os.path.join(raw_dir, result['new_name'])

        if old_path != new_path and not os.path.exists(new_path):
            shutil.move(old_path, new_path)
            print(f"✓ {result['original']} → {result['new_name']}")
    print("Done!")
else:
    print("Set ENABLE_RENAMING = True to rename")