# PySpark-Based Feature Engineering Development

## Data Sources:
- `../data/cleaned_food_data_filtered.csv` - Original cleaned food data
- `../data/engineered_features_filtered.csv` - Previously engineered features (used for comparison)
- `../data/feature_metadata_filtered.json` - Feature metadata

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
import json
import sys
import os
import time
from datetime import datetime

# PySpark imports for scalable feature engineering
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import (
    VectorAssembler, StandardScaler as SparkScaler,
    HashingTF, IDF, Tokenizer, StopWordsRemover,
    StringIndexer, OneHotEncoder, Bucketizer,
    QuantileDiscretizer, MinMaxScaler, Normalizer
)
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import (
    udf, col, explode, split, when, isnan, isnull,
    regexp_replace, lower, trim, size, array_contains
)

# Make project modules under ../src importable from this notebook.
sys.path.append('../src')

print("Libraries imported successfully!")
print("Python version: {}".format(sys.version))

## Initialize PySpark and Load Data

Set up a PySpark session tuned for feature engineering workloads.

In [None]:
from pyspark.sql import SparkSession
import json

# Initialize PySpark Session for Feature Engineering
def create_spark_session():
    """
    Build (or reuse, via getOrCreate) the SparkSession used by this notebook.

    Requests adaptive query execution (with partition coalescing and
    skew-join handling), Arrow-accelerated pandas conversion, 4g of driver
    memory and a 2g cap on results collected to the driver.

    Returns:
        The SparkSession, with its log level set to WARN.
    """
    # Insertion order is preserved, so configs are applied in the same order
    # as a hand-written builder chain.
    settings = {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.driver.memory": "4g",
        "spark.driver.maxResultSize": "2g",
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    }

    builder = SparkSession.builder.appName("FoodRecommendationFeatureEngineering")
    for key, value in settings.items():
        builder = builder.config(key, value)

    session = builder.getOrCreate()
    session.sparkContext.setLogLevel("WARN")
    return session

# Create Spark session
spark = create_spark_session()
print("Spark session created successfully!")
print(f"Spark version: {spark.version}")
print(f"Available cores: {spark.sparkContext.defaultParallelism}")
# Spark binds the UI to port 4040 only when that port is free (otherwise it
# moves on to 4041, 4042, ...), so report the URL it actually chose.
# uiWebUrl is None when the UI is disabled.
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl or 'disabled'}")

