<a href="https://colab.research.google.com/github/parthasaratheeg/Analytics_Vidhya/blob/master/10M_Hybrid_Recommender_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
pip install pyspark



In [None]:
!apt-get install openjdk-11-jdk -y
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  session-migration x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm libnss-mdns
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk openjdk-11-jdk-headless openjdk-

In [None]:
# This cell runs before the main import cell, so it must import SparkSession
# itself; without the import it fails with NameError.
from pyspark.sql import SparkSession

# NOTE(review): a later cell also calls getOrCreate(). If this session is still
# alive at that point the later call presumably reuses it, so the larger memory
# settings requested there may not take effect - confirm, or drop this cell.
spark = (
    SparkSession.builder
    .appName("HybridRecommendationSystemMovieLens1M")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)


NameError: name 'SparkSession' is not defined

print(spark)

In [None]:
# Sanity check: only works once a cell above has successfully created `spark`.
print(spark)

NameError: name 'spark' is not defined

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import StandardScaler, VectorAssembler, Bucketizer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import Pipeline
import numpy as np
from datetime import datetime
import os
import warnings
warnings.filterwarnings('ignore')

# Build (or reuse) the Spark session shared by everything below.
spark = (
    SparkSession.builder
    .appName("HybridRecommendationSystemMovieLens10M")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.sql.shuffle.partitions", "300")
    .config("spark.default.parallelism", "300")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Only show Spark log messages at WARN level or above.
spark.sparkContext.setLogLevel("WARN")

class HybridRecommendationSystem:
    def __init__(self):
        """Initialize the recommendation system for MovieLens 10M"""
        self.results = {}
        self.dataset_name = "MovieLens 10M"
        self.feature_importance = {}

    def download_dataset(self):
        """Download and extract the MovieLens 10M archive unless a usable copy exists.

        The archive is fetched to ``/tmp/ml-10m.zip`` and unpacked under ``/tmp/``.
        The local copy counts as usable only when ``ratings.dat`` is present in the
        extraction directory, so a directory left behind by an interrupted
        extraction triggers a fresh download instead of being silently reused.

        Returns:
            str | None: ``"/tmp/ml-10M100K"`` on success, or ``None`` if the
            download or extraction raised (the error message is printed).
        """
        try:
            import urllib.request
            import zipfile

            print(f"\n{'='*80}")
            print(f"DOWNLOADING MOVIELENS 10M DATASET")
            print(f"{'='*80}")

            dataset_url = "https://files.grouplens.org/datasets/movielens/ml-10m.zip"
            zip_path = "/tmp/ml-10m.zip"
            extract_path = "/tmp/ml-10M100K"
            # load_movielens_data() reads this file first, so its presence is a
            # better "already downloaded" signal than the bare directory.
            ratings_file = f"{extract_path}/ratings.dat"

            if not os.path.exists(ratings_file):
                print("Downloading MovieLens 10M dataset (~63MB)...")
                urllib.request.urlretrieve(dataset_url, zip_path)
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall("/tmp/")
                print("✓ Dataset downloaded and extracted successfully")
            else:
                print("✓ Dataset already exists locally")

            return extract_path
        except Exception as e:
            # Best effort: report the problem and let the caller stop on None.
            print(f"Error downloading dataset: {str(e)}")
            return None

    def load_movielens_data(self, dataset_path):
        """Read the MovieLens 10M ``.dat`` files and assemble the modelling frame.

        Args:
            dataset_path: Directory containing ``ratings.dat``, ``movies.dat`` and
                ``tags.dat`` (``::``-separated, no header row).

        Returns:
            tuple: ``(ratings_df, df)``. ``ratings_df`` holds the renamed rating
            columns; ``df`` is ``ratings_df`` left-joined with the movie metadata
            and the per-movie tag counts, followed by ``fillna(0)``.
            ``(None, None)`` is returned if anything raises; the traceback is
            printed in that case.
        """
        def read_dat(file_name, column_names):
            # Header-less columns come back as _c0, _c1, ...; rename positionally.
            raw = spark.read.csv(
                f"{dataset_path}/{file_name}",
                sep="::",
                header=False,
                inferSchema=True
            )
            renamed = [
                col(f"_c{idx}").alias(name)
                for idx, name in enumerate(column_names)
            ]
            return raw.select(*renamed)

        try:
            rule = '=' * 80
            print(f"\n{rule}")
            print("LOADING MOVIELENS 10M DATA")
            print(rule)

            print("Loading ratings data...")
            ratings_df = read_dat(
                "ratings.dat", ["userId", "movieId", "rating", "timestamp"]
            ).coalesce(200)

            print("Loading movies data...")
            movies_df = read_dat("movies.dat", ["movieId", "movieTitle", "genres"])

            print("Loading tags data...")
            tags_df = read_dat(
                "tags.dat", ["userId", "movieId", "tag", "tag_timestamp"]
            )

            # Per-movie tag activity: number of tags and number of distinct taggers.
            tag_counts = tags_df.groupby("movieId").agg(
                count("tag").alias("tag_count"),
                countDistinct("userId").alias("distinct_taggers")
            )

            # Attach movie metadata and tag activity to every rating row.
            with_movies = ratings_df.join(movies_df, "movieId", "left")
            with_tags = with_movies.join(tag_counts, "movieId", "left")
            df = with_tags.fillna(0)

            print(f"\n{rule}")
            print(f"DATASET STATISTICS - {self.dataset_name}")
            print(rule)
            print(f"Total ratings:        {ratings_df.count():>20,}")
            print(f"Unique users:         {ratings_df.select('userId').distinct().count():>20,}")
            print(f"Unique movies:        {ratings_df.select('movieId').distinct().count():>20,}")
            print(f"Rating range:         {ratings_df.agg(min('rating'), max('rating')).collect()[0]}")
            print(f"Total tags:           {tags_df.count():>20,}")
            print(f"Date range:           {ratings_df.agg(min('timestamp'), max('timestamp')).collect()[0]}")

            return ratings_df, df
        except Exception as e:
            print(f"Error loading data: {str(e)}")
            import traceback
            traceback.print_exc()
            return None, None

    def compute_statistical_features(self, df):
        """Compute per-user, per-movie and global rating statistics.

        For users and movies the aggregates are: mean, approximate median,
        standard deviation, range (max - min), count, min, max and the
        approximate 25%/75% quartiles of ``rating``.

        Args:
            df: DataFrame with at least ``userId``, ``movieId`` and ``rating``.

        Returns:
            tuple: ``(user_stats, movie_stats, global_stats)``. The first two are
            DataFrames keyed by ``userId`` / ``movieId`` with nulls filled by 0;
            ``global_stats`` is the single collected row of global aggregates
            (``global_mean``, ``global_median``, ``global_stddev``,
            ``global_min``, ``global_max``, ``q1``, ``q3``).
        """
        print(f"\n{'='*80}")
        print(f"COMPUTING STATISTICAL FEATURES")
        print(f"{'='*80}")

        # NOTE: the previous version called self.compute_statistical_features(df)
        # here, which recursed without a base case until RecursionError.

        def rating_stats(group_col, prefix):
            # One aggregation recipe shared by the user- and movie-level tables.
            # percentile_approx is used for every quantile so the method relies
            # on a single percentile function.
            return df.groupby(group_col).agg(
                mean("rating").alias(f"{prefix}_mean_rating"),
                percentile_approx("rating", 0.5).alias(f"{prefix}_median_rating"),
                stddev("rating").alias(f"{prefix}_stddev_rating"),
                (max("rating") - min("rating")).alias(f"{prefix}_range_rating"),
                count("rating").alias(f"{prefix}_rating_count"),
                min("rating").alias(f"{prefix}_min_rating"),
                max("rating").alias(f"{prefix}_max_rating"),
                percentile_approx("rating", [0.25, 0.75]).alias(f"{prefix}_quartiles")
            ).fillna(0)

        print("Computing user-level statistics...")
        user_stats = rating_stats("userId", "user")

        print("Computing movie-level statistics...")
        movie_stats = rating_stats("movieId", "movie")

        # Global statistics are collected to the driver as a single row.
        global_stats = df.agg(
            mean("rating").alias("global_mean"),
            percentile_approx("rating", 0.5).alias("global_median"),
            stddev("rating").alias("global_stddev"),
            min("rating").alias("global_min"),
            max("rating").alias("global_max"),
            percentile_approx("rating", 0.25).alias("q1"),
            percentile_approx("rating", 0.75).alias("q3")
        ).collect()[0]

        print(f"✓ User statistical features: mean, median, stddev, range, quartiles")
        print(f"✓ Movie statistical features: mean, median, stddev, range, quartiles")
        print(f"\nGlobal Statistics:")
        print(f"  Mean rating:          {global_stats['global_mean']:.4f}")
        print(f"  Median rating:        {global_stats['global_median']:.4f}")
        print(f"  Std deviation:        {global_stats['global_stddev']:.4f}")
        print(f"  IQR:                  {global_stats['q3'] - global_stats['q1']:.4f}")
        print(f"  Rating range:         [{global_stats['global_min']}, {global_stats['global_max']}]")

        return user_stats, movie_stats, global_stats

    def compute_item_features(self, df, user_stats, movie_stats, global_stats):
        """Enrich rating rows with count, bin, scaled and time-derived features.

        Steps: join the user/movie statistics, log-transform the counts, bucket
        the rating counts, rescale the rating statistics by the global rating
        range, split the timestamp into calendar parts, and zero-fill nulls.

        Args:
            df: Rating rows with ``userId``, ``movieId``, ``timestamp`` and
                ``tag_count`` columns.
            user_stats: Per-user statistics from compute_statistical_features().
            movie_stats: Per-movie statistics from compute_statistical_features().
            global_stats: Row/mapping with ``global_min`` and ``global_max``.

        Returns:
            DataFrame: ``df`` with the additional feature columns.
        """
        print(f"\n{'='*80}")
        print(f"COMPUTING ITEM-RELATED FEATURES")
        print(f"{'='*80}")

        print("Joining features with statistics...")
        df_enriched = df.join(movie_stats, "movieId", "left") \
                        .join(user_stats, "userId", "left") \
                        .fillna(0)

        # Log transformation of counts (guards avoid log(0)).
        print("Applying log transformation to counts...")
        df_enriched = df_enriched.withColumn(
            "log_movie_count",
            when(col("movie_rating_count") > 1,
                 log(col("movie_rating_count"))).otherwise(0)
        ).withColumn(
            "log_user_count",
            when(col("user_rating_count") > 1,
                 log(col("user_rating_count"))).otherwise(0)
        ).withColumn(
            "log_tag_count",
            when(col("tag_count") > 0,
                 log(col("tag_count") + 1)).otherwise(0)
        )

        # Binning: the same six buckets are used for movie and user counts.
        print("Applying binning to rating counts...")
        count_bins = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, float('inf')]
        for source_col, bin_col in (
            ("movie_rating_count", "movie_count_bin"),
            ("user_rating_count", "user_count_bin"),
        ):
            df_enriched = Bucketizer(
                splits=count_bins,
                inputCol=source_col,
                outputCol=bin_col
            ).transform(df_enriched)

        # Scaling: bring rating-based features onto the [0, 1] rating scale.
        print("Applying scaling to rating features...")
        global_min = global_stats['global_min']
        global_max = global_stats['global_max']
        rating_range = global_max - global_min if global_max != global_min else 1

        # Means and medians live on the rating scale, so they are shifted by the
        # global minimum before dividing by the range.
        location_cols = [
            "user_mean_rating", "movie_mean_rating",
            "user_median_rating", "movie_median_rating",
        ]
        # Standard deviations and ranges are spreads that start at 0; subtracting
        # the global minimum (as the previous version did) made them negative, so
        # they are only divided by the range.
        spread_cols = [
            "user_stddev_rating", "movie_stddev_rating",
            "user_range_rating", "movie_range_rating",
        ]

        for col_name in location_cols:
            df_enriched = df_enriched.withColumn(
                f"{col_name}_scaled",
                (col(col_name) - lit(global_min)) / lit(rating_range)
            )
        for col_name in spread_cols:
            df_enriched = df_enriched.withColumn(
                f"{col_name}_scaled",
                col(col_name) / lit(rating_range)
            )
        # One pass over the frame instead of a full fillna per scaled column.
        df_enriched = df_enriched.fillna(0)

        # Splitting: derive calendar features from the unix timestamp.
        print("Creating time-based features from timestamp...")
        rating_time = from_unixtime(col("timestamp"))
        df_enriched = df_enriched.withColumn(
            "rating_year", year(rating_time)
        ).withColumn(
            "rating_month", month(rating_time)
        ).withColumn(
            "rating_day_of_week", dayofweek(rating_time)
        ).withColumn(
            "rating_hour", hour(rating_time)
        )

        # Imputation: zero-fill whichever of these columns are present.
        print("Applying imputation for missing values...")
        imputation_cols = [
            "user_mean_rating", "user_median_rating", "user_stddev_rating",
            "user_range_rating", "movie_mean_rating", "movie_median_rating",
            "movie_stddev_rating", "movie_range_rating", "log_movie_count",
            "log_user_count", "log_tag_count", "tag_count", "distinct_taggers"
        ]
        present_cols = [name for name in imputation_cols if name in df_enriched.columns]
        if present_cols:
            df_enriched = df_enriched.fillna(0, subset=present_cols)

        print(f"✓ Binning applied (6 bins for counts)")
        print(f"✓ Log transformation applied to counts")
        print(f"✓ Scaling applied to rating features: [{global_min}, {global_max}]")
        print(f"✓ Time-based splitting features created (year, month, day, hour)")
        print(f"✓ Imputation applied for missing values")

        return df_enriched

    def compute_user_features(self, df):
        """Build per-user 95% confidence-interval features for the mean rating.

        Args:
            df: DataFrame with ``userId`` and ``rating`` columns.

        Returns:
            DataFrame: one row per ``userId`` with the mean, standard deviation,
            count and variance of the ratings, the standard error, the lower and
            upper confidence levels, the interval width, its inverse
            (``confidence_multiplier``, 1.0 when the width is not positive) and
            ``confidence_level_diff``; nulls are filled with 0.
        """
        rule = '=' * 80
        print(f"\n{rule}")
        print("COMPUTING USER-RELATED FEATURES (CONFIDENCE INTERVALS)")
        print(rule)

        # Per-user moments that the interval is derived from.
        per_user = df.groupby("userId").agg(
            mean("rating").alias("user_mean"),
            stddev("rating").alias("user_stddev"),
            count("rating").alias("user_n"),
            variance("rating").alias("user_variance")
        ).fillna(0)

        # z-score for a 95% interval.
        z_score = 1.96

        print("Computing 95% confidence intervals...")
        margin = lit(z_score) * col("std_error")
        interval_width = col("upper_confidence_level") - col("lower_confidence_level")

        # Order matters: later expressions refer to columns added earlier.
        derived_columns = [
            ("std_error", col("user_stddev") / sqrt(col("user_n"))),
            ("lower_confidence_level", col("user_mean") - margin),
            ("upper_confidence_level", col("user_mean") + margin),
            ("confidence_interval_width", interval_width),
            (
                "confidence_multiplier",
                when(col("confidence_interval_width") > 0,
                     lit(1.0) / col("confidence_interval_width")).otherwise(lit(1.0)),
            ),
            ("confidence_level_diff", interval_width),
        ]
        for column_name, expression in derived_columns:
            per_user = per_user.withColumn(column_name, expression)
        per_user = per_user.fillna(0)

        for message in (
            "✓ Lower confidence level computed (95% CI)",
            "✓ Upper confidence level computed (95% CI)",
            "✓ Confidence interval width and multiplier computed",
            "✓ User variance computed",
        ):
            print(message)

        return per_user

    def prepare_training_data(self, df_enriched, user_conf_features):
        """Assemble the ``label`` / ``features`` frame used for modelling.

        Args:
            df_enriched: Output of compute_item_features().
            user_conf_features: Output of compute_user_features().

        Returns:
            tuple: ``(df_final, feature_cols)`` where ``df_final`` is a cached
            DataFrame with ``label`` (the rating) and the assembled ``features``
            vector, and ``feature_cols`` lists the vector's inputs in order.
        """
        rule = '=' * 80
        print(f"\n{rule}")
        print("PREPARING TRAINING DATASET")
        print(rule)

        # Attach the per-user confidence features to every rating row.
        confidence_cols = [
            "lower_confidence_level", "upper_confidence_level",
            "confidence_interval_width", "confidence_multiplier",
            "confidence_level_diff",
        ]
        user_side = user_conf_features.select("userId", "user_n", *confidence_cols)
        joined = df_enriched.join(user_side, "userId", "left").fillna(0)

        stat_names = ("mean", "median", "stddev", "range")
        # Statistical features (8): all user stats, then all movie stats.
        statistical_cols = [
            f"{owner}_{stat}_rating"
            for owner in ("user", "movie")
            for stat in stat_names
        ]
        # Item features - counts (4)
        count_cols = [
            "user_rating_count", "movie_rating_count",
            "log_movie_count", "log_user_count",
        ]
        # Item features - binning (2)
        bin_cols = ["movie_count_bin", "user_count_bin"]
        # Item features - scaling (8): movie/user pairs, one pair per statistic.
        scaled_cols = [
            f"{owner}_{stat}_rating_scaled"
            for stat in stat_names
            for owner in ("movie", "user")
        ]
        # Item features - temporal (4)
        temporal_cols = ["rating_year", "rating_month", "rating_day_of_week", "rating_hour"]
        # Item features - tag (3)
        tag_cols = ["tag_count", "log_tag_count", "distinct_taggers"]

        # The concatenation order defines the positions inside the feature vector.
        feature_cols = (
            statistical_cols + count_cols + bin_cols + scaled_cols
            + temporal_cols + tag_cols + confidence_cols
        )

        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features",
            handleInvalid="skip"
        )
        assembled = assembler.transform(joined)

        labelled = assembled.select(col("rating").alias("label"), "features")
        df_final = labelled.filter(col("label").isNotNull())
        df_final.cache()

        print(f"✓ Total features created: {len(feature_cols)}")
        print("✓ Feature categories breakdown:")
        print("  - Statistical features: 8")
        print("  - Item-related features: 21")
        print("  - User-related features: 5")
        print(f"✓ Training dataset ready: {df_final.count():,} records")

        return df_final, feature_cols

    def train_gbt_model(self, train_data, test_data):
        """Fit a gradient-boosted tree regressor on ``train_data``.

        Args:
            train_data: DataFrame with ``label`` and ``features`` columns.
            test_data: Accepted for interface symmetry; not used while fitting.

        Returns:
            The fitted model. Its feature importances are also stored on
            ``self.feature_importance``.
        """
        rule = '=' * 80
        print(f"\n{rule}")
        print("TRAINING GRADIENT BOOSTING TREE MODEL")
        print(rule)

        for line in (
            "Model hyperparameters:",
            "  Max iterations:       100",
            "  Max depth:            8",
            "  Learning rate:        0.1",
            "  Subsampling rate:     0.7",
            "  Min instances/node:   3",
            "  Feature subset:       auto",
        ):
            print(line)

        hyperparameters = dict(
            maxIter=100,
            maxDepth=8,
            stepSize=0.1,
            seed=42,
            subsamplingRate=0.7,
            minInstancesPerNode=3,
            featureSubsetStrategy="auto",
            lossType="squared",
        )
        regressor = GBTRegressor(**hyperparameters)

        print("\nTraining model on 10M dataset...")
        fitted = regressor.fit(train_data)
        print("✓ Model training completed successfully")

        # Keep the importances so they can be inspected after the run.
        self.feature_importance = fitted.featureImportances

        return fitted

    def evaluate_model(self, model, test_data):
        """Score ``model`` on ``test_data`` and print the regression metrics.

        MAE, MSE, RMSE and R² come from ``RegressionEvaluator``. MAPE is computed
        with a DataFrame aggregation because ``"mape"`` is not among the metric
        names ``RegressionEvaluator`` accepts; it is reported in percent and rows
        whose label is 0 are left out of the average.

        Args:
            model: Fitted model exposing ``transform``.
            test_data: DataFrame with ``label`` and ``features`` columns.

        Returns:
            dict: keys ``"MAE"``, ``"MSE"``, ``"RMSE"``, ``"R2"`` and ``"MAPE"``.
        """
        print(f"\n{'='*80}")
        print(f"MODEL EVALUATION - {self.dataset_name}")
        print(f"{'='*80}")

        print("Generating predictions on test set...")
        predictions = model.transform(test_data)
        predictions.cache()

        try:
            # Metrics supported natively by the evaluator.
            scores = {}
            for metric_name in ("mae", "mse", "rmse", "r2"):
                evaluator = RegressionEvaluator(metricName=metric_name)
                scores[metric_name] = evaluator.evaluate(predictions)

            # `abs`, `mean`, `min`, `max` here are the Spark column functions
            # brought in by the star import, not the Python builtins.
            abs_error = abs(col("label") - col("prediction"))
            # Null for zero labels, which mean() then ignores.
            pct_error = when(col("label") != 0, abs_error / abs(col("label")) * 100)

            pred_stats = predictions.agg(
                mean(abs_error).alias("mean_abs_error"),
                mean(pct_error).alias("mape"),
                mean(col("label")).alias("mean_label"),
                stddev(col("label")).alias("stddev_label"),
                min(col("prediction")).alias("min_pred"),
                max(col("prediction")).alias("max_pred"),
                count("*").alias("test_count")
            ).collect()[0]
        finally:
            # Release the cached predictions even if a metric computation fails.
            predictions.unpersist()

        mae = scores["mae"]
        mse = scores["mse"]
        rmse = scores["rmse"]
        r2 = scores["r2"]
        mape = pred_stats["mape"]
        if mape is None:
            # No row had a non-zero label, so the percentage error is undefined.
            mape = float("nan")

        print(f"\nPerformance Metrics:")
        print(f"{'='*80}")
        print(f"Mean Absolute Error (MAE):     {mae:>20.6f}")
        print(f"Mean Squared Error (MSE):      {mse:>20.6f}")
        print(f"Root Mean Squared Error (RMSE):{rmse:>20.6f}")
        print(f"Mean Absolute Percentage Error:{mape:>20.6f}")
        print(f"R² Score:                      {r2:>20.6f}")
        print(f"{'='*80}")

        print(f"\nDataset Statistics:")
        print(f"Test set size:                 {pred_stats['test_count']:>20,}")
        print(f"Mean label value:              {pred_stats['mean_label']:>20.4f}")
        print(f"Label std deviation:           {pred_stats['stddev_label']:>20.4f}")
        print(f"Prediction range:              [{pred_stats['min_pred']:.2f}, {pred_stats['max_pred']:.2f}]")

        return {
            "MAE": mae,
            "MSE": mse,
            "RMSE": rmse,
            "R2": r2,
            "MAPE": mape
        }
    def process_dataset(self):
        """Run the whole pipeline: download, load, featurise, train, evaluate.

        The data is split into train/test BEFORE any rating-derived statistic is
        computed, and the user/movie statistics and confidence intervals are
        derived from the training rows only. Computing them on all ratings (as
        the previous version did) lets the test labels leak into the features
        and inflates the reported metrics.

        Returns:
            dict | None: the metrics from evaluate_model(), also stored in
            ``self.results[self.dataset_name]``; ``None`` if the dataset could
            not be downloaded/loaded or if any step raised (traceback printed).
        """
        banner = '#' * 80
        print(f"\n\n{banner}")
        print(banner)
        print("# HYBRID RECOMMENDATION SYSTEM - MOVIELENS 10M DATASET")
        print(banner)
        print(banner)

        try:
            # Download and load
            dataset_path = self.download_dataset()
            if dataset_path is None:
                return None

            ratings_df, df = self.load_movielens_data(dataset_path)
            if df is None:
                return None

            # Split data (80/20) first so the test rows stay unseen.
            print(f"\n{'='*80}")
            print(f"SPLITTING DATA INTO TRAIN/TEST (80/20)")
            print(f"{'='*80}")

            train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=42)

            print(f"✓ Training set: {train_raw.count():,} records")
            print(f"✓ Test set:     {test_raw.count():,} records")

            # Rating statistics and confidence intervals: training rows only.
            user_stats, movie_stats, global_stats = \
                self.compute_statistical_features(train_raw)
            user_conf = self.compute_user_features(train_raw)

            # Both splits are enriched with the same train-derived statistics;
            # users/movies unseen in training fall back to the zero fill.
            train_enriched = self.compute_item_features(
                train_raw, user_stats, movie_stats, global_stats
            )
            test_enriched = self.compute_item_features(
                test_raw, user_stats, movie_stats, global_stats
            )

            # Prepare data
            train_data, feature_cols = self.prepare_training_data(
                train_enriched, user_conf
            )
            test_data, _ = self.prepare_training_data(
                test_enriched, user_conf
            )

            # Train and evaluate
            model = self.train_gbt_model(train_data, test_data)
            metrics = self.evaluate_model(model, test_data)

            self.results[self.dataset_name] = metrics
            return metrics

        except Exception as e:
            print(f"\nError processing dataset: {str(e)}")
            import traceback
            traceback.print_exc()
            return None


    def print_summary(self):
        """Print final summary"""
        print(f"\n\n{'='*80}")
        print(f"FINAL RESULTS SUMMARY - MOVIELENS 10M")
        print(f"{'='*80}\n")

        for dataset_name, metrics in self.results.items():
            print(f"Dataset: {dataset_name}")
            print(f"{'='*80}")
            print(f"Mean Absolute Error (MAE):        {metrics['MAE']:.6f}")
            print(f"Mean Squared Error (MSE):         {metrics['MSE']:.6f}")
            print(f"Root Mean Squared Error (RMSE):   {metrics['RMSE']:.6f}")
            print(f"Mean Absolute Percentage Error:   {metrics['MAPE']:.6f}")
            print(f"R² Score:                         {metrics['R2']:.6f}")
            print(f"{'='*80}\n")


