# Yelp Rating Prediction - Complete Pipeline

This notebook contains a complete machine learning pipeline for predicting Yelp ratings, adapted for Kaggle's environment.

## Pipeline Stages:
1. Data Loading & Preprocessing
2. Feature Engineering
3. Sentiment Analysis
4. Feature Selection
5. Model Training
6. Output Generation/Inference

In [None]:
# Notebook Setup - Import Libraries
import os
import random
import json
import pickle
import logging
import sys
from typing import List, Dict, Tuple, Any, Callable
from itertools import combinations

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from transformers import pipeline, AutoTokenizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from tqdm import tqdm

# Setup logging for notebook
# Records at INFO level and above are written to stdout through an explicit
# StreamHandler, using a "timestamp - level - message" layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
# Logger shared by the functions defined in this notebook.
logger = logging.getLogger(__name__)

# Report the runtime environment so the cell output records what was used.
print("Libraries imported successfully!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only query the device name when CUDA is present; index 0 is the first GPU.
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")


## Configuration

Set up Kaggle-specific paths and hyperparameters.

In [None]:
# Configuration for the Kaggle environment.

# Raw Yelp CSV exports, read from the Kaggle input directory.
DATA_DIR = "/kaggle/input/yelp-data"
INPUT_FILES = {
    dataset: os.path.join(DATA_DIR, csv_name)
    for dataset, csv_name in (
        ("business", "yelp_business_data.csv"),
        ("review", "yelp_review.csv"),
        ("user", "yelp_user.csv"),
        ("checkin", "yelp_checkin_data.csv"),
        ("tip", "yelp_tip_data.csv"),
    )
}

# Intermediate artefacts, written to the Kaggle working directory.
# Each pipeline stage writes "<stage>.csv" under OUTPUT_DIR.
OUTPUT_DIR = "/kaggle/working/processed"
OUTPUT_FILES = {
    stage: os.path.join(OUTPUT_DIR, f"{stage}.csv")
    for stage in (
        "merged_data",
        "featured_data",
        "sentiment_data",
        "final_model_data",
    )
}

# Convenience alias used by the feature engineering stage.
FEATURED_DATA_PATH = OUTPUT_FILES["featured_data"]

# Neural-network training hyperparameters.
LEARNING_RATE = 1e-4
BATCH_SIZE = 64
MAX_EPOCHS = 40

# Every feature the subset search is allowed to consider.
CANDIDATE_FEATURES = [
    "user_average_stars",
    "business_average_stars",
    "user_review_count",
    "business_review_count",
    "time_yelping",
    "date_year",
    "total_elite_statuses",
    "elite_status",
    "normalized_sentiment_score",
]

# Feature subset the search is expected to arrive at.
EXPECTED_OPTIMAL_FEATURES = [
    "user_average_stars",
    "business_average_stars",
    "time_yelping",
    "elite_status",
    "normalized_sentiment_score",
]

# Seed used for reproducibility.
SEED = 1

# Sentiment model settings.
MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"
MAX_TOKENS = 512
SENTIMENT_BATCH_SIZE = 64

# Device detection helper function (adapted for Kaggle - CUDA instead of MPS)
def get_device():
    """
    Pick the PyTorch device string to run on.

    Returns:
        str: 'cuda' when torch reports a usable CUDA device, 'cpu' otherwise
    """
    return "cuda" if torch.cuda.is_available() else "cpu"

# Create output directories
# exist_ok=True makes re-running this cell safe when the folders already exist.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs("/kaggle/working/models", exist_ok=True)
os.makedirs("/kaggle/working/outputs", exist_ok=True)

# Echo the effective configuration into the cell output.
print("Configuration loaded!")
print(f"Input directory: {DATA_DIR}")
print(f"Output directory: {OUTPUT_DIR}")
print(f"Device: {get_device()}")


## Utility Functions

In [None]:
import random
import numpy as np
import pandas as pd
import torch
import logging
from typing import List

logger = logging.getLogger(__name__)


def parse_elite_years(elite_str: str) -> List[int]:
    """
    Parse elite years string into a list of integers.

    Handles empty strings, missing values (None/NaN), comma/pipe-separated
    years, and a bare numeric year. The numeric case occurs when a CSV is
    re-read without an explicit ``str`` dtype and a single-year cell such as
    ``2018`` is parsed as a number.

    Args:
        elite_str: String containing elite years, e.g., "2018,2019,2020" or
            "2018|2019". A numeric scalar holding a single whole year is
            also accepted.

    Returns:
        List of integers representing elite years, or empty list for invalid input
    """
    if not isinstance(elite_str, str):
        # Missing value (None, NaN, pd.NA): no elite years.
        if pd.isna(elite_str):
            return []
        # Numeric scalar: treat a whole number as one year instead of
        # failing on the string methods below.
        try:
            numeric_year = float(elite_str)
        except (TypeError, ValueError):
            return []
        return [int(numeric_year)] if numeric_year.is_integer() else []

    if elite_str == "":
        return []

    # Accept both separators by normalising pipes to commas before splitting.
    years = []
    for year_str in elite_str.replace('|', ',').split(','):
        year_str = year_str.strip()
        if not year_str:
            continue
        try:
            years.append(int(year_str))
        except ValueError:
            # Skip tokens that are not integers (e.g. "None")
            continue

    return years


def count_elite_statuses(elite_str: str, review_year: int) -> int:
    """
    Count how many elite years fall on or before the review year.

    Args:
        elite_str: String containing elite years
        review_year: The year of the review

    Returns:
        Number of elite years <= review_year
    """
    earned_so_far = [
        elite_year
        for elite_year in parse_elite_years(elite_str)
        if elite_year <= review_year
    ]
    return len(earned_so_far)


def check_elite_status(elite_str: str, review_year: int) -> int:
    """
    Flag whether the user held elite status in the review year or the year before.

    Args:
        elite_str: String containing elite years
        review_year: The year of the review

    Returns:
        1 if elite in review_year or (review_year - 1), 0 otherwise
    """
    years_held = parse_elite_years(elite_str)
    if review_year in years_held:
        return 1
    if (review_year - 1) in years_held:
        return 1
    return 0
def smart_truncate_text(text: str, tokenizer, max_tokens: int = 500) -> str:
    """
    Shorten text to at most ``max_tokens`` tokens, keeping its start and end.

    The text is tokenized without special tokens. If it already fits, the
    original string is returned unchanged. Otherwise the first half and the
    last half of the token budget are kept and decoded back to a string;
    the middle is dropped. With the default budget of 500 this keeps the
    first 250 and last 250 tokens.

    Args:
        text: Text to truncate
        tokenizer: Object exposing ``encode(text, add_special_tokens=False)``
            returning a sliceable token sequence, and ``decode(tokens)``
        max_tokens: Maximum number of tokens to keep (negative values are
            treated as 0)

    Returns:
        The original text if it fits, otherwise the decoded head + tail tokens
    """
    tokens = tokenizer.encode(text, add_special_tokens=False)
    if len(tokens) <= max_tokens:
        return text

    # Derive the split from the budget so the result never exceeds it; an odd
    # budget gives the extra token to the tail.
    budget = max(max_tokens, 0)
    head_len = budget // 2
    tail_len = budget - head_len

    # Slice the tail from an explicit start index: tokens[-0:] would return
    # the whole sequence when tail_len is 0.
    truncated_tokens = tokens[:head_len] + tokens[len(tokens) - tail_len:]
    return tokenizer.decode(truncated_tokens)


def set_seed(seed: int = 1) -> None:
    """
    Seed Python's, NumPy's and PyTorch's random number generators.

    When CUDA is available, all CUDA devices are seeded as well.

    Args:
        seed: Seed value applied to every generator
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def verify_gpu_support() -> bool:
    """
    Report whether PyTorch can see a CUDA GPU.

    The result is logged at INFO level.

    Returns:
        True if CUDA is available, False otherwise
    """
    cuda_ready = torch.cuda.is_available()
    if cuda_ready:
        status = "available"
    else:
        status = "not available"
    logger.info(f"CUDA GPU support is {status}.")
    return cuda_ready

print("Utility functions defined!")

## Data Loading Functions

In [None]:
import pandas as pd
import os
from typing import List


def load_business_data(filepath: str) -> pd.DataFrame:
    """
    Read the business CSV into a DataFrame with fixed column dtypes.

    Args:
        filepath: Path to the business CSV file

    Returns:
        DataFrame containing business data

    Raises:
        FileNotFoundError: If the file does not exist
        ValueError: If required columns are missing
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Business data file not found: {filepath}")

    # Identifiers and address parts stay as text so leading zeros survive.
    column_types = dict.fromkeys(
        ('business_id', 'name', 'address', 'city', 'state', 'postal_code'), str
    )
    column_types.update(dict.fromkeys(('latitude', 'longitude', 'stars'), float))
    column_types.update(dict.fromkeys(('review_count', 'is_open'), int))

    business_df = pd.read_csv(filepath, dtype=column_types, low_memory=False)

    # Fail early if a column the later stages rely on is absent.
    missing_columns = [
        column
        for column in ('business_id', 'name', 'stars', 'review_count')
        if column not in business_df.columns
    ]
    if missing_columns:
        raise ValueError(f"Missing required columns in business data: {missing_columns}")

    return business_df


def load_review_data(filepath: str) -> pd.DataFrame:
    """
    Read the review CSV into a DataFrame with fixed column dtypes.

    Args:
        filepath: Path to the review CSV file

    Returns:
        DataFrame containing review data

    Raises:
        FileNotFoundError: If the file does not exist
        ValueError: If required columns are missing
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Review data file not found: {filepath}")

    # Text-like columns (ids, review body, raw date) versus integer counters.
    column_types = dict.fromkeys(
        ('review_id', 'user_id', 'business_id', 'text', 'date'), str
    )
    column_types.update(dict.fromkeys(('stars', 'useful', 'funny', 'cool'), int))

    review_df = pd.read_csv(filepath, dtype=column_types, low_memory=False)

    # Fail early if a column the later stages rely on is absent.
    missing_columns = [
        column
        for column in ('review_id', 'user_id', 'business_id', 'stars', 'text', 'date')
        if column not in review_df.columns
    ]
    if missing_columns:
        raise ValueError(f"Missing required columns in review data: {missing_columns}")

    return review_df


def load_user_data(filepath: str) -> pd.DataFrame:
    """
    Read the user CSV into a DataFrame with fixed column dtypes.

    Args:
        filepath: Path to the user CSV file

    Returns:
        DataFrame containing user data

    Raises:
        FileNotFoundError: If the file does not exist
        ValueError: If required columns are missing
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"User data file not found: {filepath}")

    # 'elite' and 'friends' are kept as raw text; they are parsed later.
    column_types = dict.fromkeys(
        ('user_id', 'name', 'yelping_since', 'elite', 'friends'), str
    )
    column_types.update(
        dict.fromkeys(('review_count', 'useful', 'funny', 'cool', 'fans'), int)
    )
    column_types['average_stars'] = float
    # All compliment_* counters are integers.
    compliment_kinds = (
        'hot', 'more', 'profile', 'cute', 'list', 'note',
        'plain', 'cool', 'funny', 'writer', 'photos',
    )
    column_types.update({f'compliment_{kind}': int for kind in compliment_kinds})

    user_df = pd.read_csv(filepath, dtype=column_types, low_memory=False)

    # Fail early if a column the later stages rely on is absent.
    missing_columns = [
        column
        for column in ('user_id', 'name', 'review_count', 'yelping_since', 'average_stars')
        if column not in user_df.columns
    ]
    if missing_columns:
        raise ValueError(f"Missing required columns in user data: {missing_columns}")

    return user_df