# Load data using PySpark.
# Defines df_spark (required by later cells), plus df_engineered and metadata
# (both optional: None / {} when unavailable).
try:
    print("\nLoading data with PySpark...")

    # Load original cleaned data
    df_spark = spark.read.csv('../data/cleaned_food_data_filtered.csv',
                              header=True, inferSchema=True)
    print(f"Original cleaned data loaded! Rows: {df_spark.count()}, Columns: {len(df_spark.columns)}")

    # Load previously engineered features; only used to list which features
    # an earlier run already produced.
    try:
        df_engineered = spark.read.csv('../data/engineered_features_filtered.csv',
                                      header=True, inferSchema=True)
        print(f"Existing engineered features loaded! Rows: {df_engineered.count()}, Columns: {len(df_engineered.columns)}")

        # Show what features were already engineered
        original_cols = set(df_spark.columns)
        engineered_cols = set(df_engineered.columns)
        new_features = engineered_cols - original_cols
        print(f"Previously engineered features: {sorted(new_features)}")

    except Exception as e:
        print(f"Could not load existing engineered features: {e}")
        df_engineered = None

    # Load feature metadata if available. A missing OR corrupt file falls back
    # to empty metadata instead of being reported as a failure to load the
    # (already loaded) main dataset by the outer handler.
    try:
        with open('../data/feature_metadata_filtered.json', 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        print("Feature metadata loaded successfully!")
    except FileNotFoundError:
        print("Feature metadata not found, will create new metadata")
        metadata = {}
    except json.JSONDecodeError as e:
        print(f"Feature metadata is not valid JSON ({e}), will create new metadata")
        metadata = {}

    # Show data schema
    print("\nData Schema:")
    df_spark.printSchema()

    print("\nSample data:")
    df_spark.show(3, truncate=False)

except Exception as e:
    print(f"Error loading data: {e}")
    print("Please ensure the data files exist and are accessible.")

## PySpark Text Feature Engineering

Engineering features from text fields like ingredients and categories using PySpark DataFrame operations.

In [None]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Text feature engineering for ingredients
def _count_delimited_items(column_name):
    """Return a Column counting non-blank comma-separated items in ``column_name``."""
    # F.size(F.split(null)) yields -1 under Spark's legacy sizeOfNull behaviour,
    # and "" or a trailing comma would otherwise count as an extra item, so
    # blank items are filtered out and null text is reported as 0 items
    # (matching the contains_* flags, which also treat null text as 0).
    items = F.expr(f"filter(split(`{column_name}`, ','), x -> trim(x) != '')")
    return F.when(F.col(column_name).isNull(), F.lit(0)).otherwise(F.size(items))


def create_text_features_pyspark(df_spark, common_ingredients=None):
    """
    Create text-derived features: cleaned text, item counts, keyword flags and lengths.

    Source columns ``ingredients_text``, ``main_category`` and
    ``product_name`` are all optional; steps whose source is missing are
    skipped.

    Args:
        df_spark: Input Spark DataFrame.
        common_ingredients: Keywords turned into ``contains_<keyword>`` 0/1
            columns. Defaults to common allergens and additives.

    Returns:
        DataFrame with ``ingredients_filtered``, ``ingredient_count``,
        ``contains_*``, ``content_filtered``, ``category_count`` and
        ``<column>_length`` columns added (where the sources exist).
    """
    import re

    print("Creating text features using PySpark...")

    if common_ingredients is None:
        common_ingredients = [
            'gluten', 'milk', 'eggs', 'nuts', 'peanuts', 'soy',
            'fish', 'shellfish', 'sesame', 'sugar', 'salt', 'oil'
        ]

    # Keep letters from any script (an ASCII-only class turned "crème" into
    # "crme"), whitespace and commas; strip everything else.
    strip_pattern = r'[^\p{L}\s,]'

    df_with_features = df_spark

    # 1. Ingredient Text Processing
    if 'ingredients_text' in df_spark.columns:
        print("Processing ingredients text...")

        df_with_features = df_with_features.withColumn(
            'ingredients_filtered',
            F.regexp_replace(F.lower(F.col('ingredients_text')), strip_pattern, '')
        )

        df_with_features = df_with_features.withColumn(
            'ingredient_count',
            _count_delimited_items('ingredients_filtered')
        )

        for ingredient in common_ingredients:
            # Match at a word start. A bare substring test made 'nuts' fire on
            # "peanuts", 'fish' on "shellfish" and 'oil' on "boiled"; prefix
            # matching still catches forms like "soybean" or "sugars".
            # Compounds ending in the keyword (e.g. "buttermilk") are not matched.
            pattern = r'\b' + re.escape(ingredient)
            df_with_features = df_with_features.withColumn(
                f'contains_{ingredient}',
                F.when(F.col('ingredients_filtered').rlike(pattern), 1).otherwise(0)
            )

        print(f"Created {len(common_ingredients)} ingredient flags")

    # 2. Category Text Processing
    if 'main_category' in df_spark.columns:
        print("Processing category information...")

        df_with_features = df_with_features.withColumn(
            'content_filtered',
            F.regexp_replace(F.lower(F.col('main_category')), strip_pattern, '')
        )

        df_with_features = df_with_features.withColumn(
            'category_count',
            _count_delimited_items('content_filtered')
        )

    # 3. Text Length Features (null text -> null length)
    text_columns = ['ingredients_text', 'main_category', 'product_name']
    for col_name in text_columns:
        if col_name in df_spark.columns:
            df_with_features = df_with_features.withColumn(
                f'{col_name}_length',
                F.length(F.col(col_name))
            )

    return df_with_features

def create_tfidf_features_pyspark(df_spark, text_column='ingredients_filtered', num_features=1000):
    """
    Fit a tokenize -> stop-words -> HashingTF -> IDF pipeline on a text column.

    Rows whose ``text_column`` is null are dropped before fitting and
    transforming, so the returned DataFrame may have fewer rows than
    ``df_spark``.

    Args:
        df_spark: Input Spark DataFrame.
        text_column: Name of the text column to vectorise.
        num_features: Size of the hashed term space (HashingTF ``numFeatures``).

    Returns:
        Tuple ``(df_tfidf, model)`` where ``df_tfidf`` has ``words``,
        ``filtered_words``, ``raw_features`` and ``tfidf_features`` columns.
        If ``text_column`` is missing, returns ``(df_spark, None)`` so callers
        can always unpack two values.
    """
    from pyspark.ml.feature import RegexTokenizer

    print(f"Creating TF-IDF features for {text_column}...")

    if text_column not in df_spark.columns:
        print(f"Column {text_column} not found")
        return df_spark, None

    # Tokenizer cannot handle null input, so null rows are excluded.
    df_clean = df_spark.filter(F.col(text_column).isNotNull())

    # Split on commas as well as whitespace. Ingredient lists are comma
    # separated, and a whitespace-only Tokenizer kept "flour,sugar,salt" as a
    # single token. Empty tokens are dropped (minTokenLength defaults to 1).
    tokenizer = RegexTokenizer(inputCol=text_column, outputCol="words",
                               pattern=r"[\s,]+", gaps=True, toLowercase=True)
    stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features",
                           numFeatures=num_features)
    idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

    pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf])
    model = pipeline.fit(df_clean)
    df_tfidf = model.transform(df_clean)

    print(f"TF-IDF features created with {hashing_tf.getNumFeatures()} dimensions")

    return df_tfidf, model

