# Feature Engineering


In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler, OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import pickle
import time
import warnings
warnings.filterwarnings('ignore')

# MLflow imports
import mlflow
import mlflow.sklearn
from datetime import datetime

# Set environment variable to avoid tokenizer parallelism
import os
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

# Point MLflow at the experiment that collects every feature-engineering run
mlflow.set_experiment("movie-rating-feature-engineering")
print("+++ MLflow experiment set: movie-rating-feature-engineering +++")

# Read the raw movie table and normalise the header to lower case
data_path = Path("../data/raw/movies.csv")
df = pd.read_csv(data_path).rename(columns=str.lower)

# Coerce the target to numeric (unparseable values become NaN), then keep only
# rows whose rating is present and non-zero
df = df.assign(vote_average=pd.to_numeric(df['vote_average'], errors='coerce'))
has_usable_rating = df['vote_average'].notna() & df['vote_average'].ne(0)
df = df[has_usable_rating]

print(f"\nDataset shape: {df.shape}")
print(f"vote_average dtype: {df['vote_average'].dtype}")

# Separate features from the target BEFORE any pipeline transformation
y = df['vote_average']
X = df.drop(columns='vote_average')

# Hold out a fixed, seeded fraction of the rows for testing
RANDOM_STATE = 42
TEST_SIZE = 0.2
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
)

print(f"\nTraining set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")
X_train.head()


2026/01/01 20:55:25 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2026/01/01 20:55:25 INFO mlflow.store.db.utils: Updating database tables
2026/01/01 20:55:25 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/01/01 20:55:25 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2026/01/01 20:55:25 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/01/01 20:55:25 INFO alembic.runtime.migration: Will assume non-transactional DDL.


+++ MLflow experiment set: movie-rating-feature-engineering +++

Dataset shape: (9726, 9)
vote_average dtype: float64

Training set size: 7780
Test set size: 1946


Unnamed: 0,release_date,title,overview,popularity,vote_count,original_language,genre,poster_url
5144,1986-07-30,Flight of the Navigator,12-year-old David is accidentally knocked out ...,20.522,618,en,"Family, Science Fiction, Adventure",https://image.tmdb.org/t/p/original/dS4jmRcEAm...
5780,1993-01-01,Farewell My Concubine,"Abandoned by his prostitute mother in 1920, Do...",19.009,353,zh,Drama,https://image.tmdb.org/t/p/original/f54hNIiHNI...
1589,2016-05-13,Justice League vs. Teen Titans,Robin is sent by Batman to work with the Teen ...,48.531,684,en,"Science Fiction, Action, Animation",https://image.tmdb.org/t/p/original/3G6RPpafXA...
2146,2009-09-29,Superman/Batman: Public Enemies,United States President Lex Luthor uses the on...,38.864,476,en,"Science Fiction, Animation, Action, Adventure,...",https://image.tmdb.org/t/p/original/izvMc22ywS...
6482,1967-12-20,Asterix the Gaul,"In the year 50 BC, Gaul is occupied by the Rom...",17.614,536,fr,"Family, Animation, Adventure, Comedy",https://image.tmdb.org/t/p/original/jBDZ68iRPE...


In [2]:
# Save test and train data
# The output directory is created here because this is the first cell that
# writes into it; without this, a fresh checkout fails on the first to_csv call.
processed_data_dir = Path('../data/processed')
processed_data_dir.mkdir(parents=True, exist_ok=True)

X_train.to_csv(processed_data_dir / 'X_train.csv', index=False)
X_test.to_csv(processed_data_dir / 'X_test.csv', index=False)
y_train.to_csv(processed_data_dir / 'y_train.csv', index=False)
y_test.to_csv(processed_data_dir / 'y_test.csv', index=False)

## Custom Transformers


In [3]:
# Import custom transformers from src module
import sys
from pathlib import Path

# Add the parent of the current working directory to the import path so the
# `src` package resolves. The membership check keeps the cell idempotent:
# re-running it no longer stacks duplicate entries onto sys.path.
project_root = str(Path.cwd().parent)
if project_root not in sys.path:
    sys.path.insert(0, project_root)

# Import all custom transformers
from src.transformers import (
    ColumnSelector,
    DataTypeFixer,
    YearBinning,
    GenreMultiLabelEncoder,
    LanguageGrouper,
    LightweightTextEmbedder,
    SelectiveStandardScaler,
    CategoricalOneHotEncoder,
    SentenceTransformerEmbedder,
)