print("Data loading functions defined!")

## Preprocessing Functions

In [None]:
import pandas as pd
import os
from typing import Tuple



def rename_columns(user_df: pd.DataFrame, business_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Give user and business columns distinct names so they cannot collide.

    The inputs are not modified; renamed copies are returned. Columns that are
    not listed in the mappings keep their names.

    Args:
        user_df: DataFrame containing user data
        business_df: DataFrame containing business data

    Returns:
        Tuple of (renamed_user_df, renamed_business_df)
    """
    # User vote counters become total_<counter>; other fields get a user_ prefix.
    user_column_map = {vote: f'total_{vote}' for vote in ('useful', 'funny', 'cool')}
    user_column_map.update(
        review_count='user_review_count',
        name='user_name',
        average_stars='user_average_stars',
    )

    business_column_map = dict(
        stars='business_average_stars',
        review_count='business_review_count',
        name='business_name',
    )

    return (
        user_df.rename(columns=user_column_map),
        business_df.rename(columns=business_column_map),
    )


def convert_date_columns(review_df: pd.DataFrame, user_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Return copies of the inputs with their date columns parsed to datetime.

    Args:
        review_df: DataFrame containing review data (uses the 'date' column)
        user_df: DataFrame containing user data (uses the 'yelping_since' column)

    Returns:
        Tuple of (converted_review_df, converted_user_df)
    """
    # assign() works on a copy, so the caller's frames are left untouched.
    reviews_with_dates = review_df.assign(date=pd.to_datetime(review_df['date']))
    users_with_dates = user_df.assign(
        yelping_since=pd.to_datetime(user_df['yelping_since'])
    )
    return reviews_with_dates, users_with_dates
def merge_datasets(review_df: pd.DataFrame, user_df: pd.DataFrame, business_df: pd.DataFrame) -> pd.DataFrame:
    """
    Combine reviews with their authors and businesses via inner joins.

    Reviews whose user_id or business_id has no match are dropped.

    Args:
        review_df: DataFrame containing review data
        user_df: DataFrame containing user data
        business_df: DataFrame containing business data

    Returns:
        Merged DataFrame with all three sources combined
    """
    reviews_with_users = pd.merge(review_df, user_df, on='user_id', how='inner')
    return pd.merge(reviews_with_users, business_df, on='business_id', how='inner')


def clean_merged_data(merged_df: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the rows that have values in every critical column.

    The original row index is preserved (it is not reset).

    Args:
        merged_df: Merged DataFrame from merge_datasets

    Returns:
        Cleaned DataFrame with no missing values in critical columns
    """
    critical_columns = [
        'stars',
        'text',
        'business_average_stars',
        'user_average_stars',
        'user_review_count',
    ]
    has_all_values = merged_df[critical_columns].notna().all(axis=1)
    return merged_df[has_all_values]


def preprocess_pipeline() -> pd.DataFrame:
    """
    Run preprocessing end to end and persist the merged result.

    Loads the review, user and business CSVs listed in INPUT_FILES, renames
    conflicting columns, parses the date columns, inner-joins the three
    sources, drops rows with missing critical values and writes the result to
    OUTPUT_FILES["merged_data"].

    Returns:
        Final preprocessed DataFrame
    """
    # Load in the order review -> user -> business.
    reviews = load_review_data(INPUT_FILES["review"])
    users, businesses = rename_columns(
        load_user_data(INPUT_FILES["user"]),
        load_business_data(INPUT_FILES["business"]),
    )
    reviews, users = convert_date_columns(reviews, users)

    cleaned_df = clean_merged_data(merge_datasets(reviews, users, businesses))

    # Make sure the destination folder exists before writing the CSV.
    destination = OUTPUT_FILES["merged_data"]
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    cleaned_df.to_csv(destination, index=False)

    return cleaned_df

print("Preprocessing functions defined!")

## Feature Engineering Functions

In [None]:
import pandas as pd



def engineer_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add time-based features derived from the review and sign-up dates.

    'time_yelping' is the gap between 'date' and 'yelping_since' expressed in
    weeks (fractional), and 'date_year' is the calendar year of 'date'. Both
    date columns are parsed to datetime in the returned copy.

    Args:
        df: Input DataFrame with 'date' and 'yelping_since' columns

    Returns:
        DataFrame with added 'time_yelping' and 'date_year' columns
    """
    featured = df.copy()

    # Accept either strings or datetimes for both date columns.
    for date_column in ('date', 'yelping_since'):
        featured[date_column] = pd.to_datetime(featured[date_column])

    seconds_per_week = 7 * 24 * 3600
    account_age = featured['date'] - featured['yelping_since']
    featured['time_yelping'] = account_age.dt.total_seconds() / seconds_per_week

    featured['date_year'] = featured['date'].dt.year

    return featured


def engineer_elite_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Engineer elite status features from the DataFrame.

    Creates 'total_elite_statuses' by counting elite years up to the review year,
    and 'elite_status' by checking if the user was elite in the review year or previous year.

    The features are built by iterating the two source columns directly
    rather than with a row-wise ``DataFrame.apply``. This also works for an
    empty frame, where ``apply(axis=1)`` returns a DataFrame that cannot be
    assigned to a single column, and avoids building a Series per row.

    Args:
        df: Input DataFrame with 'elite' and 'date_year' columns

    Returns:
        DataFrame with added 'total_elite_statuses' and 'elite_status' columns
    """
    df = df.copy()

    elite_values = df['elite'].tolist()
    review_years = df['date_year'].tolist()

    # Wrap the results in a Series on df's own index so the values line up
    # with their rows and the dtype stays int64, even with zero rows.
    df['total_elite_statuses'] = pd.Series(
        [count_elite_statuses(elite, year) for elite, year in zip(elite_values, review_years)],
        index=df.index,
        dtype='int64',
    )
    df['elite_status'] = pd.Series(
        [check_elite_status(elite, year) for elite, year in zip(elite_values, review_years)],
        index=df.index,
        dtype='int64',
    )

    return df


def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Fill gaps in the engineered feature columns.

    Missing 'time_yelping' values are replaced with that column's median;
    missing 'total_elite_statuses' and 'elite_status' values become 0. The
    input frame is not modified.

    Args:
        df: Input DataFrame

    Returns:
        DataFrame with missing values handled
    """
    filled = df.copy()

    replacements = {
        'time_yelping': filled['time_yelping'].median(),
        'total_elite_statuses': 0,
        'elite_status': 0,
    }
    for column, fill_value in replacements.items():
        filled[column] = filled[column].fillna(fill_value)

    return filled


def feature_engineering_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    """
    Complete feature engineering pipeline.

    Applies time feature engineering, elite feature engineering, handles missing values,
    saves the processed data to CSV (FEATURED_DATA_PATH), and returns the DataFrame.

    Args:
        df: Input DataFrame

    Returns:
        Processed DataFrame with engineered features
    """
    df = engineer_time_features(df)
    df = engineer_elite_features(df)
    df = handle_missing_values(df)

    # Create the destination folder first, as the other pipeline stages do,
    # so this stage does not depend on the configuration cell having run.
    output_dir = os.path.dirname(FEATURED_DATA_PATH)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    df.to_csv(FEATURED_DATA_PATH, index=False)

    return df

print("Feature engineering functions defined!")

## Sentiment Analysis Functions

**Note:** The `smart_truncate_text` function is included inline here.

In [None]:
def smart_truncate_text(text: str, tokenizer, max_tokens: int = 500) -> str:
    """
    Shorten text to at most ``max_tokens`` tokens, keeping its start and end.

    The text is tokenized without special tokens. If it already fits, the
    original string is returned unchanged. Otherwise the first half and the
    last half of the token budget are kept and decoded back to a string;
    the middle is dropped. With the default budget of 500 this keeps the
    first 250 and last 250 tokens.

    Args:
        text: Text to truncate
        tokenizer: Object exposing ``encode(text, add_special_tokens=False)``
            returning a sliceable token sequence, and ``decode(tokens)``
        max_tokens: Maximum number of tokens to keep (negative values are
            treated as 0)

    Returns:
        The original text if it fits, otherwise the decoded head + tail tokens
    """
    tokens = tokenizer.encode(text, add_special_tokens=False)
    if len(tokens) <= max_tokens:
        return text

    # Derive the split from the budget so the result never exceeds it; an odd
    # budget gives the extra token to the tail.
    budget = max(max_tokens, 0)
    head_len = budget // 2
    tail_len = budget - head_len

    # Slice the tail from an explicit start index: tokens[-0:] would return
    # the whole sequence when tail_len is 0.
    truncated_tokens = tokens[:head_len] + tokens[len(tokens) - tail_len:]
    return tokenizer.decode(truncated_tokens)

from transformers import pipeline, AutoTokenizer
import torch
from typing import List, Dict
import pandas as pd
from tqdm import tqdm
import os
def initialize_sentiment_pipeline(device: str = None):
    """
    Build the Hugging Face sentiment-analysis pipeline for MODEL_NAME.

    Args:
        device: Device to use ('cuda' or 'cpu'). If None, auto-detects.
            A request for 'cuda' falls back to CPU when CUDA is unavailable.

    Returns:
        Hugging Face pipeline for sentiment analysis
    """
    requested_device = get_device() if device is None else device

    # The pipeline takes a device index: 0 for the first GPU, -1 for CPU.
    run_on_gpu = requested_device == "cuda" and torch.cuda.is_available()
    device_index = 0 if run_on_gpu else -1

    return pipeline(
        "sentiment-analysis",
        model=MODEL_NAME,
        device=device_index,
        truncation=False
    )



def process_sentiment_batch(texts: List[str], pipeline, batch_size: int = 64) -> List[Dict]:
    """
    Run texts through the sentiment pipeline in consecutive chunks.

    Args:
        texts: List of text strings to analyze
        pipeline: Hugging Face sentiment analysis pipeline (called once per chunk
            with a list of texts)
        batch_size: Number of texts to process in each batch

    Returns:
        List of sentiment analysis results (dicts with 'label' and 'score'),
        in the same order as the input texts
    """
    chunk_starts = range(0, len(texts), batch_size)
    return [
        result
        for start in chunk_starts
        for result in pipeline(texts[start:start + batch_size])
    ]


def normalize_sentiment_scores(sentiment_results: List[Dict]) -> pd.Series:
    """
    Turn (label, confidence) results into signed scores in [-1, 1].

    Args:
        sentiment_results: List of sentiment analysis results, each with
            'label' and 'score' keys

    Returns:
        Pandas Series with normalized scores: the confidence for POSITIVE,
        its negation for NEGATIVE, and 0.0 for any other label
    """
    def _signed_score(result: Dict) -> float:
        label, confidence = result['label'], result['score']
        if label == 'POSITIVE':
            return confidence
        if label == 'NEGATIVE':
            return -confidence
        # Unexpected labels (e.g., neutral) carry no polarity.
        return 0.0

    return pd.Series([_signed_score(result) for result in sentiment_results])


def sentiment_analysis_pipeline(df: pd.DataFrame, batch_size: int = 64) -> pd.DataFrame:
    """
    Complete sentiment analysis pipeline.

    Truncates each review text to a 500-token budget, scores it with the
    sentiment model, adds the result columns to ``df`` and writes the frame to
    OUTPUT_FILES["sentiment_data"].

    Note:
        The columns are added to the DataFrame that was passed in (it is
        modified in place and also returned).

    Args:
        df: DataFrame containing review texts in 'text' column
        batch_size: Number of texts to process in each batch

    Returns:
        DataFrame with added 'sentiment_label', 'sentiment_score_raw' and
        'normalized_sentiment_score' columns
    """
    # Initialize sentiment pipeline
    sentiment_pipeline = initialize_sentiment_pipeline()

    # Load the tokenizer of the same model the pipeline uses, so token counts
    # used for truncation match what the model will see.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    # Process in batches with tqdm progress bar
    results = []
    for start in tqdm(range(0, len(df), batch_size), desc="Processing sentiment analysis"):
        batch_texts = df['text'].iloc[start:start + batch_size].tolist()
        # Apply smart truncation to each text
        truncated_texts = [smart_truncate_text(text, tokenizer, max_tokens=500) for text in batch_texts]
        results.extend(process_sentiment_batch(truncated_texts, sentiment_pipeline, batch_size))

    # Normalize sentiment scores
    normalized_scores = normalize_sentiment_scores(results)

    # Add columns. All three are assigned positionally: results are in row
    # order, but the normalized Series carries a fresh 0..n-1 index. Assigning
    # it directly would align on index labels and scramble or blank the scores
    # whenever df's index is not the default range (e.g. after dropna).
    df['sentiment_label'] = [r['label'] for r in results]
    df['sentiment_score_raw'] = [r['score'] for r in results]
    df['normalized_sentiment_score'] = normalized_scores.to_numpy()

    # Save to CSV
    output_path = OUTPUT_FILES["sentiment_data"]
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    df.to_csv(output_path, index=False)

    return df

print("Sentiment analysis functions defined!")

## Feature Selection Functions

In [None]:
import pandas as pd
import json
import os
import logging
from typing import List, Tuple
from itertools import combinations
from tqdm import tqdm
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error


logger = logging.getLogger(__name__)


def prepare_feature_data(df: pd.DataFrame, candidate_features: List[str]) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Build the feature matrix and target vector used for feature selection.

    Only rows that have a value for every candidate feature and for the
    'stars' target are kept.

    Args:
        df: Input DataFrame containing all features and target
        candidate_features: List of feature column names to consider

    Returns:
        Tuple of (X, y) where X is features DataFrame and y is target Series
    """
    modelling_columns = candidate_features + ['stars']
    complete_rows = df[modelling_columns].dropna()
    return complete_rows[candidate_features], complete_rows['stars']


def run_best_subset_selection(X: pd.DataFrame, y: pd.Series) -> List[str]:
    """
    Exhaustively search feature subsets and return the one with the lowest validation MSE.

    The data is split 80/20 into train and validation sets. For every subset
    size k (up to 10, or the number of columns if smaller) each combination of
    k columns is scored by fitting a random forest on the training part and
    measuring MSE on the validation part. Ties keep the earliest candidate.

    Args:
        X: Feature DataFrame
        y: Target Series

    Returns:
        List of selected feature names
    """
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    def _validation_mse(feature_names: List[str]) -> float:
        """Fit a forest on the given columns and score it on the validation split."""
        forest = RandomForestRegressor(n_estimators=100, random_state=1, n_jobs=-1)
        forest.fit(X_train[feature_names], y_train)
        return mean_squared_error(y_val, forest.predict(X_val[feature_names]))

    best_overall_mse, best_k, best_features = float('inf'), None, None
    largest_subset = min(10, len(X.columns))

    for k in tqdm(range(1, largest_subset + 1), desc="Evaluating k"):
        # Materialise the combinations so the inner progress bar knows its length.
        feature_combos = list(combinations(X.columns, k))

        winner_mse, winner = float('inf'), None
        for combo in tqdm(feature_combos, desc=f"Evaluating combinations for k={k}"):
            candidate = list(combo)
            candidate_mse = _validation_mse(candidate)
            if candidate_mse < winner_mse:
                winner_mse, winner = candidate_mse, candidate

        # Promote this size's winner only if it beats every smaller size.
        if winner_mse < best_overall_mse:
            best_overall_mse, best_k, best_features = winner_mse, k, winner

    logger.info(f"Best k: {best_k}")
    logger.info(f"Best feature set: {best_features}")
    logger.info(f"Best MSE: {best_overall_mse}")

    return best_features


def feature_selection_pipeline(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
    """
    Select the optimal feature subset and persist the modelling dataset.

    Runs the subset search over CANDIDATE_FEATURES, writes the chosen feature
    names to "optimal_features.json" in OUTPUT_DIR, and writes the chosen
    features plus the 'stars' target (rows with missing values removed) to
    OUTPUT_FILES["final_model_data"].

    Args:
        df: Input DataFrame with all features and target

    Returns:
        Tuple of (final DataFrame with optimal features + target, list of optimal features)
    """
    X, y = prepare_feature_data(df, CANDIDATE_FEATURES)
    optimal_features = run_best_subset_selection(X, y)

    # Record the chosen feature names.
    with open(os.path.join(OUTPUT_DIR, "optimal_features.json"), 'w') as handle:
        handle.write(json.dumps(optimal_features, indent=2))

    # Keep just the chosen features and the target, without incomplete rows.
    final_df = df[optimal_features + ['stars']].dropna()

    final_data_path = OUTPUT_FILES["final_model_data"]
    os.makedirs(os.path.dirname(final_data_path), exist_ok=True)
    final_df.to_csv(final_data_path, index=False)

    return final_df, optimal_features

print("Feature selection functions defined!")

## Model Definition

In [None]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
from typing import Dict, Any


class YelpRatingPredictor(pl.LightningModule):
    """
    PyTorch Lightning module for predicting Yelp ratings using a neural network.

    This model consists of a feedforward neural network with dropout and batch normalization
    layers, designed to predict star ratings based on input features.

    The network emits one value per sample, i.e. predictions of shape
    (batch, 1). Targets are reshaped to the prediction shape before any loss
    or metric is computed, so both (batch,) and (batch, 1) targets work.
    Without that step a (batch,) target broadcasts against (batch, 1)
    predictions into a (batch, batch) matrix and the loss is computed over
    every prediction/target pair instead of matching pairs.

    Attributes:
        network: Sequential neural network layers
        criterion: Mean squared error loss function
    """

    def __init__(self, input_size: int = 5, learning_rate: float = 0.0001) -> None:
        """
        Build the network and record the hyperparameters.

        Args:
            input_size: Number of input features per sample
            learning_rate: Learning rate used by the optimizer
        """
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1)
        )
        self.criterion = nn.MSELoss()
        # Makes input_size / learning_rate available through self.hparams.
        self.save_hyperparameters()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the network.

        Args:
            x: Input tensor

        Returns:
            Output tensor predictions
        """
        return self.network(x)

    def _predict_aligned(self, batch: tuple) -> tuple:
        """
        Run the model on a batch and return (predictions, targets) with equal shapes.

        Args:
            batch: Tuple of (features, targets)

        Returns:
            Tuple of (preds, targets) where targets has been reshaped to preds' shape
        """
        x, y = batch
        preds = self(x)
        # reshape raises if the element counts differ, so a genuine mismatch
        # surfaces as an error instead of silently broadcasting.
        return preds, y.reshape(preds.shape)

    def training_step(self, batch: tuple, batch_idx: int) -> torch.Tensor:
        """
        Training step for one batch.

        Args:
            batch: Tuple of (features, targets)
            batch_idx: Batch index

        Returns:
            Loss tensor
        """
        preds, y = self._predict_aligned(batch)
        loss = self.criterion(preds, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch: tuple, batch_idx: int) -> torch.Tensor:
        """
        Validation step for one batch.

        Logs 'val_loss' (MSE) and 'val_mae'.

        Args:
            batch: Tuple of (features, targets)
            batch_idx: Batch index

        Returns:
            Loss tensor
        """
        preds, y = self._predict_aligned(batch)
        loss = self.criterion(preds, y)
        mae = torch.mean(torch.abs(preds - y))
        self.log('val_loss', loss)
        self.log('val_mae', mae)
        return loss

    def test_step(self, batch: tuple, batch_idx: int) -> Dict[str, torch.Tensor]:
        """
        Test step for one batch.

        Args:
            batch: Tuple of (features, targets)
            batch_idx: Batch index

        Returns:
            Dictionary with test loss and MAE
        """
        preds, y = self._predict_aligned(batch)
        loss = self.criterion(preds, y)
        mae = torch.mean(torch.abs(preds - y))
        return {'test_loss': loss, 'test_mae': mae}

    def configure_optimizers(self) -> Dict[str, Any]:
        """
        Configure optimizer and learning rate scheduler.

        Uses RMSprop with the stored learning rate, and halves the learning
        rate when 'val_loss' has not improved for 3 epochs.

        Returns:
            Dictionary with optimizer and scheduler configuration
        """
        optimizer = torch.optim.RMSprop(self.parameters(), lr=self.hparams.learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, factor=0.5, patience=3
        )
        return {'optimizer': optimizer, 'lr_scheduler': {'scheduler': scheduler, 'monitor': 'val_loss'}}

print("Model class defined!")

## Training Functions

In [None]:
import pandas as pd
import json
import torch
import os
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from typing import List, Tuple, Dict, Any

from torch.utils.data import DataLoader, TensorDataset
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint


def stratify_and_split(df: pd.DataFrame, target_size: int = 130000, n_classes: int = 5) -> pd.DataFrame:
    """
    Stratify the dataset by 'stars' and downsample to equal samples per class.

    Each 'stars' group is sampled down to at most ``target_size // n_classes``
    rows (groups that are already smaller are kept whole), using a fixed
    random state so the result is reproducible. Groups are sampled by
    iterating the groupby, not through ``groupby.apply``, because applying a
    function that needs the grouping column is deprecated in recent pandas
    and would lose the 'stars' column once grouping columns are excluded.

    Args:
        df: Input DataFrame containing 'stars' column
        target_size: Total target size for the stratified dataset
        n_classes: Number of rating classes the target size is divided across
            (default 5, for 1-5 stars)

    Returns:
        Stratified DataFrame with equal samples per class and a fresh 0..n-1 index

    Raises:
        ValueError: If n_classes is less than 1
    """
    if n_classes < 1:
        raise ValueError(f"n_classes must be at least 1, got {n_classes}")

    samples_per_class = target_size // n_classes

    sampled_groups = [
        group.sample(n=min(len(group), samples_per_class), random_state=1)
        for _, group in df.groupby('stars')
    ]

    # pd.concat rejects an empty list; an input without any rated rows
    # yields an empty frame with the same columns.
    if not sampled_groups:
        return df.iloc[0:0].reset_index(drop=True)

    return pd.concat(sampled_groups).reset_index(drop=True)


def prepare_train_test_data(df: pd.DataFrame, features: List[str], test_size: float = 0.2) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, MinMaxScaler]:
    """
    Split, scale and tensorise the modelling data.

    The split is stratified on 'stars'. The MinMaxScaler is fitted on the
    training features only and then applied to the test features, so no test
    information leaks into the scaling.

    Args:
        df: Input DataFrame
        features: List of feature column names
        test_size: Fraction of data to use for testing

    Returns:
        Tuple of (X_train, X_test, y_train, y_test, scaler)
    """
    target = df['stars']
    features_train, features_test, target_train, target_test = train_test_split(
        df[features], target, test_size=test_size, stratify=target, random_state=1
    )

    scaler = MinMaxScaler()
    scaled_train = scaler.fit_transform(features_train)
    scaled_test = scaler.transform(features_test)

    # Same order as the return contract: X_train, X_test, y_train, y_test.
    tensors = tuple(
        torch.FloatTensor(array)
        for array in (scaled_train, scaled_test, target_train.values, target_test.values)
    )
    return (*tensors, scaler)
def create_dataloaders(X_train, y_train, X_val, y_val, batch_size: int = 64) -> Tuple[DataLoader, DataLoader]:
    """
    Wrap the training and validation tensors in DataLoaders.

    The training loader reshuffles its samples; the validation loader keeps
    them in their original order.

    Args:
        X_train: Training features tensor
        y_train: Training labels tensor
        X_val: Validation features tensor
        y_val: Validation labels tensor
        batch_size: Batch size used by both loaders

    Returns:
        Tuple of (train_loader, val_loader)
    """
    def _make_loader(inputs, targets, shuffle: bool) -> DataLoader:
        # Pair features with labels, then batch them.
        paired = TensorDataset(inputs, targets)
        return DataLoader(paired, batch_size=batch_size, shuffle=shuffle)

    training_loader = _make_loader(X_train, y_train, True)
    validation_loader = _make_loader(X_val, y_val, False)
    return training_loader, validation_loader


def train_model(model, train_loader, val_loader, max_epochs: int = 40) -> pl.Trainer:
    """
    Train the model using PyTorch Lightning and save its weights.

    Training runs on the GPU when CUDA is available, otherwise on the CPU,
    and stops early once 'val_loss' has not improved for 5 validation checks.
    After fitting, the model's state dict is written to
    '/kaggle/working/models/best_model.pt'.

    Note: the saved weights are the ones the model holds when training
    finishes; no checkpoint callback is configured here to restore the
    epoch with the lowest 'val_loss'.

    Args:
        model: PyTorch Lightning model to train
        train_loader: Training DataLoader
        val_loader: Validation DataLoader
        max_epochs: Maximum number of epochs

    Returns:
        The PyTorch Lightning Trainer that ran the fit
    """
    # Detect device
    accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'

    # Configure callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=5)

    # Configure trainer
    trainer = pl.Trainer(
        accelerator=accelerator,
        max_epochs=max_epochs,
        callbacks=[early_stopping]
    )

    # Train model
    trainer.fit(model, train_loader, val_loader)

    # Save the weights into the directory that was just created. The previous
    # code created the absolute directory but saved to the relative path
    # 'models/best_model.pt', which only resolves to the same place when the
    # working directory happens to be '/kaggle/working'.
    models_dir = '/kaggle/working/models'
    os.makedirs(models_dir, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(models_dir, 'best_model.pt'))

    return trainer


def evaluate_model(model, X_test, y_test) -> Dict[str, float]:
    """
    Evaluate the model on test data and return regression metrics.

    The model is switched to eval mode (and left in it) and run once over
    the whole test tensor with gradients disabled.

    Args:
        model: Trained PyTorch model
        X_test: Test features tensor
        y_test: Test labels tensor

    Returns:
        Dictionary with keys 'mse', 'mae', and 'r2', each a built-in float
    """
    model.eval()
    with torch.no_grad():
        # Run inference on whatever device the model's weights live on, so a
        # CPU test tensor still works if the model is on the GPU.
        first_param = next(model.parameters(), None)
        if first_param is not None:
            X_test = X_test.to(first_param.device)
        predictions = model(X_test)

    # Convert once instead of once per metric.
    y_true = y_test.cpu().numpy()
    y_pred = predictions.cpu().numpy()

    # Cast to built-in floats: depending on the scikit-learn version the
    # metrics may come back as NumPy scalars (e.g. float32 for float32
    # inputs), which json.dump cannot serialize.
    return {
        'mse': float(mean_squared_error(y_true, y_pred)),
        'mae': float(mean_absolute_error(y_true, y_pred)),
        'r2': float(r2_score(y_true, y_pred)),
    }


def training_pipeline() -> Dict[str, Any]:
    """
    Run the end-to-end training stage and persist its artifacts.

    Steps, in order: read the final model data and the selected feature list
    from disk, balance the rows by star class, build scaled train/test
    tensors, carve a validation split (20%) out of the training tensors,
    train the network, score it on the test tensors, then pickle the scaler
    and dump the metrics as JSON.

    Returns:
        Dictionary with the metrics plus the model, scaler, and metrics
        file paths. 'model_path' is a fixed string, not a value reported
        back by train_model.
    """
    models_dir = '/kaggle/working/models'
    outputs_dir = '/kaggle/working/outputs'
    scaler_path = '/kaggle/working/models/scaler.pkl'
    metrics_path = '/kaggle/working/outputs/metrics.json'

    # Inputs produced by the earlier pipeline stages.
    model_frame = pd.read_csv(OUTPUT_FILES["final_model_data"])
    with open(os.path.join(OUTPUT_DIR, "optimal_features.json"), 'r') as features_file:
        features = json.load(features_file)

    # Balance classes, then build the scaled train/test tensors.
    balanced_frame = stratify_and_split(model_frame)
    X_train, X_test, y_train, y_test, scaler = prepare_train_test_data(balanced_frame, features)

    # Hold out part of the training tensors for validation during fitting.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=1
    )
    train_loader, val_loader = create_dataloaders(
        X_fit, y_fit, X_val, y_val, batch_size=BATCH_SIZE
    )

    # Build and fit the network; the returned trainer is not needed here.
    model = YelpRatingPredictor(input_size=len(features), learning_rate=LEARNING_RATE)
    train_model(model, train_loader, val_loader, max_epochs=MAX_EPOCHS)

    # Score on the held-out test tensors.
    metrics = evaluate_model(model, X_test, y_test)

    # Persist the fitted scaler.
    os.makedirs(models_dir, exist_ok=True)
    with open(scaler_path, 'wb') as scaler_file:
        pickle.dump(scaler, scaler_file)

    # Persist the metrics.
    os.makedirs(outputs_dir, exist_ok=True)
    with open(metrics_path, 'w') as metrics_file:
        json.dump(metrics, metrics_file)

    results: Dict[str, Any] = {'metrics': metrics}
    results['model_path'] = '/kaggle/working/models/best_model.pt'
    results['scaler_path'] = scaler_path
    results['metrics_path'] = metrics_path
    return results