# Apply text feature engineering
if 'df_spark' in locals():
    print("\n" + "="*50)
    print("TEXT FEATURE ENGINEERING")
    print("="*50)

    # Create basic text features
    df_with_text_features = create_text_features_pyspark(df_spark)

    print("\nText features created:")
    new_text_cols = [c for c in df_with_text_features.columns if c not in df_spark.columns]
    # The loop variable is deliberately not called `col`: at notebook top level
    # it would overwrite pyspark.sql.functions.col imported in the first cell.
    for feature_name in new_text_cols:
        print(f"  - {feature_name}")

    # Create TF-IDF features (optional, memory intensive). tfidf_model is
    # pre-set so later cells always see the name, even when fitting fails.
    print("\nCreating TF-IDF features...")
    tfidf_model = None
    try:
        df_tfidf, tfidf_model = create_tfidf_features_pyspark(df_with_text_features)
    except Exception as e:
        print(f"TF-IDF creation failed: {e}")
        df_tfidf = df_with_text_features
    else:
        if tfidf_model is not None:
            print("TF-IDF model created successfully!")

    # Show sample of new features
    print("\nSample of engineered features:")
    sample_cols = ['product_name', 'ingredient_count', 'contains_gluten', 'contains_milk', 'category_count']
    available_cols = [c for c in sample_cols if c in df_with_text_features.columns]
    if available_cols:
        df_with_text_features.select(available_cols).show(5)
else:
    print("Data not loaded. Please run the data loading cell first.")

## PySpark Nutritional Feature Engineering

Creating nutritional ratios, categories, and health scores using PySpark operations.

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def _is_missing_value(column_name):
    """Return a boolean Column that is true where ``column_name`` is null or NaN."""
    value = F.col(column_name)
    return value.isNull() | F.isnan(value.cast('double'))


def _add_energy_ratio_features(df, available):
    """Add fat/carb/protein share-of-energy ratios (0 when energy is missing or <= 0)."""
    # NOTE(review): 9/4/4 are kcal-per-gram factors, so these ratios assume
    # energy_100g is in kcal. If the source stores kJ, the ratios are ~4.2x
    # too small; confirm the unit against the data source.
    energy = F.col('energy_100g')
    ratio_specs = [
        ('fat_100g', 'fat_energy_ratio', 9),
        ('carbohydrates_100g', 'carb_energy_ratio', 4),
        ('proteins_100g', 'protein_energy_ratio', 4),
    ]
    for source, target, kcal_per_gram in ratio_specs:
        if source in available:
            df = df.withColumn(
                target,
                F.when(energy > 0, (F.col(source) * kcal_per_gram) / energy).otherwise(0)
            )
    return df


def _add_energy_category_feature(df):
    """Label energy_100g as Low (<100), Medium (<300), High (<500), Very High or Unknown."""
    energy = F.col('energy_100g').cast('double')
    # A native expression replaces Bucketizer + a Python UDF. It avoids the
    # Python round trip per row, and it does not abort the job on NaN or
    # negative values (Bucketizer's default handleInvalid='error' does).
    # Those values become 'Unknown', as nulls already did.
    category = (
        F.when(energy.isNull() | F.isnan(energy) | (energy < 0), 'Unknown')
        .when(energy < 100, 'Low')
        .when(energy < 300, 'Medium')
        .when(energy < 500, 'High')
        .otherwise('Very High')
    )
    return df.withColumn('energy_category', category)


def _tiered_score(column_name, thresholds, higher_is_better):
    """Score a nutrient 10/7/4/1 against three thresholds; null/NaN input scores null."""
    value = F.col(column_name)

    def meets(threshold):
        return value >= threshold if higher_is_better else value <= threshold

    first, second, third = thresholds
    return (
        F.when(_is_missing_value(column_name), F.lit(None))
        .when(meets(first), 10)
        .when(meets(second), 7)
        .when(meets(third), 4)
        .otherwise(1)
    )


def _add_health_score_feature(df, available):
    """Add ``healthy_score``: the mean of the per-nutrient scores that could be computed."""
    rules = [
        ('fat_100g', (3, 10, 20), False),          # less fat is better
        ('sugars_100g', (5, 15, 30), False),       # less sugar is better
        ('salt_100g', (0.3, 1.0, 2.0), False),     # less salt is better
        ('proteins_100g', (20, 10, 5), True),      # more protein is better
    ]
    components = [
        _tiered_score(name, thresholds, higher_is_better)
        for name, thresholds, higher_is_better in rules
        if name in available
    ]
    if not components:
        return df

    # A missing nutrient is excluded from the average rather than scored as
    # the worst tier (1), which previously penalised incomplete records as
    # "unhealthy". Rows with no scorable nutrient get a null score.
    scored = sum((F.when(c.isNotNull(), 1).otherwise(0) for c in components), F.lit(0))
    points = sum((F.coalesce(c, F.lit(0)) for c in components), F.lit(0))
    return df.withColumn('healthy_score', F.when(scored > 0, points / scored))


def _add_macronutrient_balance_feature(df):
    """Add ``total_macronutrients`` and a 0..1 ``macronutrient_balance`` (1 = equal split)."""
    fat = F.col('fat_100g')
    carbs = F.col('carbohydrates_100g')
    protein = F.col('proteins_100g')

    df = df.withColumn('total_macronutrients', fat + carbs + protein)

    total = F.col('total_macronutrients')
    equal_share = total / 3
    # Measure how far each macro is from an equal three-way split. (Subtracting
    # total/3 from the *sum* of the macros gives 1/3 for every row.) The summed
    # deviation peaks at 4/3 * total when all mass sits in one macro, so
    # dividing by that maps the result onto [0, 1].
    deviation = (
        F.abs(fat - equal_share) + F.abs(carbs - equal_share) + F.abs(protein - equal_share)
    )
    return df.withColumn(
        'macronutrient_balance',
        F.when(total > 0, 1.0 - deviation / (total * 4.0 / 3.0)).otherwise(0)
    )