print("Custom transformers imported from src.transformers")

Custom transformers imported from src.transformers


## Build the Feature Engineering Pipeline


In [4]:
# Define pipeline configurations
# Raw columns handed to ColumnSelector as the first step of every pipeline version.
columns_to_keep = ['release_date', 'title', 'overview', 'original_language', 'genre']

# Hyperparameters
# Passed as `threshold` to LanguageGrouper and logged to MLflow as "language_threshold".
# NOTE(review): presumably a minimum share below which languages are grouped together —
# confirm against src.transformers.
LANGUAGE_THRESHOLD = 0.01
# `max_features` given to LightweightTextEmbedder for the `overview` and `title`
# columns respectively; only the v1 (TF-IDF) pipeline uses these two values.
OVERVIEW_MAX_FEATURES = 50
TITLE_MAX_FEATURES = 30

def _assemble_feature_pipeline(overview_embedder, title_embedder):
    """Build the step list shared by every pipeline version.

    Only the two text-embedding steps differ between versions, so they are
    injected; all other steps (and all step names) are defined once here so
    the versions cannot drift apart.

    Args:
        overview_embedder: Transformer used for the 'embed_overview' step.
        title_embedder: Transformer used for the 'embed_title' step.

    Returns:
        An unfitted sklearn Pipeline.
    """
    return Pipeline([
        ('select_columns', ColumnSelector(columns=columns_to_keep)),
        ('fix_dtypes', DataTypeFixer()),
        ('bin_years', YearBinning()),
        ('encode_genres', GenreMultiLabelEncoder()),
        ('group_languages', LanguageGrouper(threshold=LANGUAGE_THRESHOLD)),
        ('embed_overview', overview_embedder),
        ('embed_title', title_embedder),
        ('onehot_encode', CategoricalOneHotEncoder(columns=['year_bin', 'original_language'])),
        ('scale_features', SelectiveStandardScaler()),
    ])


def build_pipeline_v1_original():
    """Version 1: Original - TF-IDF + OneHotEncoder

    Returns:
        An unfitted Pipeline whose text steps are LightweightTextEmbedder
        instances sized by OVERVIEW_MAX_FEATURES / TITLE_MAX_FEATURES (read at
        call time, so edits to those constants take effect on the next build).
    """
    return _assemble_feature_pipeline(
        overview_embedder=LightweightTextEmbedder(
            column='overview', max_features=OVERVIEW_MAX_FEATURES, prefix='overview'
        ),
        title_embedder=LightweightTextEmbedder(
            column='title', max_features=TITLE_MAX_FEATURES, prefix='title'
        ),
    )


def build_pipeline_v2_sentence_transformers(model_name='all-MiniLM-L6-v2'):
    """Version 2: Sentence Transformers - Lightweight embeddings for Lambda

    Args:
        model_name: Model identifier forwarded to both SentenceTransformerEmbedder
            steps. Defaults to the value that was previously hard-coded, so
            existing zero-argument calls build the same pipeline as before.

    Returns:
        An unfitted Pipeline that uses sentence-transformer embeddings instead
        of TF-IDF for the `overview` and `title` columns.
    """
    return _assemble_feature_pipeline(
        overview_embedder=SentenceTransformerEmbedder(
            column='overview', model_name=model_name, prefix='overview'
        ),
        title_embedder=SentenceTransformerEmbedder(
            column='title', model_name=model_name, prefix='title'
        ),
    )

# Registry of pipeline versions, keyed by the version identifier that is also
# used in MLflow run names and pickle file names. fit_and_log_pipeline inspects
# the 'name' text (e.g. for 'TF-IDF' / 'Sentence') to decide which params to log,
# so keep those words in the display names.
PIPELINE_VERSIONS = {
    'v1_original': dict(
        name='Original (TF-IDF + OneHot)',
        builder=build_pipeline_v1_original,
        description='Baseline: TF-IDF text features with OneHotEncoder for categoricals',
    ),
    'v2_sentence_transformer': dict(
        name='Sentence Transformers (Lambda-ready)',
        builder=build_pipeline_v2_sentence_transformers,
        description='Lightweight sentence embeddings (384 dim), suitable for Lambda deployment',
        requires='sentence-transformers',
    ),
}