# Confirmation message printed when this cell has finished running.
print("Training functions defined!")

## Main Pipeline Execution

Execute the complete pipeline stage by stage.

### Stage 1: Data Loading & Preprocessing

In [None]:
# Stage 1: Data Loading & Preprocessing
# Seeds the random number generators, then runs the preprocessing pipeline.
# Any failure is logged and re-raised so the notebook stops at this cell.
logger.info("Starting stage: Data Loading & Preprocessing")
set_seed(SEED)

try:
    df_merged = preprocess_pipeline()
    logger.info("Completed stage: Data Loading & Preprocessing")
    logger.info(f"Preprocessed data shape: {df_merged.shape}")
    print(f"\n✓ Stage 1 Complete: Loaded and preprocessed {len(df_merged):,} reviews")
except Exception as e:
    logger.error(f"Error in stage 'Data Loading & Preprocessing': {e}")
    raise

### Stage 2: Feature Engineering

In [None]:
# Stage 2: Feature Engineering
# Consumes df_merged from Stage 1 and produces df_featured.
# Any failure is logged and re-raised so the notebook stops at this cell.
logger.info("Starting stage: Feature Engineering")

try:
    df_featured = feature_engineering_pipeline(df_merged)
    logger.info("Completed stage: Feature Engineering")
    logger.info(f"Featured data shape: {df_featured.shape}")
    print(f"\n✓ Stage 2 Complete: Engineered features for {len(df_featured):,} reviews")