def create_nutritional_features_pyspark(df_spark):
    """
    Create nutritional ratio, category, health-score and balance features.

    Each feature is added only when its source columns exist:
    ``fat/carb/protein_energy_ratio`` and ``energy_category`` need
    ``energy_100g``. ``healthy_score`` uses any of fat/sugars/salt/proteins.
    ``total_macronutrients`` and ``macronutrient_balance`` need fat,
    carbohydrates and proteins.

    Args:
        df_spark: Input Spark DataFrame with per-100g nutrient columns.

    Returns:
        DataFrame with the derived nutritional columns added, or the input
        unchanged when no known nutrient column is present.
    """
    print("Creating nutritional features using PySpark...")

    nutritional_cols = ['energy_100g', 'fat_100g', 'carbohydrates_100g', 'proteins_100g', 'sugars_100g', 'salt_100g']
    available = set(df_spark.columns)
    existing_nutrition_cols = [name for name in nutritional_cols if name in available]

    print(f"Found nutritional columns: {existing_nutrition_cols}")

    if not existing_nutrition_cols:
        print("No nutritional columns found in dataset")
        return df_spark

    df_nutrition = df_spark

    if 'energy_100g' in available:
        df_nutrition = _add_energy_ratio_features(df_nutrition, available)
        df_nutrition = _add_energy_category_feature(df_nutrition)

    df_nutrition = _add_health_score_feature(df_nutrition, available)

    if {'fat_100g', 'carbohydrates_100g', 'proteins_100g'} <= available:
        df_nutrition = _add_macronutrient_balance_feature(df_nutrition)

    print("Nutritional features created successfully!")

    return df_nutrition

# Apply nutritional feature engineering (depends on the text-feature cell).
if 'df_with_text_features' not in locals():
    print("Text features not available. Please run the text feature engineering cell first.")
else:
    print("\n" + "=" * 50)
    print("NUTRITIONAL FEATURE ENGINEERING")
    print("=" * 50)

    df_with_nutrition = create_nutritional_features_pyspark(df_with_text_features)

    # Summarise whichever derived nutrition columns were actually produced.
    nutrition_feature_cols = [
        'fat_energy_ratio', 'carb_energy_ratio', 'protein_energy_ratio',
        'energy_category', 'healthy_score', 'macronutrient_balance',
    ]
    available_nutrition_cols = [
        name for name in nutrition_feature_cols if name in df_with_nutrition.columns
    ]

    if available_nutrition_cols:
        print("\nNutritional feature statistics:")
        df_with_nutrition.select(available_nutrition_cols).describe().show()

        print("\nSample nutritional features:")
        sample_nutrition_cols = ['product_name', 'energy_category', 'healthy_score']
        available_sample_cols = [
            name for name in sample_nutrition_cols if name in df_with_nutrition.columns
        ]
        if available_sample_cols:
            df_with_nutrition.select(available_sample_cols).show(5)

## PySpark Feature Importance Analysis

Analyzing feature correlations, importance, and relationships using PySpark operations.

In [None]:
def analyze_features_pyspark(df_spark):
    """
    Print statistics, correlations with ``energy_100g`` and missing-value counts.

    Args:
        df_spark: Spark DataFrame to analyse.

    Returns:
        Names of numeric (int/bigint/float/double) columns that are not
        identifier columns. Later cells use this as the numerical feature set.
    """
    import math
    import re

    print("Analyzing features using PySpark...")

    numeric_types = {'int', 'bigint', 'float', 'double'}
    # 'id' must be a whole name token ("id", "product_id", "id-code"). A plain
    # substring test also dropped nutrient columns such as
    # "alpha-linolenic-acid_100g" because they contain the letters "id".
    numerical_cols = [
        name for name, dtype in df_spark.dtypes
        if dtype in numeric_types and 'id' not in re.split(r'[^a-z0-9]+', name.lower())
    ]

    print(f"Found {len(numerical_cols)} numerical features")

    if len(numerical_cols) <= 1:
        return numerical_cols

    print("\nBasic statistics for numerical features:")
    df_spark.select(numerical_cols).describe().show()

    if 'energy_100g' in numerical_cols:
        print("\nCorrelations with energy_100g:")
        others = [name for name in numerical_cols if name != 'energy_100g']
        correlations = []
        try:
            # One aggregation pass instead of one Spark job per column.
            row = df_spark.select(
                [F.corr('energy_100g', name).alias(f'corr_{i}') for i, name in enumerate(others)]
            ).first()
        except Exception as e:
            print(f"Could not calculate correlations: {e}")
            row = None

        if row is not None:
            for name, value in zip(others, row):
                # corr is null/NaN for constant or empty columns; NaN would also
                # make the abs()-based sort below ill-defined.
                if value is None or math.isnan(value):
                    print(f"Could not calculate correlation for {name}: undefined (constant or empty column)")
                else:
                    correlations.append((name, value))

        correlations.sort(key=lambda item: abs(item[1]), reverse=True)
        for name, value in correlations[:10]:  # Top 10 correlations
            print(f"  {name}: {value:.3f}")

    print("\nMissing value analysis:")
    total_count = df_spark.count()
    missing_counts = []

    if total_count > 0:
        dtypes = df_spark.dtypes
        missing_exprs = []
        for i, (name, dtype) in enumerate(dtypes):
            is_missing = F.col(name).isNull()
            # isnan only makes sense for floating-point columns; on strings,
            # booleans or timestamps it depends on implicit casts the analyzer
            # may reject.
            if dtype in ('float', 'double'):
                is_missing = is_missing | F.isnan(F.col(name))
            missing_exprs.append(F.sum(F.when(is_missing, 1).otherwise(0)).alias(f'missing_{i}'))

        # All columns are counted in a single Spark job.
        row = df_spark.select(missing_exprs).first()
        for (name, _), null_count in zip(dtypes, row):
            if null_count:
                missing_counts.append((name, null_count, (null_count / total_count) * 100))

    missing_counts.sort(key=lambda item: item[2], reverse=True)

    for name, count, percent in missing_counts[:10]:  # Top 10 missing
        print(f"  {name}: {count} ({percent:.1f}%)")

    return numerical_cols