In [5]:
# Discover available pipeline versions
# NOTE: this only sees pickles already on disk. The pipelines are written by
# fit_and_log_pipeline, which runs in a later cell, so on a fresh checkout this
# mapping is empty until the notebook has been executed once.
models_dir = Path('../models')
PIPELINE_FILE_PREFIX = 'feature_pipeline_'

# Sorted so the mapping order is deterministic; raw glob order is filesystem-dependent.
pipeline_files = sorted(models_dir.glob(f'{PIPELINE_FILE_PREFIX}*.pkl'))

AVAILABLE_PIPELINES = {}
for pipeline_file in pipeline_files:
    # Every match starts with the prefix (the glob guarantees it); slice it off
    # rather than str.replace, which would also strip later occurrences.
    version_key = pipeline_file.stem[len(PIPELINE_FILE_PREFIX):]
    AVAILABLE_PIPELINES[version_key] = {
        'path': pipeline_file,
        'name': version_key.replace('_', ' ').title()
    }


## Train All Pipeline Versions

In [6]:
def _count_feature_groups(feature_names):
    """Count transformed columns per feature family, using the notebook's name matching rules."""
    return {
        'genre': sum(name.startswith('genre_') for name in feature_names),
        'overview': sum('overview' in name for name in feature_names),
        'title': sum('title' in name for name in feature_names),
        'year': sum('year_bin' in name for name in feature_names),
        'language': sum('original_language' in name for name in feature_names),
    }