except Exception as e:
    logger.error(f"Error in stage 'Feature Engineering': {e}")
    raise

### Stage 3: Sentiment Analysis

**Note:** This stage may take a while depending on dataset size and available GPU.

In [None]:
# Stage 3: Sentiment Analysis
# Consumes df_featured from Stage 2 and produces df_sentiment.
# Any failure is logged and re-raised so the notebook stops at this cell.
logger.info("Starting stage: Sentiment Analysis")
print(f"Processing sentiment for {len(df_featured):,} reviews...")
print(f"Using device: {get_device()}")

try:
    df_sentiment = sentiment_analysis_pipeline(df_featured, batch_size=SENTIMENT_BATCH_SIZE)
    logger.info("Completed stage: Sentiment Analysis")
    logger.info(f"Sentiment data shape: {df_sentiment.shape}")
    print(f"\n✓ Stage 3 Complete: Analyzed sentiment for {len(df_sentiment):,} reviews")
except Exception as e:
    logger.error(f"Error in stage 'Sentiment Analysis': {e}")
    raise

### Stage 4: Feature Selection

**Note:** This stage performs an exhaustive search and may take some time.

In [None]:
# Stage 4: Feature Selection
# Consumes df_sentiment from Stage 3 and produces final_df and optimal_features.
# Any failure is logged and re-raised so the notebook stops at this cell.
logger.info("Starting stage: Feature Selection")