def create_categorical_features_pyspark(df_spark):
    """
    Derive categorical features: numeric Nutri-Score, brand popularity and category hierarchy.

    Each source column (``nutriscore_grade``, ``brands``, ``main_category``)
    is optional; the corresponding features are skipped when it is absent.

    Args:
        df_spark: Input Spark DataFrame.

    Returns:
        DataFrame with ``nutriscore_numeric``, ``brand_product_count``,
        ``brand_popularity``, ``primary_category`` and ``category_depth``
        added where their sources exist.
    """
    print("Creating categorical features...")

    df_categorical = df_spark

    # Encode nutriscore_grade: a=5 ... e=1, anything else (incl. null) = 0.
    if 'nutriscore_grade' in df_spark.columns:
        # A native expression instead of a Python UDF avoids a per-row Python
        # round trip. Trimming also lets grades such as " a" map correctly.
        grade = F.lower(F.trim(F.col('nutriscore_grade')))
        df_categorical = df_categorical.withColumn(
            'nutriscore_numeric',
            F.when(grade == 'a', 5)
            .when(grade == 'b', 4)
            .when(grade == 'c', 3)
            .when(grade == 'd', 2)
            .when(grade == 'e', 1)
            .otherwise(0)
        )

    # Create brand popularity features
    if 'brands' in df_spark.columns:
        brand_counts = df_spark.groupBy('brands').count().withColumnRenamed('count', 'brand_product_count')
        # Null brands do not match in the join, so they get a null count and
        # fall through to 'Very Low'.
        df_categorical = df_categorical.join(brand_counts, on='brands', how='left')

        df_categorical = df_categorical.withColumn(
            'brand_popularity',
            F.when(F.col('brand_product_count') >= 100, 'High')
            .when(F.col('brand_product_count') >= 50, 'Medium')
            .when(F.col('brand_product_count') >= 10, 'Low')
            .otherwise('Very Low')
        )

    # Create category hierarchy features
    if 'main_category' in df_spark.columns:
        categories = F.split(F.col('main_category'), ',')

        # First listed category, without surrounding whitespace.
        df_categorical = df_categorical.withColumn(
            'primary_category',
            F.trim(categories.getItem(0))
        )

        # size(split(null)) can be -1 (legacy sizeOfNull behaviour), so a
        # missing category list is reported as depth 0 instead.
        df_categorical = df_categorical.withColumn(
            'category_depth',
            F.when(F.col('main_category').isNull(), 0).otherwise(F.size(categories))
        )

    return df_categorical

# Apply feature analysis
if 'df_with_nutrition' in locals():
    print("\n" + "="*60)
    print("FEATURE IMPORTANCE AND ANALYSIS")
    print("="*60)

    # Analyze features
    numerical_features = analyze_features_pyspark(df_with_nutrition)

    # Create additional categorical features
    df_with_categorical = create_categorical_features_pyspark(df_with_nutrition)

    print("\nCategorical features created:")
    categorical_cols = [c for c in df_with_categorical.columns if c not in df_with_nutrition.columns]
    # The loop variable is deliberately not called `col`: at notebook top level
    # it would overwrite pyspark.sql.functions.col imported in the first cell.
    for feature_name in categorical_cols:
        print(f"  - {feature_name}")

    # Show sample of all engineered features
    print("\nSample of all engineered features:")
    all_new_cols = [c for c in df_with_categorical.columns if c not in df_spark.columns]
    sample_cols = ['product_name'] + all_new_cols[:10]  # Show first 10 new features
    available_sample_cols = [c for c in sample_cols if c in df_with_categorical.columns]

    if available_sample_cols:
        df_with_categorical.select(available_sample_cols).show(5, truncate=False)

    # Cache the final DataFrame; count() materialises the cache.
    df_final = df_with_categorical.cache()
    print(f"\nFinal dataset cached with {df_final.count()} rows and {len(df_final.columns)} columns")