def fit_and_log_pipeline(version_key, X_train, X_test, y_train, y_test):
    """Fit a pipeline version and log to MLflow

    Builds the pipeline registered under ``version_key``, fits it on ``X_train``,
    applies it to ``X_test``, logs parameters/metrics/model to a new MLflow run,
    and pickles the fitted pipeline to ``../models/feature_pipeline_<version_key>.pkl``.

    Args:
        version_key: Key into PIPELINE_VERSIONS.
        X_train: Raw training features; the pipeline is fitted on these only.
        X_test: Raw test features; transformed with the fitted pipeline.
        y_train: Training target; its mean and std are logged, and it is
            returned unchanged in the result.
        y_test: Test target; returned unchanged in the result.

    Returns:
        A dict with keys 'version', 'name', 'pipeline', 'X_train', 'X_test',
        'y_train', 'y_test', 'fit_time', 'output_features' on success, or
        ``None`` if fitting, transforming, saving or logging raised. In the
        failure case the MLflow run is closed with status FAILED.

    Raises:
        KeyError: If ``version_key`` is not in PIPELINE_VERSIONS.
    """
    # Conservative cap for the logged error text: MLflow rejects over-long param
    # values (the exact limit varies by MLflow version), and recording a failure
    # must not itself fail.
    max_error_param_chars = 500

    config = PIPELINE_VERSIONS[version_key]
    print("\n" + "="*70)
    print(f"PIPELINE: {config['name']}")
    print("="*70)
    print(f"Description: {config['description']}")

    # Build pipeline
    pipeline = config['builder']()

    # Start MLflow run
    run_name = f"{version_key}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    with mlflow.start_run(run_name=run_name):

        # Log parameters
        mlflow.log_param("pipeline_version", version_key)
        mlflow.log_param("version_name", config['name'])
        mlflow.log_param("random_state", RANDOM_STATE)
        mlflow.log_param("test_size", TEST_SIZE)
        mlflow.log_param("language_threshold", LANGUAGE_THRESHOLD)

        # Log text feature config. These flags are inferred from the display
        # name in PIPELINE_VERSIONS, so that text must keep the marker words.
        has_tfidf = 'TF-IDF' in config['name']
        has_sentence_transformer = 'Sentence' in config['name']
        has_onehot = 'No OneHot' not in config['name']

        mlflow.log_param("text_embedding_type",
                        'sentence_transformer' if has_sentence_transformer else 'tfidf')
        mlflow.log_param("uses_onehot_encoder", has_onehot)

        if has_tfidf:
            mlflow.log_param("overview_max_features", OVERVIEW_MAX_FEATURES)
            mlflow.log_param("title_max_features", TITLE_MAX_FEATURES)

        print("\nMLflow tracking enabled")
        print(f"Run ID: {mlflow.active_run().info.run_id}")

        # Fit pipeline
        print("\nFitting pipeline...")
        # perf_counter is monotonic, so the measured interval cannot be skewed
        # by system clock adjustments the way time.time() differences can.
        start_time = time.perf_counter()

        try:
            X_train_transformed = pipeline.fit_transform(X_train)
            fit_time = time.perf_counter() - start_time

            print(f"Pipeline fitted in {fit_time:.2f}s")
            print(f"Features: {X_train.shape[1]} → {X_train_transformed.shape[1]}")

            # Transform test data
            print("\nTransforming test data...")
            transform_start = time.perf_counter()
            X_test_transformed = pipeline.transform(X_test)
            transform_time = time.perf_counter() - transform_start

            print(f"Test data transformed in {transform_time:.2f}s")

            # Log metrics
            mlflow.log_param("input_features", X_train.shape[1])
            mlflow.log_param("output_features", X_train_transformed.shape[1])
            mlflow.log_metric("fit_time_seconds", fit_time)
            mlflow.log_metric("transform_time_seconds", transform_time)
            mlflow.log_metric("feature_expansion_ratio",
                            X_train_transformed.shape[1] / X_train.shape[1])
            mlflow.log_metric("total_samples", len(X_train) + len(X_test))

            # Log target stats
            mlflow.log_metric("target_mean", y_train.mean())
            mlflow.log_metric("target_std", y_train.std())

            # Analyze feature types (requires the pipeline output to expose .columns)
            group_counts = _count_feature_groups(list(X_train_transformed.columns))

            mlflow.log_metric("genre_features", group_counts['genre'])
            mlflow.log_metric("overview_features", group_counts['overview'])
            mlflow.log_metric("title_features", group_counts['title'])
            mlflow.log_metric("year_features", group_counts['year'])
            mlflow.log_metric("language_features", group_counts['language'])

            print(f"\nFeature breakdown:")
            print(f"  Genre: {group_counts['genre']}")
            print(f"  Overview: {group_counts['overview']}")
            print(f"  Title: {group_counts['title']}")
            print(f"  Year: {group_counts['year']}")
            print(f"  Language: {group_counts['language']}")

            # Save pipeline
            pipeline_path = Path(f"../models/feature_pipeline_{version_key}.pkl")
            pipeline_path.parent.mkdir(parents=True, exist_ok=True)

            # NOTE: pickle files execute code on load; only unpickle these from
            # a trusted location.
            with open(pipeline_path, 'wb') as f:
                pickle.dump(pipeline, f)

            # Log to MLflow
            mlflow.sklearn.log_model(pipeline, f"pipeline_{version_key}")

            print(f"\n✓ Pipeline saved: {pipeline_path}")
            print("✓ MLflow logging complete")

            return {
                'version': version_key,
                'name': config['name'],
                'pipeline': pipeline,
                'X_train': X_train_transformed,
                'X_test': X_test_transformed,
                'y_train': y_train,
                'y_test': y_test,
                'fit_time': fit_time,
                'output_features': X_train_transformed.shape[1]
            }

        except Exception as e:
            # Deliberate best-effort: one failing version must not stop the
            # others, so report, record, and return None instead of re-raising.
            print(f"Error fitting pipeline: {e}")
            mlflow.log_param("status", "failed")
            mlflow.log_param("error", str(e)[:max_error_param_chars])
            # Because the exception is swallowed, the `with` block would close
            # the run as FINISHED; end it explicitly so it shows up as FAILED.
            mlflow.end_run(status="FAILED")
            return None

In [7]:
# Fit every registered pipeline version and summarise the outcome
banner_rule = "=" * 70

print("\n" + banner_rule)
print("FITTING ALL PIPELINE VERSIONS")
print(banner_rule)

# One row per run: (key in `results`, key in PIPELINE_VERSIONS, progress line).
# Version 1 is always available; version 2 requires sentence-transformers.
fit_plan = [
    ('v1', 'v1_original', "\n[1/2] Version 1: Original Pipeline"),
    ('v2', 'v2_sentence_transformer', "\n[2/2] Version 2: Sentence Transformer Pipeline"),
]

results = {}
for result_key, pipeline_key, progress_line in fit_plan:
    print(progress_line)
    results[result_key] = fit_and_log_pipeline(pipeline_key, X_train, X_test, y_train, y_test)