try:
    final_df, optimal_features = feature_selection_pipeline(df_sentiment)
    logger.info("Completed stage: Feature Selection")
    logger.info(f"Selected {len(optimal_features)} optimal features: {optimal_features}")
    logger.info(f"Final model data shape: {final_df.shape}")
    print(f"\n✓ Stage 4 Complete: Selected {len(optimal_features)} optimal features")
    print(f"  Features: {optimal_features}")
except Exception as e:
    logger.error(f"Error in stage 'Feature Selection': {e}")
    raise

### Stage 5: Model Training

In [None]:
# Stage 5: Model Training
# Runs training_pipeline() and reports the test metrics it returns.
# Uses optimal_features from Stage 4 only for the progress message.
# Any failure is logged and re-raised so the notebook stops at this cell.
logger.info("Starting stage: Model Training")
print(f"Training model with {len(optimal_features)} features...")

try:
    training_results = training_pipeline()
    logger.info("Completed stage: Model Training")
    logger.info(f"Training completed. Metrics: {training_results['metrics']}")
    print("\n✓ Stage 5 Complete: Model training finished")
    print(f"  Test MSE: {training_results['metrics']['mse']:.4f}")
    print(f"  Test MAE: {training_results['metrics']['mae']:.4f}")
    print(f"  Test R²: {training_results['metrics']['r2']:.4f}")