else:
    print("Nutritional features not available. Please run the nutritional feature engineering cell first.")

## PySpark Feature Scaling and Dimensionality Reduction

Applying feature scaling, normalization, and PCA using PySpark ML pipelines.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler as SparkScaler, MinMaxScaler, Normalizer
import pyspark.sql.functions as F

def create_feature_scaling_pipeline_pyspark(df_spark, numerical_cols):
    """
    Fit a VectorAssembler -> StandardScaler -> MinMaxScaler -> Normalizer pipeline.

    Only columns with no null/NaN values are assembled, because
    VectorAssembler rejects missing values by default (handleInvalid='error').

    Args:
        df_spark: Input Spark DataFrame.
        numerical_cols: Candidate numeric column names.

    Returns:
        ``(df_scaled, model)``. ``df_scaled`` adds ``features_raw``,
        ``features_scaled`` (z-score), ``features_minmax`` ([0, 1]) and
        ``features_normalized`` (unit L2 norm). If fewer than two usable
        columns exist, returns ``(df_spark, None)``.
    """
    print("Creating PySpark ML feature scaling pipeline...")

    if len(numerical_cols) < 2:
        print("Not enough numerical features for scaling pipeline")
        return df_spark, None

    # Count null/NaN values for every candidate in one Spark job instead of
    # one filter().count() job per column.
    missing_row = df_spark.select([
        F.sum(
            F.when(F.col(name).isNull() | F.isnan(F.col(name).cast('double')), 1).otherwise(0)
        ).alias(f'missing_{i}')
        for i, name in enumerate(numerical_cols)
    ]).first()
    # sum() over an empty DataFrame is null, which (like 0) means "no missing values".
    valid_numerical_cols = [
        name for name, missing in zip(numerical_cols, missing_row) if not missing
    ]

    print(f"Using {len(valid_numerical_cols)} valid numerical columns for scaling")

    if len(valid_numerical_cols) < 2:
        print("Not enough valid numerical features")
        return df_spark, None

    assembler = VectorAssembler(inputCols=valid_numerical_cols, outputCol="features_raw")

    # Each scaler reads the raw vector independently, producing three alternatives.
    standard_scaler = SparkScaler(inputCol="features_raw", outputCol="features_scaled",
                                  withStd=True, withMean=True)
    min_max_scaler = MinMaxScaler(inputCol="features_raw", outputCol="features_minmax")
    normalizer = Normalizer(inputCol="features_raw", outputCol="features_normalized", p=2.0)

    pipeline = Pipeline(stages=[assembler, standard_scaler, min_max_scaler, normalizer])

    model = pipeline.fit(df_spark)
    df_scaled = model.transform(df_spark)

    print("Feature scaling pipeline created successfully!")

    return df_scaled, model

def analyze_feature_distributions_pyspark(df_spark, sample_cols):
    """
    Print describe() statistics and approximate quartiles for selected columns.

    Args:
        df_spark: Spark DataFrame to inspect.
        sample_cols: Candidate column names. Names absent from ``df_spark``
            are ignored, and quartiles are printed for at most the first 5.
    """
    print("Analyzing feature distributions...")

    known = set(df_spark.columns)
    columns = [name for name in sample_cols if name in known]
    if not columns:
        return

    summary = df_spark.select(columns).describe()
    print("\nFeature statistics:")
    summary.show()

    print("\nFeature quantiles:")
    # approxQuantile with 5% relative error, capped at 5 columns to bound cost.
    for name in columns[:5]:
        try:
            q = df_spark.stat.approxQuantile(name, [0.25, 0.5, 0.75], 0.05)
            print(f"  {name}: Q1={q[0]:.3f}, Median={q[1]:.3f}, Q3={q[2]:.3f}")
        except Exception as e:
            print(f"  Could not calculate quantiles for {name}: {e}")

def create_feature_importance_analysis(df_spark, max_columns=10):
    """
    Rank numeric features by sample variance as a rough importance proxy.

    Variance is scale-dependent (columns with larger units or ranges
    dominate), so treat the ranking as exploratory only.

    Args:
        df_spark: Spark DataFrame to analyse.
        max_columns: Number of numeric columns, in schema order, to evaluate.
            ``None`` evaluates all of them.

    Returns:
        List of ``(column, variance)`` tuples sorted by descending variance.
        Empty when fewer than two numeric columns are available.
    """
    import math
    import re

    print("Creating feature importance analysis...")

    numeric_types = {'int', 'bigint', 'float', 'double'}
    # 'id' must be a whole name token, so columns such as
    # "alpha-linolenic-acid_100g" are not mistaken for identifiers.
    numerical_cols = [
        name for name, dtype in df_spark.dtypes
        if dtype in numeric_types and 'id' not in re.split(r'[^a-z0-9]+', name.lower())
    ]

    if len(numerical_cols) <= 1:
        return []

    print("\nFeature variance analysis:")
    candidates = numerical_cols[:max_columns]
    variances = []
    try:
        # One Spark job for all candidate columns instead of one per column.
        row = df_spark.select(
            [F.variance(name).alias(f'variance_{i}') for i, name in enumerate(candidates)]
        ).first()
    except Exception as e:
        print(f"Could not calculate variances: {e}")
    else:
        # Skip null/NaN variances (e.g. fewer than two non-null rows); NaN
        # would also make the sort below ill-defined.
        variances = [
            (name, value) for name, value in zip(candidates, row)
            if value is not None and not math.isnan(value)
        ]

    variances.sort(key=lambda item: item[1], reverse=True)

    print("Top features by variance:")
    for name, value in variances[:10]:
        print(f"  {name}: {value:.3f}")

    return variances