# Main execution
if __name__ == "__main__":
    system = HybridRecommendationSystem()
    try:
        system.process_dataset()
        system.print_summary()
    finally:
        # Always release the Spark session, even if the run above raised.
        spark.stop()



################################################################################
################################################################################
# HYBRID RECOMMENDATION SYSTEM - MOVIELENS 10M DATASET
################################################################################
################################################################################

DOWNLOADING MOVIELENS 10M DATASET
✓ Dataset already exists locally

LOADING MOVIELENS 10M DATA
Loading ratings data...
Loading movies data...
Loading tags data...

DATASET STATISTICS - MovieLens 10M
Total ratings:                  10,000,054
Unique users:                       69,878
Unique movies:                      10,677
Rating range:         Row(min(rating)=0.5, max(rating)=5.0)
Total tags:                         95,580
Date range:           Row(min(timestamp)=789652009, max(timestamp)=1231131736)

COMPUTING STATISTICAL FEATURES
Computing user-level statistics...

COMPUTING STATISTICAL FEATURES
Computing

Traceback (most recent call last):
  File "/tmp/ipython-input-1529347985.py", line 561, in process_dataset
    self.compute_statistical_features(ratings_df)
  File "/tmp/ipython-input-1529347985.py", line 160, in compute_statistical_features
    user_stats, movie_stats, global_stats = self.compute_statistical_features(df)
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1529347985.py", line 160, in compute_statistical_features
    user_stats, movie_stats, global_stats = self.compute_statistical_features(df)
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipython-input-1529347985.py", line 160, in compute_statistical_features
    user_stats, movie_stats, global_stats = self.compute_statistical_features(df)
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 972 more times]
  File "/tmp/ipython-input-1529347985.py", line 154, i