except Exception as e:
    logger.error(f"Error in stage 'Model Training': {e}")
    raise

### Stage 6: Output Generation & Inference Demo

In [None]:
# Stage 6: Output Generation & Inference Demo
# Reloads the saved model, scaler, and feature list from disk, scores three
# hand-written example rows, and writes the predictions to JSON.
logger.info("Starting stage: Output Generation/Inference")

# File paths
model_path = '/kaggle/working/models/best_model.pt'
scaler_path = '/kaggle/working/models/scaler.pkl'
features_path = os.path.join(OUTPUT_DIR, "optimal_features.json")
output_path = '/kaggle/working/outputs/predictions.json'

# Check if required files exist
for path in [model_path, scaler_path, features_path]:
    if not os.path.exists(path):
        raise FileNotFoundError(f"Required file not found: {path}")

# Load model, scaler, and features
with open(features_path, 'r') as f:
    optimal_features = json.load(f)

input_size = len(optimal_features)
model = YelpRatingPredictor(input_size=input_size)
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()

# The scaler file is the one written by the training stage of this notebook.
with open(scaler_path, 'rb') as f:
    scaler = pickle.load(f)

# Create example data for inference
example_data = {
    'user_average_stars': [4.2, 3.8, 4.5],
    'business_average_stars': [4.0, 3.5, 4.8],
    'time_yelping': [52.3, 24.1, 156.7],
    'elite_status': [1, 0, 1],
    'normalized_sentiment_score': [0.8, -0.3, 0.9]
}