# Apply feature scaling and analysis
if 'df_final' in locals():
    print("\n" + "="*60)
    print("FEATURE SCALING AND ANALYSIS")
    print("="*60)

    # The scaling helper always returns a DataFrame (its input, unchanged, when
    # scaling is skipped), so the model, not the DataFrame, signals success.
    df_scaled, scaling_model = create_feature_scaling_pipeline_pyspark(df_final, numerical_features)

    if scaling_model is not None:
        print("\nScaling pipeline applied successfully!")
    else:
        print("\nScaling pipeline skipped; analysing unscaled features only.")

    # Analyze feature distributions
    sample_analysis_cols = numerical_features[:5] if numerical_features else []
    if sample_analysis_cols:
        analyze_feature_distributions_pyspark(df_final, sample_analysis_cols)

    # Feature importance analysis
    feature_importance = create_feature_importance_analysis(df_final)

    # Show original columns next to the scaled vector (df_scaled carries every
    # df_final column plus the vector outputs when scaling ran).
    print("\nSample of original vs scaled features:")
    if len(numerical_features) >= 3:
        sample_cols = ['product_name'] + numerical_features[:3]
        if scaling_model is not None:
            sample_cols.append('features_scaled')
        available_sample_cols = [c for c in sample_cols if c in df_scaled.columns]
        if available_sample_cols:
            df_scaled.select(available_sample_cols).show(5, truncate=60)

    # Performance comparison
    print("\nPerformance metrics:")
    row_count = df_final.count()
    col_count = len(df_final.columns)
    print(f"  - Dataset size: {row_count:,} rows × {col_count} columns")
    print(f"  - Numerical features: {len(numerical_features)}")
    print("  - Memory usage optimized with PySpark caching")

else:
    print("Final dataset not available. Please run the previous feature engineering cells first.")

## Save Engineered Features and Models

Saving the engineered features, PySpark models, and metadata.

In [None]:
def save_engineered_features_pyspark(df_final=None, tfidf_model=None, scaling_model=None,
                                     df_spark=None, numerical_features=None, sample_size=10000):
    """
    Save a sample of the engineered dataset, the fitted Spark models and metadata.

    Any object argument left as ``None`` is looked up under the same name in
    this notebook's global namespace, so the function still works when called
    with no arguments. ``locals()`` inside a function only contains the
    function's own variables and can never see notebook variables.

    Args:
        df_final: Engineered Spark DataFrame; a sample is written as CSV.
        tfidf_model: Fitted TF-IDF PipelineModel, or None to skip saving it.
        scaling_model: Fitted scaling PipelineModel, or None to skip saving it.
        df_spark: Original DataFrame, used for ``original_columns`` metadata.
        numerical_features: Numeric feature names, used for metadata.
        sample_size: Maximum number of rows written to the CSV sample.

    Returns:
        List of human-readable descriptions of the items that were saved.
    """
    import os

    namespace = globals()
    if df_final is None:
        df_final = namespace.get('df_final')
    if tfidf_model is None:
        tfidf_model = namespace.get('tfidf_model')
    if scaling_model is None:
        scaling_model = namespace.get('scaling_model')
    if df_spark is None:
        df_spark = namespace.get('df_spark')
    if numerical_features is None:
        numerical_features = namespace.get('numerical_features')

    print("Saving engineered features and models...")

    saved_items = []

    # Save the main engineered dataset
    if df_final is not None:
        try:
            # toPandas() collects to the driver, so only a bounded sample is written.
            df_sample = df_final.limit(sample_size).toPandas()

            output_path = '../data/features_engineered_pyspark_dev.csv'
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            df_sample.to_csv(output_path, index=False)

            saved_items.append(f"Engineered features sample -> {output_path}")
            print(f"Sample dataset saved: {df_sample.shape}")

        except Exception as e:
            print(f"Error saving main dataset: {e}")

    # Save TF-IDF model
    if tfidf_model is not None:
        try:
            tfidf_path = '../models/tfidf_model_pyspark'
            tfidf_model.write().overwrite().save(tfidf_path)
            saved_items.append(f"TF-IDF model -> {tfidf_path}")
        except Exception as e:
            print(f"Error saving TF-IDF model: {e}")

    # Save scaling model
    if scaling_model is not None:
        try:
            scaling_path = '../models/scaling_model_pyspark'
            scaling_model.write().overwrite().save(scaling_path)
            saved_items.append(f"Scaling model -> {scaling_path}")
        except Exception as e:
            print(f"Error saving scaling model: {e}")

    # Save feature metadata
    try:
        feature_metadata = {
            'timestamp': datetime.now().isoformat(),
            'spark_version': spark.version,
            'original_columns': list(df_spark.columns) if df_spark is not None else [],
            'engineered_features': {
                'text_features': [
                    'ingredients_filtered', 'ingredient_count', 'content_filtered', 'category_count'
                ] + [f'contains_{ing}' for ing in ['gluten', 'milk', 'eggs', 'nuts', 'peanuts', 'soy', 'fish', 'shellfish', 'sesame', 'sugar', 'salt', 'oil']],
                'nutritional_features': [
                    'fat_energy_ratio', 'carb_energy_ratio', 'protein_energy_ratio',
                    'energy_category', 'healthy_score', 'macronutrient_balance'
                ],
                'categorical_features': [
                    'nutriscore_numeric', 'brand_popularity', 'primary_category', 'category_depth'
                ]
            },
            'feature_engineering_steps': [
                'Text processing and ingredient extraction',
                'Nutritional ratio calculations',
                'Health score computation',
                'Categorical feature encoding',
                'Feature scaling and normalization'
            ],
            'data_quality': {
                'total_rows': df_final.count() if df_final is not None else 0,
                'total_features': len(df_final.columns) if df_final is not None else 0,
                'numerical_features': len(numerical_features) if numerical_features is not None else 0
            }
        }

        metadata_path = '../data/feature_engineering_metadata_pyspark.json'
        os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(feature_metadata, f, indent=2)

        saved_items.append(f"Feature metadata -> {metadata_path}")

    except Exception as e:
        print(f"Error saving metadata: {e}")

    return saved_items