print("\n" + banner_rule)
print("SUMMARY")
print(banner_rule)

# fit_and_log_pipeline returns None for a version that failed
successful = {key: outcome for key, outcome in results.items() if outcome is not None}
print(f"Successfully fitted {len(successful)}/{len(results)} pipelines")

if not successful:
    print("No pipelines were successfully fitted")
else:
    comparison_rows = []
    for outcome in successful.values():
        comparison_rows.append({
            'Version': outcome['version'],
            'Name': outcome['name'],
            'Output Features': outcome['output_features'],
            'Fit Time (s)': f"{outcome['fit_time']:.2f}",
        })
    comparison_df = pd.DataFrame(comparison_rows)

    print("Pipeline Comparison:")
    print(comparison_df.to_string(index=False))

# Keep a second handle on the results for later use
pipeline_results = results


FITTING ALL PIPELINE VERSIONS

[1/2] Version 1: Original Pipeline

PIPELINE: Original (TF-IDF + OneHot)
Description: Baseline: TF-IDF text features with OneHotEncoder for categoricals

MLflow tracking enabled
Run ID: fb682cf813234c12b091db388fe49835

Fitting pipeline...




Pipeline fitted in 0.87s
Features: 8 → 119

Transforming test data...
Test data transformed in 0.07s

Feature breakdown:
  Genre: 19
  Overview: 50
  Title: 30
  Year: 11
  Language: 9





✓ Pipeline saved: ../models/feature_pipeline_v1_original.pkl
✓ MLflow logging complete

[2/2] Version 2: Sentence Transformer Pipeline

PIPELINE: Sentence Transformers (Lambda-ready)
Description: Lightweight sentence embeddings (384 dim), suitable for Lambda deployment

MLflow tracking enabled
Run ID: c55bf2fb508b4f6eac6e593fc5c8289f

Fitting pipeline...
Pipeline fitted in 23.52s
Features: 8 → 807

Transforming test data...
Test data transformed in 3.21s

Feature breakdown:
  Genre: 19
  Overview: 384
  Title: 384
  Year: 11
  Language: 9





✓ Pipeline saved: ../models/feature_pipeline_v2_sentence_transformer.pkl
✓ MLflow logging complete

SUMMARY
Successfully fitted 2/2 pipelines
Pipeline Comparison:
                Version                                 Name  Output Features Fit Time (s)
            v1_original           Original (TF-IDF + OneHot)              119         0.87
v2_sentence_transformer Sentence Transformers (Lambda-ready)              807        23.52


## Save Transformed Data

Save the transformed training and test data for model training.


In [8]:
# Write the transformed train/test frames and targets for each fitted version
section_rule = "=" * 70

print("\n" + section_rule)
print("SAVING TRANSFORMED DATA")
print(section_rule)

processed_data_dir = Path('../data/processed')
processed_data_dir.mkdir(parents=True, exist_ok=True)

for version_key, result in results.items():
    # Versions that failed to fit are stored as None and have nothing to save
    if result is None:
        continue

    print(f"\n✓ Saving {result['name']} ({version_key})...")

    # Output locations carry the results key as a version suffix
    X_train_path = processed_data_dir / f'X_train_transformed_{version_key}.csv'
    X_test_path = processed_data_dir / f'X_test_transformed_{version_key}.csv'
    y_train_path = processed_data_dir / f'y_train_{version_key}.csv'
    y_test_path = processed_data_dir / f'y_test_{version_key}.csv'

    # Pull the transformed features and the untouched targets out of the result
    X_train_transformed = result['X_train']
    X_test_transformed = result['X_test']
    y_train_data = result['y_train']
    y_test_data = result['y_test']

    X_train_transformed.to_csv(X_train_path, index=False)
    X_test_transformed.to_csv(X_test_path, index=False)
    y_train_data.to_csv(y_train_path, index=False, header=['vote_average'])
    y_test_data.to_csv(y_test_path, index=False, header=['vote_average'])

saved_count = sum(1 for result in results.values() if result is not None)

print("\n" + section_rule)
print(f"SAVED {saved_count} pipeline versions")
print(section_rule)



SAVING TRANSFORMED DATA

✓ Saving Original (TF-IDF + OneHot) (v1)...

✓ Saving Sentence Transformers (Lambda-ready) (v2)...

SAVED 2 pipeline versions