# Build the example frame in the exact column order of optimal_features.
# The scaler and the model were fitted on columns in that order, and the
# values are passed positionally below, so following example_data's key
# order instead could silently feed features into the wrong columns.
missing_features = [name for name in optimal_features if name not in example_data]
if missing_features:
    raise KeyError(f"Example data is missing selected features: {missing_features}")
df_example = pd.DataFrame({name: example_data[name] for name in optimal_features})

# Make predictions
scaled_data = scaler.transform(df_example.values)
input_tensor = torch.FloatTensor(scaled_data)
with torch.no_grad():
    predictions = model(input_tensor).flatten().numpy()

# Save predictions
os.makedirs('/kaggle/working/outputs', exist_ok=True)
prediction_results = {
    'predictions': predictions.tolist(),
    'input_features': optimal_features,
    'example_inputs': df_example.to_dict('records')
}
with open(output_path, 'w') as f:
    json.dump(prediction_results, f, indent=2)

logger.info(f"Inference completed. Predictions saved to {output_path}")
print("\n✓ Stage 6 Complete: Inference demo finished")
print(f"  Example predictions: {predictions}")
print("\n📊 Final Results:")
print(f"  Model saved to: {model_path}")
print(f"  Metrics saved to: {training_results['metrics_path']}")
print(f"  Predictions saved to: {output_path}")