In [None]:
#!/usr/bin/env python3
"""
Hybrid Recommendation System - MovieLens 10M
Cleaned, robust and ready-to-run script.

Notes:
- Ensure Java (JDK 11+) and PySpark are installed in the environment.
- The script downloads and extracts the MovieLens 10M dataset to /tmp/ml-10M100K if not present.
- The script is defensive: it handles missing tags/movies files and uses safe defaults.
- GBTRegressor is instantiated without unsupported parameters to avoid version incompatibilities.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, Bucketizer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import os
import warnings
warnings.filterwarnings('ignore')

# ----------------------
# Spark session (tweak resources to your cluster)
# ----------------------
spark = (
    SparkSession.builder
    .appName("HybridRecommendationSystemMovieLens10M")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .getOrCreate()
)

# Only show Spark log messages at WARN level or above.
spark.sparkContext.setLogLevel("WARN")

# ----------------------
# Main class
# ----------------------
class HybridRecommendationSystem:
    def __init__(self):
        self.results = {}
        self.dataset_name = "MovieLens 10M"
        self.feature_importance = None

    # ---------- dataset download / load ----------
    def download_dataset(self):
        """Download and extract MovieLens 10M to /tmp/ml-10M100K"""
        try:
            import urllib.request
            import zipfile

            dataset_url = "https://files.grouplens.org/datasets/movielens/ml-10m.zip"
            zip_path = "/tmp/ml-10m.zip"
            extract_path = "/tmp/ml-10M100K"

            if not os.path.exists(extract_path):
                os.makedirs(extract_path, exist_ok=True)
                print("Downloading MovieLens 10M dataset...")
                urllib.request.urlretrieve(dataset_url, zip_path)
                with zipfile.ZipFile(zip_path, 'r') as z:
                    z.extractall("/tmp/")
                print("Dataset downloaded and extracted to /tmp/ml-10M100K")
            else:
                print("Dataset already present at /tmp/ml-10M100K")

            return extract_path
        except Exception as e:
            print(f"Error downloading dataset: {e}")
            return None

    def load_movielens_data(self, dataset_path):
        """Load ratings, movies and tags robustly. Returns ratings_df, unified_df"""
        try:
            ratings_path = os.path.join(dataset_path, "ratings.dat")
            movies_path = os.path.join(dataset_path, "movies.dat")
            tags_path = os.path.join(dataset_path, "tags.dat")

            # Helper: safe read CSV with '::' splitter
            def safe_read(path, expected_cols):
                if not os.path.exists(path):
                    return None
                # Spark's CSV reader will create _c0.. if no header
                return spark.read.option("sep", "::").option("inferSchema", True).option("header", False).csv(path)

            print("Loading ratings data...")
            ratings_raw = safe_read(ratings_path, 4)
            if ratings_raw is None:
                print(f"Ratings file not found at {ratings_path}")
                return None, None

            # Map to consistent column names (handle _cN or _1 style)
            cols = ratings_raw.columns
            # expect 4 columns: userId, movieId, rating, timestamp
            ratings_df = ratings_raw.select(
                col(cols[0]).cast('int').alias('userId'),
                col(cols[1]).cast('int').alias('movieId'),
                col(cols[2]).cast('double').alias('rating'),
                col(cols[3]).cast('long').alias('timestamp')
            ).coalesce(200)

            print("Loading movies data (if available)...")
            movies_raw = safe_read(movies_path, 3)
            if movies_raw is None:
                # create minimal movies_df with movieId only
                movies_df = ratings_df.select('movieId').distinct().withColumn('movieTitle', lit(None).cast('string')).withColumn('genres', lit(None).cast('string'))
            else:
                mcols = movies_raw.columns
                movies_df = movies_raw.select(
                    col(mcols[0]).cast('int').alias('movieId'),
                    col(mcols[1]).alias('movieTitle'),
                    col(mcols[2]).alias('genres')
                )

            print("Loading tags data (if available)...")
            tags_raw = safe_read(tags_path, 4)
            if tags_raw is None:
                # empty tags
                tags_df = spark.createDataFrame([], schema=['userId', 'movieId', 'tag', 'tag_timestamp'])
            else:
                tcols = tags_raw.columns
                tags_df = tags_raw.select(
                    col(tcols[0]).cast('int').alias('userId'),
                    col(tcols[1]).cast('int').alias('movieId'),
                    col(tcols[2]).alias('tag'),
                    col(tcols[3]).cast('long').alias('tag_timestamp')
                )

            # Compute tag counts per movie (if tags exist)
            if len(tags_df.columns) > 0:
                tag_counts = tags_df.groupBy('movieId').agg(
                    count('tag').alias('tag_count'),
                    countDistinct('userId').alias('distinct_taggers')
                )
            else:
                tag_counts = spark.createDataFrame([], schema=['movieId', 'tag_count', 'distinct_taggers'])

            # Left join: ensure every rating row has movie metadata + tag counts (fill missing with 0)
            unified = ratings_df.join(movies_df, 'movieId', how='left')\
                                .join(tag_counts, 'movieId', how='left')\
                                .fillna({'tag_count': 0, 'distinct_taggers': 0})

            print(f"Total ratings: {ratings_df.count():,}")
            print(f"Unique users: {ratings_df.select('userId').distinct().count():,}")
            print(f"Unique movies: {ratings_df.select('movieId').distinct().count():,}")

            return ratings_df, unified
        except Exception as e:
            print(f"Error loading data: {e}")
            return None, None

    # ---------- statistics & features ----------
    def compute_statistical_features(self, df):
        """Aggregate rating statistics per user, per movie and over all rows.

        Returns:
            ``(user_stats, movie_stats, global_stats)``: two DataFrames grouped
            by ``userId`` / ``movieId`` (nulls filled with 0) and a plain dict
            holding the collected global aggregates.
        """
        print("Computing statistical features...")

        # max/min here are the pyspark.sql.functions aggregates brought in by
        # the star import at the top of the cell, not the Python builtins.
        rating_spread = max('rating') - min('rating')

        per_user_aggs = [
            mean('rating').alias('user_mean_rating'),
            approx_percentile('rating', 0.5).alias('user_median_rating'),
            stddev('rating').alias('user_stddev_rating'),
            rating_spread.alias('user_range_rating'),
            count('rating').alias('user_rating_count'),
            min('rating').alias('user_min_rating'),
            max('rating').alias('user_max_rating'),
        ]
        per_movie_aggs = [
            mean('rating').alias('movie_mean_rating'),
            approx_percentile('rating', 0.5).alias('movie_median_rating'),
            stddev('rating').alias('movie_stddev_rating'),
            rating_spread.alias('movie_range_rating'),
            count('rating').alias('movie_rating_count'),
            min('rating').alias('movie_min_rating'),
            max('rating').alias('movie_max_rating'),
        ]

        user_stats = df.groupBy('userId').agg(*per_user_aggs).fillna(0)
        movie_stats = df.groupBy('movieId').agg(*per_movie_aggs).fillna(0)

        # A global aggregate yields exactly one row; pull it to the driver.
        global_row = df.agg(
            mean('rating').alias('global_mean'),
            approx_percentile('rating', 0.5).alias('global_median'),
            stddev('rating').alias('global_stddev'),
            min('rating').alias('global_min'),
            max('rating').alias('global_max'),
        ).collect()[0]

        global_keys = ('global_mean', 'global_median', 'global_stddev', 'global_min', 'global_max')
        global_stats = {key: global_row[key] for key in global_keys}

        return user_stats, movie_stats, global_stats

    def compute_item_features(self, df, user_stats, movie_stats, global_stats):
        """Join per-movie/per-user statistics onto ``df`` and derive model features.

        Adds log-count, count-bin, ``*_scaled`` and calendar columns. Nulls in
        numeric columns are replaced with 0 after the joins and again after
        scaling.

        Args:
            df: Rating rows with ``userId``, ``movieId`` and ``timestamp``;
                ``tag_count`` is used when present.
            user_stats: Per-user statistics joined on ``userId``.
            movie_stats: Per-movie statistics joined on ``movieId``.
            global_stats: Dict providing ``global_min`` / ``global_max``.

        Returns:
            The enriched DataFrame.
        """
        print("Computing item-related features...")

        df_enriched = df.join(movie_stats, 'movieId', how='left')\
                        .join(user_stats, 'userId', how='left')\
                        .fillna(0)

        # log counts (counts of 0 or 1 map to 0.0)
        if 'tag_count' in df_enriched.columns:
            log_tag_count = when(col('tag_count') > 0, log(col('tag_count') + 1)).otherwise(lit(0.0))
        else:
            # No tag information on the input: use a constant instead of failing
            # on an unresolved column.
            log_tag_count = lit(0.0)

        df_enriched = df_enriched.withColumn('log_movie_count', when(col('movie_rating_count') > 1, log(col('movie_rating_count'))).otherwise(lit(0.0)))\
                                 .withColumn('log_user_count', when(col('user_rating_count') > 1, log(col('user_rating_count'))).otherwise(lit(0.0)))\
                                 .withColumn('log_tag_count', log_tag_count)

        # binning (same split points for movies and users)
        movie_bins = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, float('inf')]
        user_bins = movie_bins

        bucket_movie = Bucketizer(splits=movie_bins, inputCol='movie_rating_count', outputCol='movie_count_bin')
        bucket_user = Bucketizer(splits=user_bins, inputCol='user_rating_count', outputCol='user_count_bin')

        df_enriched = bucket_movie.transform(df_enriched)
        df_enriched = bucket_user.transform(df_enriched)

        # scaling (min-max using global min/max)
        gmin = global_stats['global_min'] if global_stats['global_min'] is not None else 0.0
        gmax = global_stats['global_max'] if global_stats['global_max'] is not None else 5.0
        rng = gmax - gmin if gmax != gmin else 1.0

        # NOTE(review): the stddev/range columns are shifted by gmin as well,
        # which can make them negative -- confirm this is intended.
        scale_cols = ['user_mean_rating', 'movie_mean_rating', 'user_stddev_rating', 'movie_stddev_rating', 'user_range_rating', 'movie_range_rating']
        for c in scale_cols:
            df_enriched = df_enriched.withColumn(f"{c}_scaled", (col(c) - lit(gmin)) / lit(rng))
        # One null-fill after the loop instead of one per scaled column.
        df_enriched = df_enriched.fillna(0)

        # time features: build the epoch-seconds conversion expression once
        rated_at = from_unixtime(col('timestamp'))
        df_enriched = df_enriched.withColumn('rating_year', year(rated_at))\
                                 .withColumn('rating_month', month(rated_at))\
                                 .withColumn('rating_day_of_week', dayofweek(rated_at))\
                                 .withColumn('rating_hour', hour(rated_at))

        # No separate imputation pass: the fillna(0) calls above already
        # cover the tag columns.
        return df_enriched

    def compute_user_features(self, df):
        """Build per-user confidence-interval features from the ratings in ``df``.

        Returns a DataFrame grouped by ``userId`` with the user's mean, stddev
        and count plus ``std_error``, ``lower_confidence_level``,
        ``upper_confidence_level``, ``confidence_interval_width`` and
        ``confidence_multiplier``; nulls are filled with 0.
        """
        print("Computing user confidence features...")
        per_user = df.groupBy('userId').agg(
            mean('rating').alias('user_mean'),
            stddev('rating').alias('user_stddev'),
            count('rating').alias('user_n'),
        ).fillna(0)

        z = 1.96
        margin = lit(z) * col('std_error')

        # Applied in order: later columns refer to earlier ones by name.
        derived_columns = [
            ('std_error',
             when(col('user_n') > 0, col('user_stddev') / sqrt(col('user_n'))).otherwise(lit(0.0))),
            ('lower_confidence_level', col('user_mean') - margin),
            ('upper_confidence_level', col('user_mean') + margin),
            ('confidence_interval_width',
             col('upper_confidence_level') - col('lower_confidence_level')),
            # Zero-width intervals get a multiplier of 1.0 rather than a division by zero.
            ('confidence_multiplier',
             when(col('confidence_interval_width') > 0, lit(1.0) / col('confidence_interval_width')).otherwise(lit(1.0))),
        ]
        for column_name, expression in derived_columns:
            per_user = per_user.withColumn(column_name, expression)

        return per_user.fillna(0)

    def prepare_training_data(self, df_enriched, user_conf_features):
        """Assemble a cached ``(label, features)`` frame for the regressor.

        Joins the user confidence columns onto ``df_enriched``, keeps the
        candidate feature columns that exist, and vectorises them.

        Returns:
            ``(dataset, input_cols)``: the cached, repartitioned frame and the
            list of feature columns that were actually assembled.
        """
        print("Preparing training dataset...")

        confidence = user_conf_features.select('userId', 'lower_confidence_level', 'upper_confidence_level', 'confidence_interval_width', 'confidence_multiplier')
        joined = df_enriched.join(confidence, 'userId', how='left').fillna(0)

        candidates = [
            'user_mean_rating','user_median_rating','user_stddev_rating','user_range_rating',
            'movie_mean_rating','movie_median_rating','movie_stddev_rating','movie_range_rating',
            'user_rating_count','movie_rating_count','log_movie_count','log_user_count',
            'movie_count_bin','user_count_bin',
            'movie_mean_rating_scaled','user_mean_rating_scaled','user_stddev_rating_scaled','movie_stddev_rating_scaled',
            'rating_year','rating_month','rating_day_of_week','rating_hour',
            'tag_count','log_tag_count','distinct_taggers',
            'lower_confidence_level','upper_confidence_level','confidence_interval_width','confidence_multiplier'
        ]

        # keep only columns that exist to avoid assembly errors
        present = [name for name, _dtype in joined.dtypes]
        input_cols = [c for c in candidates if c in present]

        assembler = VectorAssembler(inputCols=input_cols, outputCol='features', handleInvalid='skip')
        assembled = assembler.transform(joined)
        labelled = assembled.select(col('rating').alias('label'), 'features').filter(col('label').isNotNull())

        return labelled.repartition(200).cache(), input_cols

    def train_gbt_model(self, train_data, test_data):
        print('Training GBTRegressor...')
        gbt = GBTRegressor(maxIter=50, maxDepth=6, stepSize=0.1, seed=42, subsamplingRate=0.8, minInstancesPerNode=5)
        model = gbt.fit(train_data)
        self.feature_importance = model.featureImportances
        return model

    def evaluate_model(self, model, test_data):
        print('Evaluating model...')
        predictions = model.transform(test_data).cache()
        evaluator_rmse = RegressionEvaluator(metricName='rmse')
        evaluator_mae = RegressionEvaluator(metricName='mae')
        rmse = evaluator_rmse.evaluate(predictions)
        mae = evaluator_mae.evaluate(predictions)
        predictions.unpersist()
        return {'RMSE': rmse, 'MAE': mae}

    def process_dataset(self):
        print('Starting pipeline...')
        dataset_path = self.download_dataset()
        if dataset_path is None:
            return

        ratings_df, unified_df = self.load_movielens_data(dataset_path)
        if unified_df is None:
            return

        # compute stats
        user_stats, movie_stats, global_stats = self.compute_statistical_features(unified_df)

        # item features
        df_enriched = self.compute_item_features(unified_df, user_stats, movie_stats, global_stats)

        # split enriched data
        train_enriched, test_enriched = df_enriched.randomSplit([0.8, 0.2], seed=42)

        user_conf = self.compute_user_features(ratings_df)

        train_data, feature_cols = self.prepare_training_data(train_enriched, user_conf)
        test_data, _ = self.prepare_training_data(test_enriched, user_conf)

        model = self.train_gbt_model(train_data, test_data)
        metrics = self.evaluate_model(model, test_data)

        self.results[self.dataset_name] = metrics
        print('Pipeline finished')
        return metrics

    def print_summary(self):
        print('\nFINAL RESULTS SUMMARY')
        for k, v in self.results.items():
            print(f"Dataset: {k} -> {v}")


if __name__ == '__main__':
    # Run the whole pipeline once, then print whatever metrics were recorded
    # (nothing is recorded if process_dataset returned early).
    system = HybridRecommendationSystem()
    system.process_dataset()
    system.print_summary()
    # Shut down the module-level Spark session created above.
    spark.stop()


Starting pipeline...
Dataset already present at /tmp/ml-10M100K
Loading ratings data...
Loading movies data (if available)...
Loading tags data (if available)...
Total ratings: 10,000,054
Unique users: 69,878
Unique movies: 10,677
Computing statistical features...
Computing item-related features...
Computing user confidence features...
Preparing training dataset...
Preparing training dataset...
Training GBTRegressor...


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o977.fit

In [None]:
#!/usr/bin/env python3
"""
Hybrid Recommendation System - MovieLens 10M
Cleaned, robust and ready-to-run script.

