# Fannie Mae Loan Performance Data: CSV to Parquet Conversion

## Overview
This notebook converts Fannie Mae Single-Family Loan Performance CSV files to optimized Parquet format.

**Key Features:**
- Uses proper column names and data types from the R reference script
- Handles pipe-separated values (|) format
- Optimizes memory usage with appropriate data types
- Reduces file size through columnar storage and Snappy compression (13.83x for the 2025Q1 file)

**Input:** Raw CSV files from Fannie Mae (located in `../../data/raw/`)
**Output:** Optimized Parquet files for efficient analysis (saved to `../../data/processed/`)

**Reference:** Based on `LPPUB_Infile.R` script from Fannie Mae (see `../scripts/`)

## 1. Import Required Libraries

In [1]:
import pandas as pd
import os
import subprocess
import sys
from pathlib import Path

# Install pyarrow if not already available
try:
    import pyarrow
    print("✓ pyarrow is available")
except ImportError:
    print("Installing pyarrow...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyarrow"])
    import pyarrow
    print("✓ pyarrow installed successfully")

✓ pyarrow is available


## 2. Configuration and File Paths

In [2]:
# Define paths
SOURCE_DATA_DIR = Path('../../data/raw')
PROCESSED_DATA_DIR = Path('../../data/processed')
INPUT_FILE = '2025Q1.csv'  # Change this to process different files

csv_path = SOURCE_DATA_DIR / INPUT_FILE
# with_suffix swaps only the file extension, leaving the rest of the name untouched
parquet_path = (PROCESSED_DATA_DIR / INPUT_FILE).with_suffix('.parquet')

print(f"Input CSV: {csv_path}")
print(f"Output Parquet: {parquet_path}")
print(f"File exists: {csv_path.exists()}")

Input CSV: ../../data/raw/2025Q1.csv
Output Parquet: ../../data/processed/2025Q1.parquet
File exists: True
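
To process several quarters without editing `INPUT_FILE` by hand, the same path logic can be applied to every CSV in the raw directory. The following is a minimal sketch; the helper name `build_conversion_jobs` is hypothetical and not part of the original notebook.

```python
from pathlib import Path

def build_conversion_jobs(source_dir, target_dir, pattern="*.csv"):
    """Return (csv_path, parquet_path) pairs for every matching file in source_dir."""
    source_dir, target_dir = Path(source_dir), Path(target_dir)
    jobs = []
    for csv_file in sorted(source_dir.glob(pattern)):
        # Keep the file stem (e.g. 2025Q1) and swap only the extension
        jobs.append((csv_file, target_dir / csv_file.with_suffix(".parquet").name))
    return jobs

# Possible usage with the conversion function defined in section 5:
# for csv_file, parquet_file in build_conversion_jobs('../../data/raw', '../../data/processed'):
#     convert_csv_to_parquet(csv_file, parquet_file, LPPUB_COLUMN_NAMES, OPTIMIZED_DTYPES)
```

Sorting the matches keeps the processing order stable from run to run.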


## 3. Column Definitions from Fannie Mae R Script

These column names follow the official `LPPUB_Infile.R` script provided by Fannie Mae; the matching data types are defined in the next section.

In [3]:
# Column names from LPPUB_Infile.R
LPPUB_COLUMN_NAMES = [
    "POOL_ID", "LOAN_ID", "ACT_PERIOD", "CHANNEL", "SELLER", "SERVICER",
    "MASTER_SERVICER", "ORIG_RATE", "CURR_RATE", "ORIG_UPB", "ISSUANCE_UPB",
    "CURRENT_UPB", "ORIG_TERM", "ORIG_DATE", "FIRST_PAY", "LOAN_AGE",
    "REM_MONTHS", "ADJ_REM_MONTHS", "MATR_DT", "OLTV", "OCLTV",
    "NUM_BO", "DTI", "CSCORE_B", "CSCORE_C", "FIRST_FLAG", "PURPOSE",
    "PROP", "NO_UNITS", "OCC_STAT", "STATE", "MSA", "ZIP", "MI_PCT",
    "PRODUCT", "PPMT_FLG", "IO", "FIRST_PAY_IO", "MNTHS_TO_AMTZ_IO",
    "DLQ_STATUS", "PMT_HISTORY", "MOD_FLAG", "MI_CANCEL_FLAG", "Zero_Bal_Code",
    "ZB_DTE", "LAST_UPB", "RPRCH_DTE", "CURR_SCHD_PRNCPL", "TOT_SCHD_PRNCPL",
    "UNSCHD_PRNCPL_CURR", "LAST_PAID_INSTALLMENT_DATE", "FORECLOSURE_DATE",
    "DISPOSITION_DATE", "FORECLOSURE_COSTS", "PROPERTY_PRESERVATION_AND_REPAIR_COSTS",
    "ASSET_RECOVERY_COSTS", "MISCELLANEOUS_HOLDING_EXPENSES_AND_CREDITS",
    "ASSOCIATED_TAXES_FOR_HOLDING_PROPERTY", "NET_SALES_PROCEEDS",
    "CREDIT_ENHANCEMENT_PROCEEDS", "REPURCHASES_MAKE_WHOLE_PROCEEDS",
    "OTHER_FORECLOSURE_PROCEEDS", "NON_INTEREST_BEARING_UPB", "PRINCIPAL_FORGIVENESS_AMOUNT",
    "ORIGINAL_LIST_START_DATE", "ORIGINAL_LIST_PRICE", "CURRENT_LIST_START_DATE",
    "CURRENT_LIST_PRICE", "ISSUE_SCOREB", "ISSUE_SCOREC", "CURR_SCOREB",
    "CURR_SCOREC", "MI_TYPE", "SERV_IND", "CURRENT_PERIOD_MODIFICATION_LOSS_AMOUNT",
    "CUMULATIVE_MODIFICATION_LOSS_AMOUNT", "CURRENT_PERIOD_CREDIT_EVENT_NET_GAIN_OR_LOSS",
    "CUMULATIVE_CREDIT_EVENT_NET_GAIN_OR_LOSS", "HOMEREADY_PROGRAM_INDICATOR",
    "FORECLOSURE_PRINCIPAL_WRITE_OFF_AMOUNT", "RELOCATION_MORTGAGE_INDICATOR",
    "ZERO_BALANCE_CODE_CHANGE_DATE", "LOAN_HOLDBACK_INDICATOR", "LOAN_HOLDBACK_EFFECTIVE_DATE",
    "DELINQUENT_ACCRUED_INTEREST", "PROPERTY_INSPECTION_WAIVER_INDICATOR",
    "HIGH_BALANCE_LOAN_INDICATOR", "ARM_5_YR_INDICATOR", "ARM_PRODUCT_TYPE",
    "MONTHS_UNTIL_FIRST_PAYMENT_RESET", "MONTHS_BETWEEN_SUBSEQUENT_PAYMENT_RESET",
    "INTEREST_RATE_CHANGE_DATE", "PAYMENT_CHANGE_DATE", "ARM_INDEX",
    "ARM_CAP_STRUCTURE", "INITIAL_INTEREST_RATE_CAP", "PERIODIC_INTEREST_RATE_CAP",
    "LIFETIME_INTEREST_RATE_CAP", "MARGIN", "BALLOON_INDICATOR",
    "PLAN_NUMBER", "FORBEARANCE_INDICATOR", "HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR",
    "DEAL_NAME", "RE_PROCS_FLAG", "ADR_TYPE", "ADR_COUNT", "ADR_UPB", 
    "PAYMENT_DEFERRAL_MOD_EVENT_FLAG", "INTEREST_BEARING_UPB"
]

print(f"Total columns: {len(LPPUB_COLUMN_NAMES)}")

Total columns: 110
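
Because the file has no header row, the names above are matched to fields purely by position. A quick sanity check is to count the pipe-separated fields on the first line and compare that with the length of the name list. This is a sketch only: `count_fields` is a hypothetical helper, and it assumes the file is non-empty and that no field contains a quoted `|`.

```python
def count_fields(csv_file_path, sep="|", encoding="utf-8"):
    """Count the delimited fields on the first line of a text file."""
    with open(csv_file_path, "r", encoding=encoding) as handle:
        first_line = handle.readline().rstrip("\r\n")
    return len(first_line.split(sep))

# Possible usage with the notebook's variables:
# n_fields = count_fields(csv_path)
# if n_fields != len(LPPUB_COLUMN_NAMES):
#     print(f"Expected {len(LPPUB_COLUMN_NAMES)} fields, found {n_fields}")
```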


## 4. Optimized Data Types

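Low-cardinality fields are stored as `category`, identifiers and date fields stay as `string`, and numeric fields use `int8`/`int16` or `float32`/`float64`. The mapping covers 109 of the 110 columns: `NUM_BO` has no entry, so it keeps the `string` dtype it is read with.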

In [4]:
# Optimized data types based on R script column classes
OPTIMIZED_DTYPES = {
    # Character/categorical columns
    "POOL_ID": "string", "LOAN_ID": "string", "ACT_PERIOD": "string", 
    "CHANNEL": "category", "SELLER": "category", "SERVICER": "category",
    "MASTER_SERVICER": "category", "ORIG_DATE": "string", "FIRST_PAY": "string", 
    "MATR_DT": "string", "FIRST_FLAG": "category", "PURPOSE": "category",
    "PROP": "category", "OCC_STAT": "category", "STATE": "category", 
    "MSA": "string", "ZIP": "string", "PRODUCT": "category", 
    "PPMT_FLG": "category", "IO": "category", "FIRST_PAY_IO": "string", 
    "MNTHS_TO_AMTZ_IO": "string", "DLQ_STATUS": "category", "PMT_HISTORY": "string", 
    "MOD_FLAG": "category", "MI_CANCEL_FLAG": "category", "Zero_Bal_Code": "category",
    "ZB_DTE": "string", "RPRCH_DTE": "string", "LAST_PAID_INSTALLMENT_DATE": "string",
    "FORECLOSURE_DATE": "string", "DISPOSITION_DATE": "string", "ORIGINAL_LIST_START_DATE": "string",
    "CURRENT_LIST_START_DATE": "string", "MI_TYPE": "category", "SERV_IND": "category",
    "HOMEREADY_PROGRAM_INDICATOR": "category", "RELOCATION_MORTGAGE_INDICATOR": "category",
    "ZERO_BALANCE_CODE_CHANGE_DATE": "string", "LOAN_HOLDBACK_INDICATOR": "category",
    "LOAN_HOLDBACK_EFFECTIVE_DATE": "string", "PROPERTY_INSPECTION_WAIVER_INDICATOR": "category",
    "HIGH_BALANCE_LOAN_INDICATOR": "category", "ARM_5_YR_INDICATOR": "category",
    "ARM_PRODUCT_TYPE": "string", "INTEREST_RATE_CHANGE_DATE": "string",
    "PAYMENT_CHANGE_DATE": "string", "ARM_INDEX": "string", "ARM_CAP_STRUCTURE": "string",
    "BALLOON_INDICATOR": "category", "PLAN_NUMBER": "string", "FORBEARANCE_INDICATOR": "category",
    "HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR": "category", "DEAL_NAME": "string",
    "RE_PROCS_FLAG": "category", "ADR_TYPE": "string", "PAYMENT_DEFERRAL_MOD_EVENT_FLAG": "category",
    
    # Numeric columns with appropriate precision
    "ORIG_RATE": "float32", "CURR_RATE": "float32", "ORIG_UPB": "float64", "ISSUANCE_UPB": "float64",
    "CURRENT_UPB": "float64", "ORIG_TERM": "int16", "LOAN_AGE": "int16", "REM_MONTHS": "int16",
    "ADJ_REM_MONTHS": "int16", "OLTV": "float32", "OCLTV": "float32", "DTI": "float32",
    "CSCORE_B": "int16", "CSCORE_C": "int16", "MI_PCT": "float32", "NO_UNITS": "int8",
    "LAST_UPB": "float64", "CURR_SCHD_PRNCPL": "float64", "TOT_SCHD_PRNCPL": "float64",
    "UNSCHD_PRNCPL_CURR": "float64", "FORECLOSURE_COSTS": "float64", 
    "PROPERTY_PRESERVATION_AND_REPAIR_COSTS": "float64", "ASSET_RECOVERY_COSTS": "float64",
    "MISCELLANEOUS_HOLDING_EXPENSES_AND_CREDITS": "float64", "ASSOCIATED_TAXES_FOR_HOLDING_PROPERTY": "float64",
    "NET_SALES_PROCEEDS": "float64", "CREDIT_ENHANCEMENT_PROCEEDS": "float64",
    "REPURCHASES_MAKE_WHOLE_PROCEEDS": "float64", "OTHER_FORECLOSURE_PROCEEDS": "float64",
    "NON_INTEREST_BEARING_UPB": "float64", "PRINCIPAL_FORGIVENESS_AMOUNT": "float64",
    "ORIGINAL_LIST_PRICE": "float64", "CURRENT_LIST_PRICE": "float64",
    "ISSUE_SCOREB": "int16", "ISSUE_SCOREC": "int16", "CURR_SCOREB": "int16", "CURR_SCOREC": "int16",
    "CURRENT_PERIOD_MODIFICATION_LOSS_AMOUNT": "float64", "CUMULATIVE_MODIFICATION_LOSS_AMOUNT": "float64",
    "CURRENT_PERIOD_CREDIT_EVENT_NET_GAIN_OR_LOSS": "float64", "CUMULATIVE_CREDIT_EVENT_NET_GAIN_OR_LOSS": "float64",
    "FORECLOSURE_PRINCIPAL_WRITE_OFF_AMOUNT": "float64", "DELINQUENT_ACCRUED_INTEREST": "float64",
    "MONTHS_UNTIL_FIRST_PAYMENT_RESET": "int16", "MONTHS_BETWEEN_SUBSEQUENT_PAYMENT_RESET": "int16",
    "INITIAL_INTEREST_RATE_CAP": "float32", "PERIODIC_INTEREST_RATE_CAP": "float32",
    "LIFETIME_INTEREST_RATE_CAP": "float32", "MARGIN": "float32", "ADR_COUNT": "int16",
    "ADR_UPB": "float64", "INTEREST_BEARING_UPB": "float64"
}

print(f"Data type mappings defined for {len(OPTIMIZED_DTYPES)} columns")

Data type mappings defined for 109 columns
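
The count above (109) is one short of the 110 column names. A small comparison of the two structures makes such gaps visible. The sketch below uses a hypothetical helper, `find_unmapped_columns`, and a made-up three-column subset for illustration; run against `LPPUB_COLUMN_NAMES` and `OPTIMIZED_DTYPES` it should report `NUM_BO` as the column without a mapping.

```python
def find_unmapped_columns(column_names, dtype_mapping):
    """Return (columns without a dtype entry, dtype entries that match no column)."""
    unmapped = [name for name in column_names if name not in dtype_mapping]
    unknown = [name for name in dtype_mapping if name not in column_names]
    return unmapped, unknown

# Small illustrative inputs (a hypothetical subset of the real lists)
demo_columns = ["LOAN_ID", "NUM_BO", "DTI"]
demo_dtypes = {"LOAN_ID": "string", "DTI": "float32"}
print(find_unmapped_columns(demo_columns, demo_dtypes))  # (['NUM_BO'], [])
```

The second list catches the opposite mistake: a misspelled key in the dtype mapping that silently matches no column.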


## 5. CSV to Parquet Conversion Function

In [5]:
def convert_csv_to_parquet(csv_file_path, parquet_file_path, column_names, dtype_mapping):
    """
    Convert Fannie Mae CSV to optimized Parquet format.
    
    Parameters:
    - csv_file_path: Path to input CSV file
    - parquet_file_path: Path to output Parquet file
    - column_names: List of column names
    - dtype_mapping: Dictionary mapping column names to data types
    
    Returns:
    - DataFrame with converted data
    """
    print(f"🔄 Reading CSV file: {csv_file_path}")
    
    # First pass: Read as strings to handle any data issues
    df = pd.read_csv(
        csv_file_path,
        sep='|',
        names=column_names,
        dtype='string',
        header=None,
        low_memory=False,
        na_values=['', ' ', 'NULL', 'null', 'NA']
    )
    
    print(f"📊 Initial shape: {df.shape}")
    print(f"🔧 Converting data types...")
    
    # Convert to optimized data types
    conversion_errors = []
    
    for col, target_dtype in dtype_mapping.items():
        if col in df.columns:
            try:
                if target_dtype == 'category':
                    df[col] = df[col].astype('category')
                elif target_dtype in ['int8', 'int16', 'int32', 'int64']:
                    # errors='coerce' turns unparseable values into missing values;
                    # the nullable Int dtypes can hold them alongside real integers
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    df[col] = df[col].astype(f'Int{target_dtype[3:]}')
                elif target_dtype in ['float32', 'float64']:
                    df[col] = pd.to_numeric(df[col], errors='coerce').astype(target_dtype)
                elif target_dtype == 'string':
                    df[col] = df[col].astype('string')
            except Exception as e:
                conversion_errors.append(f"{col}: {str(e)}")
    
    if conversion_errors:
        print(f"⚠️  Conversion warnings for {len(conversion_errors)} columns")
        for error in conversion_errors[:5]:  # Show first 5 errors
            print(f"   {error}")
    
    print(f"💾 Saving to Parquet: {parquet_file_path}")
    
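    # Create the output directory if needed, then save with Snappy compression
    Path(parquet_file_path).parent.mkdir(parents=True, exist_ok=True)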
    df.to_parquet(
        parquet_file_path,
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    
    return df
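
Because the function converts numeric columns with `errors='coerce'`, a value that cannot be parsed becomes a missing value without any warning. One way to measure how often that happens is to compare missingness before and after conversion. This is a minimal sketch; `count_coercion_losses` is a hypothetical helper, and the sample values are invented.

```python
import pandas as pd

def count_coercion_losses(raw_values):
    """Count non-missing raw values that errors='coerce' turns into missing values."""
    converted = pd.to_numeric(raw_values, errors="coerce")
    lost = raw_values.notna() & converted.isna()
    return int(lost.sum())

# Illustrative values only: "n/a" is not numeric, None is already missing
sample = pd.Series(["720", "n/a", None, "685"], dtype="object")
print(count_coercion_losses(sample))  # prints 1
```

A non-zero count for a column indicates values that should be inspected before the coerced result is trusted.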

## 6. Run the Conversion

In [6]:
# Perform the conversion
df_converted = convert_csv_to_parquet(
    csv_path, 
    parquet_path, 
    LPPUB_COLUMN_NAMES, 
    OPTIMIZED_DTYPES
)

print(f"\n✅ Conversion completed successfully!")
print(f"📈 Final shape: {df_converted.shape}")

🔄 Reading CSV file: ../../data/raw/2025Q1.csv
📊 Initial shape: (388622, 110)
🔧 Converting data types...
💾 Saving to Parquet: ../../data/processed/2025Q1.parquet

✅ Conversion completed successfully!
📈 Final shape: (388622, 110)


## 7. Verification and Performance Analysis

In [7]:
# File size comparison
csv_size = os.path.getsize(csv_path)
parquet_size = os.path.getsize(parquet_path)

print("📁 File Size Comparison:")
print(f"   CSV:     {csv_size:,} bytes ({csv_size/1024/1024:.2f} MB)")
print(f"   Parquet: {parquet_size:,} bytes ({parquet_size/1024/1024:.2f} MB)")
print(f"   Compression ratio: {csv_size/parquet_size:.2f}x")
print(f"   Space saved: {((csv_size - parquet_size) / csv_size) * 100:.1f}%")

# Verify by reading back
print("\n🔍 Verification - Reading Parquet file:")
df_verify = pd.read_parquet(parquet_path, engine='pyarrow')
print(f"   Shape: {df_verify.shape}")
print(f"   Memory usage: ~{df_verify.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")

print("\n📋 Sample Data Types:")
for col, dtype in df_verify.dtypes.head(10).items():
    print(f"   {col}: {dtype}")

📁 File Size Comparison:
   CSV:     123,044,558 bytes (117.34 MB)
   Parquet: 8,896,605 bytes (8.48 MB)
   Compression ratio: 13.83x
   Space saved: 92.8%

🔍 Verification - Reading Parquet file:
   Shape: (388622, 110)
   Memory usage: ~734.56 MB

📋 Sample Data Types:
   POOL_ID: string
   LOAN_ID: string
   ACT_PERIOD: string
   CHANNEL: category
   SELLER: category
   SERVICER: category
   MASTER_SERVICER: category
   ORIG_RATE: float32
   CURR_RATE: float32
   ORIG_UPB: float64
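
The in-memory footprint reported above (~734.56 MB) is far larger than the 8.48 MB file on disk. A per-dtype breakdown is one way to see which column types account for it. The sketch below uses a hypothetical helper, `memory_by_dtype`, and an invented demo frame that stores the same values once as `object` and once as `category`.

```python
import pandas as pd

def memory_by_dtype(df):
    """Sum deep memory usage (in bytes) per dtype name, largest first."""
    per_column = df.memory_usage(deep=True, index=False)
    dtype_names = df.dtypes.astype(str)
    return per_column.groupby(dtype_names).sum().sort_values(ascending=False)

# Same 1,000 values stored two ways (illustrative data only)
values = ["R", "C", "B", "R"] * 250
demo = pd.DataFrame({
    "channel_as_object": pd.Series(values, dtype="object"),
    "channel_as_category": pd.Series(values, dtype="category"),
})
print(memory_by_dtype(demo))
```

In the notebook, the call would be `memory_by_dtype(df_verify)`.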


## 8. Data Quality Summary

In [8]:
print("📊 Data Quality Summary:")
print(f"   Total rows: {len(df_verify):,}")
print(f"   Total columns: {len(df_verify.columns)}")

# Missing values summary
missing_summary = df_verify.isnull().sum()
columns_with_missing = missing_summary[missing_summary > 0]

print(f"   Columns with missing values: {len(columns_with_missing)}")
if len(columns_with_missing) > 0:
    print("   First 5 columns with missing values (in column order):")
    for col, count in columns_with_missing.head().items():
        pct = (count / len(df_verify)) * 100
        print(f"     {col}: {count:,} ({pct:.1f}%)")

# Data type distribution
dtype_counts = df_verify.dtypes.value_counts()
print(f"\n📈 Data Type Distribution:")
for dtype, count in dtype_counts.items():
    print(f"   {dtype}: {count} columns")

📊 Data Quality Summary:
   Total rows: 388,622
   Total columns: 110
   Columns with missing values: 77
   First 5 columns with missing values (in column order):
     POOL_ID: 388,622 (100.0%)
     SERVICER: 651 (0.2%)
     MASTER_SERVICER: 388,622 (100.0%)
     CURR_RATE: 651 (0.2%)
     ISSUANCE_UPB: 388,622 (100.0%)

📈 Data Type Distribution:
   string: 29 columns
   float64: 28 columns
   Int16: 13 columns
   float32: 10 columns
   category: 5 columns
   category: 5 columns
   category: 4 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   Int8: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
   category: 1 columns
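
Two details of this output are worth noting. The missing-value loop takes the first five entries in column order, not the five largest counts. The repeated `category` rows appear because each categorical column carries its own set of categories, so pandas treats them as distinct dtypes unless they share the same categories. The sketch below shows one way to rank by missing count and to group dtypes by name; both helper names and the demo frame are hypothetical.

```python
import pandas as pd

def top_missing(df, n=5):
    """Return the n columns with the most missing values, largest first."""
    missing = df.isnull().sum()
    return missing[missing > 0].sort_values(ascending=False).head(n)

def dtype_counts_by_name(df):
    """Count columns per dtype name so all categorical columns share one row."""
    return df.dtypes.astype(str).value_counts()

# Illustrative frame: two categoricals with different categories, one float column
demo = pd.DataFrame({
    "flag_a": pd.Categorical(["Y", "N", None]),
    "flag_b": pd.Categorical(["R", None, None]),
    "rate": pd.Series([6.5, 7.0, 6.9], dtype="float32"),
})
print(top_missing(demo))
print(dtype_counts_by_name(demo))
```

Applied to `df_verify`, the second helper would collapse the 29 categorical columns into a single `category` row.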


## Summary

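This notebook converts Fannie Mae Loan Performance CSV files to Parquet with:

- **Proper column naming** based on the official `LPPUB_Infile.R` script
- **Optimized data types**: `category` for low-cardinality fields, nullable integers, and `float32`/`float64` for numeric fields
- **Significant compression**: 13.83x for 2025Q1 (117.34 MB CSV to 8.48 MB Parquet; typically 10-15x)
- **Data integrity** checks: the Parquet file reads back with the same shape as the CSV (388,622 rows × 110 columns)
- **Error handling** for data quality issues: numeric values that fail to parse become missing values, and other conversion errors are collected and printed

The resulting Parquet files need far less storage, and because Parquet is a columnar format, later analyses can load only the columns they need.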

**Next Steps:**
- Use the Parquet files for analysis in other notebooks
- Consider partitioning large datasets by year/quarter
- Implement data validation checks for production use
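
For the partitioning step, partition keys first have to be derived from the reporting period. The sketch below assumes that `ACT_PERIOD` holds an `MMYYYY` string (for example `012025`) and contains no missing values; this format is an assumption that should be checked against the Fannie Mae file layout before use. The helper name `add_partition_keys` and the `YEAR`/`QUARTER` column names are hypothetical.

```python
import pandas as pd

def add_partition_keys(df, period_col="ACT_PERIOD"):
    """Add YEAR and QUARTER columns derived from an MMYYYY period string."""
    # ASSUMPTION: the period is MMYYYY; zfill restores a dropped leading zero
    period = df[period_col].astype(str).str.zfill(6)
    month = period.str[:2].astype(int)
    out = df.copy()
    out["YEAR"] = period.str[2:].astype(int)
    out["QUARTER"] = (month - 1) // 3 + 1
    return out

demo = pd.DataFrame({"ACT_PERIOD": ["012025", "032025", "112024"]})
keyed = add_partition_keys(demo)
print(keyed)

# With pyarrow installed, a partitioned dataset could then be written with:
# keyed.to_parquet("some_output_dir", engine="pyarrow", partition_cols=["YEAR", "QUARTER"])
```

Writing with `partition_cols` produces one subdirectory per key value, so a later read can skip quarters it does not need.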