# H&M Feature Engineering - Memory Optimized Version

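This notebook engineers customer-level features from the H&M dataset while keeping memory usage low: the data is scanned lazily (optionally limited to a sample), RFM features are computed in small customer batches, and intermediate tables are released as soon as they have been joined into the final feature set, which is saved at the end.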

## 1. Import Libraries and Setup

In [None]:
import polars as pl
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import warnings
import gc
# Silence all Python warnings for the rest of the notebook session
warnings.filterwarnings('ignore')

# Configure Polars for extreme memory efficiency.
# Each option is applied on its own: pl.Config setters differ between Polars
# releases, and one missing setter must neither prevent the remaining options
# from being applied nor go unnoticed.
for _setter_name, _setting in (
    ('set_streaming_chunk_size', 1000),  # Very small chunks
    ('set_fmt_str_lengths', 30),
):
    try:
        getattr(pl.Config, _setter_name)(_setting)
    except AttributeError:
        # Best effort: the notebook still works without this option
        print(f"Polars option not available, skipped: {_setter_name}")

# Memory monitoring function
def check_memory_usage():
    """Print and return the resident memory of the current process in MB.

    psutil is imported lazily so the notebook keeps working without it.

    Returns:
        The resident set size in megabytes, or 0 when psutil is not
        installed (a notice is printed instead of the measurement).
    """
    try:
        import psutil

        rss_bytes = psutil.Process(os.getpid()).memory_info().rss
        memory_mb = rss_bytes / 1024 / 1024
    except ImportError:
        print("psutil not available for memory monitoring")
        return 0
    else:
        print(f"Memory usage: {memory_mb:.1f} MB")
        return memory_mb

# Run configuration - tune these values to the memory available on your system
USE_SAMPLE = True       # False loads the full dataset instead of a sample
SAMPLE_SIZE = 500_000   # Number of records loaded when USE_SAMPLE is True
BATCH_SIZE = 10_000     # Number of customers processed per batch

# Report the environment and the chosen configuration
print("Libraries imported successfully")
print(f"Polars version: {pl.__version__}")
print(
    "Configuration: "
    f"Sample={USE_SAMPLE}, Sample size={SAMPLE_SIZE:,}, Batch size={BATCH_SIZE:,}"
)
check_memory_usage()

## 2. Load Dataset with Extreme Memory Optimization

In [None]:
# Locate the integrated dataset and fail early when it is missing
data_dir = '../data'
integrated_path = os.path.join(data_dir, 'processed', 'hm_integrated_dataset.parquet')

if not os.path.exists(integrated_path):
    raise FileNotFoundError(f"File not found: {integrated_path}")

print("Loading dataset with streaming...")

# Lazy scan: the query is only executed by .collect() below
df_lazy = pl.scan_parquet(integrated_path)

if not USE_SAMPLE:
    print("Loading full dataset...")
    df_integrated = df_lazy.collect()
else:
    # Sample mode keeps only the first SAMPLE_SIZE rows of the file
    print(f"Using sample of {SAMPLE_SIZE:,} records for testing...")
    df_integrated = df_lazy.head(SAMPLE_SIZE).collect()

print(
    f"✓ Loaded {df_integrated.height:,} records "
    f"with {len(df_integrated.columns)} columns"
)
print(f"✓ Memory usage: {df_integrated.estimated_size('mb'):.1f} MB")

# Basic overview of the loaded data
unique_customers = df_integrated['customer_id'].n_unique()
print(f"• Unique customers: {unique_customers:,}")
transaction_dates = df_integrated['t_dat']
print(f"• Date range: {transaction_dates.min()} to {transaction_dates.max()}")

# The most recent transaction date serves as the reference point for recency
REFERENCE_DATE = pd.to_datetime(transaction_dates.max())
print(f"• Reference date: {REFERENCE_DATE.date()}")

check_memory_usage()

## 3. RFM Features with Ultra-Small Batches

In [None]:
print("Creating RFM features with ultra-small batches...")
gc.collect()
initial_memory = check_memory_usage()

# Get list of unique customers straight from Polars (no pandas round trip)
customer_list = df_integrated.get_column('customer_id').unique().to_list()
total_customers = len(customer_list)

print(f"Processing {total_customers:,} customers in batches of {BATCH_SIZE:,}")

# Prepare minimal data for RFM: only the columns the aggregations need,
# with the string transaction date parsed into a Date column
rfm_data = df_integrated.select([
    'customer_id', 'price', 't_dat', 'article_id'
]).with_columns([
    pl.col('t_dat').str.to_date().alias('transaction_date')
])

# Recency is counted in whole days, so the reference point is reduced to a
# plain date: the subtraction below is then Date - Date and does not depend
# on implicit Datetime/Date coercion.
reference_day = REFERENCE_DATE.date()

# Process in very small batches
rfm_results = []
batch_count = 0

for i in range(0, total_customers, BATCH_SIZE):
    batch_customers = customer_list[i:i+BATCH_SIZE]
    batch_count += 1

    # Progress and memory report every 10th batch
    if batch_count % 10 == 0:
        print(f"Processing batch {batch_count}: customers {i:,} to {min(i+BATCH_SIZE, total_customers):,}")
        check_memory_usage()

    # Filter to batch customers
    batch_data = rfm_data.filter(pl.col('customer_id').is_in(batch_customers))

    if batch_data.height == 0:
        continue

    # Calculate RFM for this batch: one output row per customer
    batch_rfm = (
        batch_data
        .group_by('customer_id')
        .agg([
            # Days between the reference date and the customer's last purchase
            (pl.lit(reference_day) - pl.col('transaction_date').max())
            .dt.total_days().abs().alias('recency_days'),
            # Number of transaction rows
            pl.col('transaction_date').count().alias('frequency'),
            # Total and average spend
            pl.col('price').sum().alias('monetary_value'),
            pl.col('price').mean().alias('avg_transaction_value'),
            # Number of distinct articles bought
            pl.col('article_id').n_unique().alias('unique_products')
        ])
    )

    rfm_results.append(batch_rfm)

    # Clear batch data immediately
    del batch_data

    # Aggressive garbage collection every 20 batches
    if batch_count % 20 == 0:
        gc.collect()

# pl.concat cannot combine an empty list; fail with an explicit message instead
if not rfm_results:
    raise ValueError("Cannot create RFM features: the dataset contains no transactions")

# Combine results
print("Combining RFM batches...")
customer_rfm = pl.concat(rfm_results)
del rfm_results, rfm_data
gc.collect()

print(f"✓ Created RFM features for {customer_rfm.height:,} customers")
print(f"Sample RFM:")
print(customer_rfm.head(3).to_pandas())
check_memory_usage()

## 4. Basic Product Preferences (Simplified)

In [None]:
print("Creating simplified product preferences...")
gc.collect()

# Only keep essential product preference features
product_data = df_integrated.select([
    'customer_id', 'product_type_name', 'department_name', 'price', 'sales_channel_id'
])

# Most purchased category per customer
print("• Processing category preferences...")
category_prefs = (
    product_data
    # Purchases per (customer, product type)
    .group_by(['customer_id', 'product_type_name'])
    .agg(pl.col('price').count().alias('count'))
    # Highest count first within each customer. The product type name is the
    # final sort key so that ties on count are resolved the same way on every
    # run (the preceding group_by returns rows in no particular order).
    .sort(
        ['customer_id', 'count', 'product_type_name'],
        descending=[False, True, False],
    )
    .group_by('customer_id')
    .agg([
        pl.col('product_type_name').first().alias('top_category'),
        pl.col('product_type_name').n_unique().alias('category_diversity')
    ])
)

gc.collect()
print(f"  Memory: {check_memory_usage():.1f} MB")

# Price behaviour (simplified): min / max / median price per customer
print("• Processing price behaviour...")
price_behaviour = (
    product_data
    .group_by('customer_id')
    .agg([
        pl.col('price').min().alias('min_price'),
        pl.col('price').max().alias('max_price'),
        pl.col('price').median().alias('median_price')
    ])
)

# Channel preference (simplified): the channel with the most purchases
print("• Processing channel preferences...")
channel_prefs = (
    product_data
    .group_by(['customer_id', 'sales_channel_id'])
    .agg(pl.col('price').count().alias('count'))
    # The channel id is the final sort key so ties on count are deterministic
    .sort(
        ['customer_id', 'count', 'sales_channel_id'],
        descending=[False, True, False],
    )
    .group_by('customer_id')
    .agg(pl.col('sales_channel_id').first().alias('preferred_channel'))
)

# The column subset is no longer needed once all three tables exist
del product_data
gc.collect()

print(f"✓ Created simplified product preferences")
print(f"  • Categories: {category_prefs.height:,} customers")
print(f"  • Price behaviour: {price_behaviour.height:,} customers")
print(f"  • Channels: {channel_prefs.height:,} customers")
check_memory_usage()

## 5. Demographics (Essential Only)

In [None]:
print("Creating essential demographic features...")
gc.collect()

# Get unique customer demographics only: one row per customer_id
demographics = (
    df_integrated
    .select(['customer_id', 'age', 'club_member_status', 'fashion_news_frequency'])
    .unique(subset=['customer_id'])
    .with_columns([
        # Simplified age groups.
        # A missing age must stay missing: comparisons against null are null,
        # so without this first branch such customers would fall through to
        # the 'senior' default.
        pl.when(pl.col('age').is_null()).then(pl.lit(None, dtype=pl.Utf8))
        .when(pl.col('age') < 30).then(pl.lit('young'))
        .when(pl.col('age') < 50).then(pl.lit('middle'))
        .otherwise(pl.lit('senior'))
        .alias('age_group')
    ])
)

print(f"✓ Created demographics for {demographics.height:,} customers")
check_memory_usage()

## 6. Combine Features Progressively

In [None]:
print("Combining features progressively...")
gc.collect()

# The RFM table is the base; every other table is left-joined on customer_id
final_features = customer_rfm

# Each source table is deleted right after its join so it can be reclaimed
print("• Joining category preferences...")
final_features = final_features.join(category_prefs, how='left', on='customer_id')
del category_prefs
gc.collect()

print("• Joining price behaviour...")
final_features = final_features.join(price_behaviour, how='left', on='customer_id')
del price_behaviour
gc.collect()

print("• Joining channel preferences...")
final_features = final_features.join(channel_prefs, how='left', on='customer_id')
del channel_prefs
gc.collect()

print("• Joining demographics...")
final_features = final_features.join(demographics, how='left', on='customer_id')
del demographics
gc.collect()

# Simple interaction features derived from the RFM columns
final_features = final_features.with_columns(
    # frequency x average transaction value
    customer_value=pl.col('frequency') * pl.col('avg_transaction_value'),
    # distinct products per transaction, computed in floating point
    product_diversity_ratio=(
        pl.col('unique_products').cast(pl.Float64)
        / pl.col('frequency').cast(pl.Float64)
    ),
)

# Summary of the combined table
print("\n✓ Final feature dataset created!")
print(f"  • Total customers: {final_features.height:,}")
print(f"  • Total features: {final_features.width}")
print(f"  • Memory usage: {final_features.estimated_size('mb'):.1f} MB")

print(f"\nFeature columns: {final_features.columns}")
print("\nSample features:")
print(final_features.head(3).to_pandas())

check_memory_usage()

## 7. Save Results

In [None]:
import json

print("Saving engineered features...")

# Make sure the output directory exists
output_dir = os.path.join(data_dir, 'processed')
os.makedirs(output_dir, exist_ok=True)

# The file suffix tells sample runs apart from full runs
if USE_SAMPLE:
    suffix = '_sample'
else:
    suffix = '_full'
features_path = os.path.join(output_dir, f'hm_engineered_features{suffix}.parquet')

final_features.write_parquet(features_path)
print(f"✓ Saved features to: {features_path}")

# Describe this run in a small JSON file stored next to the features
n_customers = final_features.height
n_features = len(final_features.columns)
metadata = {
    'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'total_customers': n_customers,
    'total_features': n_features,
    'is_sample': USE_SAMPLE,
    'sample_size': SAMPLE_SIZE if USE_SAMPLE else 'full'
}

metadata_path = os.path.join(output_dir, f'feature_metadata{suffix}.json')
with open(metadata_path, 'w') as metadata_file:
    metadata_file.write(json.dumps(metadata, indent=2))

print(f"✓ Saved metadata to: {metadata_path}")
print("\n🎉 Feature engineering completed successfully!")
print(f"Summary: {n_customers:,} customers, {n_features} features")

check_memory_usage()