Notes:
- Ensure Java (JDK 11+) and PySpark are installed in the environment.
- The script downloads and extracts the MovieLens 10M dataset to /tmp/ml-10M100K if not present.
- The script is defensive: it handles missing tags/movies files and uses safe defaults.
- GBTRegressor is instantiated without unsupported parameters to avoid version incompatibilities.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, Bucketizer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import os
import warnings
warnings.filterwarnings('ignore')

# ----------------------
# Spark session (tweak resources to your cluster)
# ----------------------
# Build (or reuse) the Spark session used by every method below.
spark = (
    SparkSession.builder
    .appName("HybridRecommendationSystemMovieLens10M")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .getOrCreate()
)

# Limit Spark's own logging to warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# ----------------------
# Main class
# ----------------------
class HybridRecommendationSystem:
    def __init__(self):
        self.results = {}
        self.dataset_name = "MovieLens 10M"
        self.feature_importance = None

    # ---------- dataset download / load ----------
    def download_dataset(self):
        """Download and extract MovieLens 10M to /tmp/ml-10M100K"""
        try:
            import urllib.request
            import zipfile

            dataset_url = "https://files.grouplens.org/datasets/movielens/ml-10m.zip"
            zip_path = "/tmp/ml-10m.zip"
            extract_path = "/tmp/ml-10M100K"

            if not os.path.exists(extract_path):
                os.makedirs(extract_path, exist_ok=True)
                print("Downloading MovieLens 10M dataset...")
                urllib.request.urlretrieve(dataset_url, zip_path)
                with zipfile.ZipFile(zip_path, 'r') as z:
                    z.extractall("/tmp/")
                print("Dataset downloaded and extracted to /tmp/ml-10M100K")
            else:
                print("Dataset already present at /tmp/ml-10M100K")

            return extract_path
        except Exception as e:
            print(f"Error downloading dataset: {e}")
            return None

    def load_movielens_data(self, dataset_path):
        """Load ratings, movies and tags robustly. Returns ratings_df, unified_df"""
        try:
            ratings_path = os.path.join(dataset_path, "ratings.dat")
            movies_path = os.path.join(dataset_path, "movies.dat")
            tags_path = os.path.join(dataset_path, "tags.dat")

            # Helper: safe read CSV with '::' splitter
            def safe_read(path, expected_cols):
                if not os.path.exists(path):
                    return None
                # Spark's CSV reader will create _c0.. if no header
                return spark.read.option("sep", "::").option("inferSchema", True).option("header", False).csv(path)

            print("Loading ratings data...")
            ratings_raw = safe_read(ratings_path, 4)
            if ratings_raw is None:
                print(f"Ratings file not found at {ratings_path}")
                return None, None

            # Map to consistent column names (handle _cN or _1 style)
            cols = ratings_raw.columns
            # expect 4 columns: userId, movieId, rating, timestamp
            ratings_df = ratings_raw.select(
                col(cols[0]).cast('int').alias('userId'),
                col(cols[1]).cast('int').alias('movieId'),
                col(cols[2]).cast('double').alias('rating'),
                col(cols[3]).cast('long').alias('timestamp')
            ).coalesce(200)

            print("Loading movies data (if available)...")
            movies_raw = safe_read(movies_path, 3)
            if movies_raw is None:
                # create minimal movies_df with movieId only
                movies_df = ratings_df.select('movieId').distinct().withColumn('movieTitle', lit(None).cast('string')).withColumn('genres', lit(None).cast('string'))
            else:
                mcols = movies_raw.columns
                movies_df = movies_raw.select(
                    col(mcols[0]).cast('int').alias('movieId'),
                    col(mcols[1]).alias('movieTitle'),
                    col(mcols[2]).alias('genres')
                )

            print("Loading tags data (if available)...")
            tags_raw = safe_read(tags_path, 4)
            if tags_raw is None:
                # empty tags
                tags_df = spark.createDataFrame([], schema=['userId', 'movieId', 'tag', 'tag_timestamp'])
            else:
                tcols = tags_raw.columns
                tags_df = tags_raw.select(
                    col(tcols[0]).cast('int').alias('userId'),
                    col(tcols[1]).cast('int').alias('movieId'),
                    col(tcols[2]).alias('tag'),
                    col(tcols[3]).cast('long').alias('tag_timestamp')
                )

            # Compute tag counts per movie (if tags exist)
            if len(tags_df.columns) > 0:
                tag_counts = tags_df.groupBy('movieId').agg(
                    count('tag').alias('tag_count'),
                    countDistinct('userId').alias('distinct_taggers')
                )
            else:
                tag_counts = spark.createDataFrame([], schema=['movieId', 'tag_count', 'distinct_taggers'])

            # Left join: ensure every rating row has movie metadata + tag counts (fill missing with 0)
            unified = ratings_df.join(movies_df, 'movieId', how='left')\
                                .join(tag_counts, 'movieId', how='left')\
                                .fillna({'tag_count': 0, 'distinct_taggers': 0})

            print(f"Total ratings: {ratings_df.count():,}")
            print(f"Unique users: {ratings_df.select('userId').distinct().count():,}")
            print(f"Unique movies: {ratings_df.select('movieId').distinct().count():,}")

            return ratings_df, unified
        except Exception as e:
            print(f"Error loading data: {e}")
            return None, None

    # ---------- statistics & features ----------
    def compute_statistical_features(self, df):
        """Aggregate rating statistics per user, per movie and over all rows.

        Returns:
            ``(user_stats, movie_stats, global_stats)``: two DataFrames grouped
            by ``userId`` / ``movieId`` (nulls filled with 0) and a plain dict
            holding the collected global aggregates.
        """
        print("Computing statistical features...")

        # max/min here are the pyspark.sql.functions aggregates brought in by
        # the star import at the top of the cell, not the Python builtins.
        rating_spread = max('rating') - min('rating')

        per_user_aggs = [
            mean('rating').alias('user_mean_rating'),
            approx_percentile('rating', 0.5).alias('user_median_rating'),
            stddev('rating').alias('user_stddev_rating'),
            rating_spread.alias('user_range_rating'),
            count('rating').alias('user_rating_count'),
            min('rating').alias('user_min_rating'),
            max('rating').alias('user_max_rating'),
        ]
        per_movie_aggs = [
            mean('rating').alias('movie_mean_rating'),
            approx_percentile('rating', 0.5).alias('movie_median_rating'),
            stddev('rating').alias('movie_stddev_rating'),
            rating_spread.alias('movie_range_rating'),
            count('rating').alias('movie_rating_count'),
            min('rating').alias('movie_min_rating'),
            max('rating').alias('movie_max_rating'),
        ]

        user_stats = df.groupBy('userId').agg(*per_user_aggs).fillna(0)
        movie_stats = df.groupBy('movieId').agg(*per_movie_aggs).fillna(0)

        # A global aggregate yields exactly one row; pull it to the driver.
        global_row = df.agg(
            mean('rating').alias('global_mean'),
            approx_percentile('rating', 0.5).alias('global_median'),
            stddev('rating').alias('global_stddev'),
            min('rating').alias('global_min'),
            max('rating').alias('global_max'),
        ).collect()[0]

        global_keys = ('global_mean', 'global_median', 'global_stddev', 'global_min', 'global_max')
        global_stats = {key: global_row[key] for key in global_keys}

        return user_stats, movie_stats, global_stats

    def compute_item_features(self, df, user_stats, movie_stats, global_stats):
        """Join per-movie/per-user statistics onto ``df`` and derive model features.

        Adds log-count, count-bin, ``*_scaled`` and calendar columns. Nulls in
        numeric columns are replaced with 0 after the joins and again after
        scaling.

        Args:
            df: Rating rows with ``userId``, ``movieId`` and ``timestamp``;
                ``tag_count`` is used when present.
            user_stats: Per-user statistics joined on ``userId``.
            movie_stats: Per-movie statistics joined on ``movieId``.
            global_stats: Dict providing ``global_min`` / ``global_max``.

        Returns:
            The enriched DataFrame.
        """
        print("Computing item-related features...")

        df_enriched = df.join(movie_stats, 'movieId', how='left')\
                        .join(user_stats, 'userId', how='left')\
                        .fillna(0)

        # log counts (counts of 0 or 1 map to 0.0)
        if 'tag_count' in df_enriched.columns:
            log_tag_count = when(col('tag_count') > 0, log(col('tag_count') + 1)).otherwise(lit(0.0))
        else:
            # No tag information on the input: use a constant instead of failing
            # on an unresolved column.
            log_tag_count = lit(0.0)

        df_enriched = df_enriched.withColumn('log_movie_count', when(col('movie_rating_count') > 1, log(col('movie_rating_count'))).otherwise(lit(0.0)))\
                                 .withColumn('log_user_count', when(col('user_rating_count') > 1, log(col('user_rating_count'))).otherwise(lit(0.0)))\
                                 .withColumn('log_tag_count', log_tag_count)

        # binning (same split points for movies and users)
        movie_bins = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, float('inf')]
        user_bins = movie_bins

        bucket_movie = Bucketizer(splits=movie_bins, inputCol='movie_rating_count', outputCol='movie_count_bin')
        bucket_user = Bucketizer(splits=user_bins, inputCol='user_rating_count', outputCol='user_count_bin')

        df_enriched = bucket_movie.transform(df_enriched)
        df_enriched = bucket_user.transform(df_enriched)

        # scaling (min-max using global min/max)
        gmin = global_stats['global_min'] if global_stats['global_min'] is not None else 0.0
        gmax = global_stats['global_max'] if global_stats['global_max'] is not None else 5.0
        rng = gmax - gmin if gmax != gmin else 1.0

        # NOTE(review): the stddev/range columns are shifted by gmin as well,
        # which can make them negative -- confirm this is intended.
        scale_cols = ['user_mean_rating', 'movie_mean_rating', 'user_stddev_rating', 'movie_stddev_rating', 'user_range_rating', 'movie_range_rating']
        for c in scale_cols:
            df_enriched = df_enriched.withColumn(f"{c}_scaled", (col(c) - lit(gmin)) / lit(rng))
        # One null-fill after the loop instead of one per scaled column.
        df_enriched = df_enriched.fillna(0)

        # time features: build the epoch-seconds conversion expression once
        rated_at = from_unixtime(col('timestamp'))
        df_enriched = df_enriched.withColumn('rating_year', year(rated_at))\
                                 .withColumn('rating_month', month(rated_at))\
                                 .withColumn('rating_day_of_week', dayofweek(rated_at))\
                                 .withColumn('rating_hour', hour(rated_at))

        # No separate imputation pass: the fillna(0) calls above already
        # cover the tag columns.
        return df_enriched

    def compute_user_features(self, df):
        """Build per-user confidence-interval features from the ratings in ``df``.

        Returns a DataFrame grouped by ``userId`` with the user's mean, stddev
        and count plus ``std_error``, ``lower_confidence_level``,
        ``upper_confidence_level``, ``confidence_interval_width`` and
        ``confidence_multiplier``; nulls are filled with 0.
        """
        print("Computing user confidence features...")
        per_user = df.groupBy('userId').agg(
            mean('rating').alias('user_mean'),
            stddev('rating').alias('user_stddev'),
            count('rating').alias('user_n'),
        ).fillna(0)

        z = 1.96
        margin = lit(z) * col('std_error')

        # Applied in order: later columns refer to earlier ones by name.
        derived_columns = [
            ('std_error',
             when(col('user_n') > 0, col('user_stddev') / sqrt(col('user_n'))).otherwise(lit(0.0))),
            ('lower_confidence_level', col('user_mean') - margin),
            ('upper_confidence_level', col('user_mean') + margin),
            ('confidence_interval_width',
             col('upper_confidence_level') - col('lower_confidence_level')),
            # Zero-width intervals get a multiplier of 1.0 rather than a division by zero.
            ('confidence_multiplier',
             when(col('confidence_interval_width') > 0, lit(1.0) / col('confidence_interval_width')).otherwise(lit(1.0))),
        ]
        for column_name, expression in derived_columns:
            per_user = per_user.withColumn(column_name, expression)

        return per_user.fillna(0)

    def prepare_training_data(self, df_enriched, user_conf_features):
        """Assemble a cached ``(label, features)`` frame for the regressor.

        Joins the user confidence columns onto ``df_enriched``, keeps the
        candidate feature columns that exist, and vectorises them.

        Returns:
            ``(dataset, input_cols)``: the cached, repartitioned frame and the
            list of feature columns that were actually assembled.
        """
        print("Preparing training dataset...")

        confidence = user_conf_features.select('userId', 'lower_confidence_level', 'upper_confidence_level', 'confidence_interval_width', 'confidence_multiplier')
        joined = df_enriched.join(confidence, 'userId', how='left').fillna(0)

        candidates = [
            'user_mean_rating','user_median_rating','user_stddev_rating','user_range_rating',
            'movie_mean_rating','movie_median_rating','movie_stddev_rating','movie_range_rating',
            'user_rating_count','movie_rating_count','log_movie_count','log_user_count',
            'movie_count_bin','user_count_bin',
            'movie_mean_rating_scaled','user_mean_rating_scaled','user_stddev_rating_scaled','movie_stddev_rating_scaled',
            'rating_year','rating_month','rating_day_of_week','rating_hour',
            'tag_count','log_tag_count','distinct_taggers',
            'lower_confidence_level','upper_confidence_level','confidence_interval_width','confidence_multiplier'
        ]

        # keep only columns that exist to avoid assembly errors
        present = [name for name, _dtype in joined.dtypes]
        input_cols = [c for c in candidates if c in present]

        assembler = VectorAssembler(inputCols=input_cols, outputCol='features', handleInvalid='skip')
        assembled = assembler.transform(joined)
        labelled = assembled.select(col('rating').alias('label'), 'features').filter(col('label').isNotNull())

        return labelled.repartition(200).cache(), input_cols

    def train_gbt_model(self, train_data, test_data):
        print('Training GBTRegressor...')
        gbt = GBTRegressor(maxIter=50, maxDepth=6, stepSize=0.1, seed=42, subsamplingRate=0.8, minInstancesPerNode=5)
        model = gbt.fit(train_data)
        self.feature_importance = model.featureImportances
        return model

    def evaluate_model(self, model, test_data):
        print('Evaluating model...')
        predictions = model.transform(test_data).cache()
        evaluator_rmse = RegressionEvaluator(metricName='rmse')
        evaluator_mae = RegressionEvaluator(metricName='mae')
        rmse = evaluator_rmse.evaluate(predictions)
        mae = evaluator_mae.evaluate(predictions)
        predictions.unpersist()
        return {'RMSE': rmse, 'MAE': mae}

    def process_dataset(self):
        print('Starting pipeline...')
        dataset_path = self.download_dataset()
        if dataset_path is None:
            return

        ratings_df, unified_df = self.load_movielens_data(dataset_path)
        if unified_df is None:
            return

        # compute stats
        user_stats, movie_stats, global_stats = self.compute_statistical_features(unified_df)

        # item features
        df_enriched = self.compute_item_features(unified_df, user_stats, movie_stats, global_stats)

        # split enriched data
        train_enriched, test_enriched = df_enriched.randomSplit([0.8, 0.2], seed=42)

        user_conf = self.compute_user_features(ratings_df)

        train_data, feature_cols = self.prepare_training_data(train_enriched, user_conf)
        test_data, _ = self.prepare_training_data(test_enriched, user_conf)

        model = self.train_gbt_model(train_data, test_data)
        metrics = self.evaluate_model(model, test_data)

        self.results[self.dataset_name] = metrics
        print('Pipeline finished')
        return metrics

    def print_summary(self):
        print('\nFINAL RESULTS SUMMARY')
        for k, v in self.results.items():
            print(f"Dataset: {k} -> {v}")


if __name__ == '__main__':
    # Run the whole pipeline once, then print whatever metrics were recorded
    # (nothing is recorded if process_dataset returned early).
    system = HybridRecommendationSystem()
    system.process_dataset()
    system.print_summary()
    # Shut down the module-level Spark session created above.
    spark.stop()


ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
!ls /tmp/ml-10M100K


allbut.pl  movies.dat  ratings.dat  README.html  split_ratings.sh  tags.dat