## Summary

Pipeline execution completed successfully! All outputs are saved in `/kaggle/working/`:

- **Processed data**: `/kaggle/working/processed/`
- **Trained model**: `/kaggle/working/models/best_model.pt`
- **Scaler**: `/kaggle/working/models/scaler.pkl`
- **Metrics**: `/kaggle/working/outputs/metrics.json`
- **Predictions**: `/kaggle/working/outputs/predictions.json`

In [None]:
# Display final metrics
# Reads training_results from Stage 5 and predictions/df_example from Stage 6.
print("\n" + "="*60)
print("FINAL MODEL METRICS")
print("="*60)
print(f"Mean Squared Error (MSE): {training_results['metrics']['mse']:.4f}")
print(f"Mean Absolute Error (MAE): {training_results['metrics']['mae']:.4f}")
print(f"R² Score: {training_results['metrics']['r2']:.4f}")
print("="*60)

# Display example predictions
print("\n" + "="*60)
print("EXAMPLE PREDICTIONS")
print("="*60)
for i, (pred, example) in enumerate(zip(predictions, df_example.to_dict('records')), 1):
    print(f"\nExample {i}:")
    for key, value in example.items():
        print(f"  {key}: {value}")
    print(f"  Predicted Rating: {pred:.2f} stars")
print("="*60)