def create_feature_engineering_summary(df_spark=None, df_final=None,
                                       tfidf_model=None, scaling_model=None):
    """
    Print a human-readable summary of the feature engineering run.

    Arguments left as ``None`` are looked up by name in this notebook's global
    namespace, so a call with no arguments works. ``locals()`` inside a
    function cannot see notebook variables.

    Args:
        df_spark: Original input DataFrame.
        df_final: Final engineered DataFrame.
        tfidf_model: Fitted TF-IDF model, or None if that step did not run.
        scaling_model: Fitted scaling model, or None if that step did not run.

    Returns:
        True, unconditionally.
    """
    namespace = globals()
    if df_spark is None:
        df_spark = namespace.get('df_spark')
    if df_final is None:
        df_final = namespace.get('df_final')
    if tfidf_model is None:
        tfidf_model = namespace.get('tfidf_model')
    if scaling_model is None:
        scaling_model = namespace.get('scaling_model')

    print("\n" + "="*70)
    print("FEATURE ENGINEERING SUMMARY")
    print("="*70)

    if df_spark is not None and df_final is not None:
        original_cols = len(df_spark.columns)
        final_cols = len(df_final.columns)
        new_features = final_cols - original_cols

        print("\n📊 Dataset Transformation:")
        print(f"   • Original columns: {original_cols}")
        print(f"   • Final columns: {final_cols}")
        print(f"   • New features created: {new_features}")
        print(f"   • Total rows processed: {df_final.count():,}")

        def report_step(label, done=True):
            # Only claim a step ran when its fitted artefact actually exists.
            print(f"   {'✅' if done else '⚠️'} {label}{'' if done else ' (skipped)'}")

        print("\n🔧 Feature Engineering Pipeline:")
        report_step("Text feature extraction (ingredients, categories)")
        report_step("Nutritional feature engineering (ratios, health scores)")
        report_step("Categorical feature encoding")
        report_step("Feature scaling and normalization", scaling_model is not None)
        report_step("TF-IDF text vectorization", tfidf_model is not None)

        print("\n⚡ PySpark Optimizations:")
        print(f"   • Distributed processing across {spark.sparkContext.defaultParallelism} cores")
        print("   • Memory-efficient caching and persistence")
        print("   • ML pipeline for reproducible transformations")

    return True

# Save all engineered features and models
if 'df_final' in locals():
    print("\n" + "="*60)
    print("SAVING ENGINEERED FEATURES")
    print("="*60)

    saved_items = save_engineered_features_pyspark()

    print(f"\nSuccessfully saved {len(saved_items)} items:")
    for item in saved_items:
        print(f"✓ {item}")

    # Create summary
    create_feature_engineering_summary()

    print("\n🎯 Next Steps:")
    print("   1. Use the engineered features in model training (03_model_development.ipynb)")
    print("   2. Load PySpark models for consistent preprocessing")
    print("   3. Evaluate feature importance in recommendation performance")

else:
    print("No features to save. Please run the feature engineering pipeline first.")

# The Spark session is intentionally left running for interactive use.
print("\n💡 Spark session is still running for interactive use.")
print("   Run 'spark.stop()' when finished to free resources.")
# Report the UI port Spark actually bound (it moves past 4040 when that port
# is busy); uiWebUrl is None when the UI is disabled.
print(f"   Spark UI: {spark.sparkContext.uiWebUrl or 'disabled'}")