In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
!tar xzf spark-3.3.3-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
pip install scikit-surprise



In [None]:
import pandas as pd
from surprise import Reader, Dataset
from surprise.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from surprise import SVD
from surprise import accuracy
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
import pandas as pd
import numpy as np
from surprise import Reader, Dataset, SVD
from surprise.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split as sk_train_test_split
import random
from collections import defaultdict
from surprise.model_selection import cross_validate, GridSearchCV
from surprise.model_selection import train_test_split
from surprise import Dataset, Reader, SVD, SVDpp
from surprise.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split as sk_train_test_split
from sklearn.model_selection import GridSearchCV as sk_GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split as sk_train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from pyspark.sql import SparkSession, Row, Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString, StringIndexerModel, Word2Vec, Tokenizer, CountVectorizer, MinHashLSH
from pyspark.sql.functions import col, monotonically_increasing_id, udf, max, min, avg, mean, count, explode, row_number, collect_list, desc, lit, broadcast
from pyspark.sql.types import FloatType, StructType, StructField, StringType
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import pandas_udf, PandasUDFType

## Data Preprocessing

In [None]:
# Create a Spark session with a meaningful app name
spark = SparkSession.builder.appName("RecommendationSystemApp").getOrCreate()

# Load the CSV file into a PySpark DataFrame
data_path = '/content/drive/MyDrive/Data_Preprocessing/final_training_dataset.csv'
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .option("quote", "\"") \
               .option("escape", "\"") \
               .csv(data_path)

# remove unnessary columns
columns_to_drop = ['review_text', 'sentiment', 'detect']
df = df.drop(*columns_to_drop)

df = df.dropna()

# Split the data into training and test sets
train_df_spark, test_df_spark = df.randomSplit([0.8, 0.2], seed=42)

# Convert user_id and business_id to numerical indices
indexers = [StringIndexer(inputCol=column, outputCol=column+"_indexed").fit(df) for column in (["user_id", "business_id"])]
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(train_df_spark)
train_df_spark = indexer_model.transform(train_df_spark)
test_df_spark = indexer_model.transform(test_df_spark)

In [None]:
train_df_spark.show(3)

+--------------------+--------------------+------+--------------------+------------------+---------------+-------------------+
|         business_id|             user_id|rating|     cleaned_reviews|   sentiment_score|user_id_indexed|business_id_indexed|
+--------------------+--------------------+------+--------------------+------------------+---------------+-------------------+
|604092d865e4ba058...|10301301358122957...|   3.0|       shrimp scampi|0.9999940395355225|         4894.0|             3638.0|
|604092d865e4ba058...|11326994618515516...|   5.0|great time ate lo...|0.9999945163726807|        18985.0|             3638.0|
|604094167cd8bf130...|10014988986459009...|   5.0|got ta fight chee...|0.9999940395355225|         8588.0|              694.0|
+--------------------+--------------------+------+--------------------+------------------+---------------+-------------------+
only showing top 3 rows



In [None]:
# first write it to a parquet format, then read it as a pandas dataframe; faster than direct convert
train_df_spark.write.mode("overwrite").parquet("/content/drive/MyDrive/Data_Preprocessing/training.parquet")
test_df_spark.write.mode("overwrite").parquet("/content/drive/MyDrive/Data_Preprocessing/testing.parquet")

In [None]:
# convert to pandas dataframe for model fitting
train_df = pd.read_parquet("/content/drive/MyDrive/Data_Preprocessing/training.parquet")
test_df = pd.read_parquet("/content/drive/MyDrive/Data_Preprocessing/testing.parquet")

## Model With Scikit-Learn (Hybrid Model)

**Collaborative Filtering (SVD) with Surprise:**

This part primarily uses the Surprise library for collaborative filtering with SVD. Since Surprise is well-suited for collaborative filtering tasks and we're performing grid search for hyperparameter tuning, there's no strong reason to switch this part to Spark.

In [None]:
collab_features = ['user_id', 'business_id']
content_features = ['cleaned_reviews', 'sentiment_score']
target = 'rating'
reader = Reader(rating_scale=(1, 5))

# Load train and test data into the Dataset format
train_data = Dataset.load_from_df(train_df[['user_id', 'business_id', 'rating']], reader)
test_data = Dataset.load_from_df(test_df[['user_id', 'business_id', 'rating']], reader)

# Specify the parameter grid for GridSearchCV
param_grid = {'n_epochs': [10, 20, 30], 'lr_all': [0.002, 0.005, 0.01], 'reg_all': [0.1, 0.4]}

# Create GridSearchCV with the correct metric 'rmse'
gs = GridSearchCV(SVD, param_grid, measures=['rmse'], cv=5)
gs.fit(train_data)  # Fit only on the trainset

# Get the best RMSE score and the corresponding parameters
best_rmse = gs.best_score['rmse']
best_params = gs.best_params['rmse']

# Instantiate the SVD model with the best parameters
collab_model = SVDpp(n_epochs=best_params['n_epochs'], lr_all=best_params['lr_all'], reg_all=best_params['reg_all'])

# Fit collab_model on the trainset
trainset = train_data.build_full_trainset()  # Build full trainset
collab_model.fit(trainset)

<surprise.prediction_algorithms.matrix_factorization.SVDpp at 0x79065cfe7700>

In [None]:
print('Best RMSE: {:.4f}'.format(best_rmse))
print('Best Parameters:', best_params)

Best RMSE: 0.8176
Best Parameters: {'n_epochs': 20, 'lr_all': 0.01, 'reg_all': 0.4}


**Content-Based Model (TF-IDF and Ridge Regression):**

This part involves using scikit-learn's pipeline and grid search for hyperparameter tuning with TF-IDF and Ridge Regression. Since these are common machine learning tasks, we choose to keep this part as is unless it is a very large dataset where Spark's distributed capabilities could be advantageous.

In [None]:
from sklearn.pipeline import Pipeline # there are two pipelines, spark and sklearn

# Data splitting
X_train, y_train = train_df[['cleaned_reviews', 'sentiment_score']], train_df['rating']
X_test, y_test = test_df[['cleaned_reviews', 'sentiment_score']], test_df['rating']

# Preprocessing pipeline
preprocessor = ColumnTransformer(
    transformers=[
        ('text', TfidfVectorizer(), 'cleaned_reviews'),
        ('numeric', 'passthrough', ['sentiment_score'])
    ]
)

# Full pipeline with Ridge regression
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('ridge', Ridge(random_state=42))
])

# Hyperparameter grid for GridSearchCV
param_grid = {
    'preprocessor__text__max_features': [500, 1000, 1500],
    'preprocessor__text__ngram_range': [(1, 1), (1, 2)],
    'ridge__alpha': [0.1, 1.0, 10.0]
}

# GridSearchCV for hyperparameter tuning
grid_search = sk_GridSearchCV(pipeline, param_grid, scoring='neg_mean_squared_error', cv=5, verbose=1, n_jobs=-1)

# Model training and hyperparameter tuning
grid_search.fit(X_train, y_train)

# Best hyperparameters and model
best_params = grid_search.best_params_
best_score = -grid_search.best_score_  # Convert to positive mean squared error
content_model = grid_search.best_estimator_
content_model.fit(X_train, y_train)


Fitting 5 folds for each of 18 candidates, totalling 90 fits


In [None]:
print(f'Best parameters: {best_params}')
print(f'Best score (MSE): {best_score}')

Best parameters: {'preprocessor__text__max_features': 1500, 'preprocessor__text__ngram_range': (1, 1), 'ridge__alpha': 10.0}
Best score (MSE): 0.3687917954347598


**Combining Predictions and Calculating RMSE:**

This part combines predictions from both models and calculates the final RMSE. Since it involves working with Pandas and NumPy arrays, and the computations are not distributed.

In summary, if the dataset is large and we need to scale collaborative filtering or content-based modeling tasks, we might consider using Spark for those parts. However, for smaller datasets, the current implementation using scikit-learn and Surprise libraries should be sufficient and straightforward. It's a trade-off between scalability and simplicity.

In [None]:
# Assuming you have a collab_model and content_model defined
testset = test_data.build_full_trainset().build_testset()
# Collaborative Filtering Model Predictions
col_predictions = collab_model.test(testset)

# Extract estimated ratings
col_predictions = [pred.est for pred in col_predictions]

# Assuming you have a content-based model (content_model) defined
con_predictions = np.array(content_model.predict(X_test))

# Combine Predictions
final_predictions = np.mean([col_predictions, con_predictions], axis=0)

# Calculate RMSE
final_rmse = np.sqrt(mean_squared_error(y_test, final_predictions))
print(f'Final RMSE: {final_rmse}')

Final RMSE: 0.6811783847706706


## Modelling with Spark (ALS)

In [None]:
# tokenizer
tokenizer = Tokenizer(inputCol="cleaned_reviews", outputCol="words")
tokenized_df = tokenizer.transform(train_df_spark)

# Word2Vec model
word2Vec = Word2Vec(vectorSize=130, minCount=0, inputCol="words", outputCol="word2vec_features")
word2Vec_model = word2Vec.fit(tokenized_df)
train_word2vec_df = word2Vec_model.transform(tokenized_df)

In [None]:
# Add unique IDs to each row
train_word2vec_df = train_word2vec_df.withColumn("id", monotonically_increasing_id())

# Self-join to calculate similarities
joined_df = train_word2vec_df.alias("df1").join(train_word2vec_df.alias("df2"), "user_id").filter("df1.id < df2.id")

def cosine_similarity(vec1, vec2):
    # Convert to NumPy arrays
    vec1_array = np.array(vec1.toArray())
    vec2_array = np.array(vec2.toArray())

    return float(np.dot(vec1_array, vec2_array) / (np.linalg.norm(vec1_array) * np.linalg.norm(vec2_array)))

cosine_similarity_udf = udf(cosine_similarity, FloatType())

# Apply the UDF
similarity_df = joined_df.withColumn("similarity", cosine_similarity_udf("df1.word2vec_features", "df2.word2vec_features"))


In [None]:
# Normalize the similarity scores
max_score = similarity_df.agg(max(col("similarity")).alias("max_score")).collect()[0]["max_score"]
min_score = similarity_df.agg(min(col("similarity")).alias("min_score")).collect()[0]["min_score"]

similarity_df = similarity_df.withColumn("normalized_similarity",
                                         (col("similarity") - min_score) / (max_score - min_score))
global_avg_similarity = similarity_df.agg({"normalized_similarity": "avg"}).collect()[0][0]


In [None]:
# Fill missing values in user profiles with the global average
user_profile_df = similarity_df.groupBy("user_id").agg({"normalized_similarity": "avg"}) \
                                .na.fill(global_avg_similarity)

# Rename the column for clarity
user_profile_df = user_profile_df.withColumnRenamed("avg(normalized_similarity)", "user_similarity_avg")


In [None]:
# Join user profile data with the original reviews dataset
enriched_df = train_df_spark.join(user_profile_df, train_df_spark["user_id"] == user_profile_df["user_id"])

# Aggregate user profile information for each restaurant
restaurant_profile_df = enriched_df.groupBy("business_id").agg({"user_similarity_avg": "avg"})

# Rename the column for clarity and fill missing values
restaurant_profile_df = restaurant_profile_df.withColumnRenamed("avg(user_similarity_avg)", "restaurant_similarity_profile") \
                                             .na.fill(global_avg_similarity)


In [None]:
als_model_df = train_df_spark.join(user_profile_df, "user_id").join(restaurant_profile_df, "business_id")

In [None]:
#### DON RUN THIS, THE MODEL HAS BEEN TRAINED ALREADY , THIS WILL TAKE TOO LONG TO FINISH

als_model_df = als_model_df.withColumn("rating", col("rating").cast("float"))

# Define the ALS model with nonnegative constraint and coldStartStrategy
als = ALS(userCol="user_id_indexed", itemCol="business_id_indexed", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

# Adjust the ParamGridBuilder with a wider range of regParam and potentially lower ranks
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 20]).addGrid(als.maxIter, [10, 15]).addGrid(als.regParam, [0.05, 0.1, 0.2]).build()

# Define an evaluator with RMSE metric
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# CrossValidator with 5-fold cross-validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Fit the model
model = cv.fit(als_model_df)

# Extract the best model
best_model = model.bestModel

# Optional: Print out the best parameters
print("Best rank:", best_model.rank)
print("Best maxIter:", best_model._java_obj.parent().getMaxIter())
print("Best regParam:", best_model._java_obj.parent().getRegParam())

model_path = "/content/drive/MyDrive/cs631alsmodels_new"
best_model.write().overwrite().save(model_path)

In [None]:
from pyspark.ml.recommendation import ALSModel
model_path = "/content/drive/MyDrive/cs631alsmodels_new"
als_model = ALSModel.load(model_path)

In [None]:
test_model_df = test_df_spark.withColumn("rating", col("rating").cast("float"))
predictions = als_model.transform(test_model_df)
predictions = predictions.na.drop(subset=["prediction"])
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.3283079852708082


## Generating recommender function
For the followig parts, I experimented with both Spark and Pandas implementations. But I observed similar performance speeds in both cases. The reasons could be:
1. Extensive Use of UDFs (User-Defined Functions): The content and collaborative models are derived from different frameworks. To integrate these models effectively, numerous UDFs were required. UDFs in Spark, while flexible, can be less efficient than built-in Spark functions due to the added serialization and deserialization overhead, especially when dealing with complex operations.

2. Conversions Between Pandas and Spark DataFrames: Despite efforts to minimize conversions, some degree of data transfer between Pandas and Spark DataFrames was inevitable. These conversions are computationally costly, as they involve serialization and deserialization of data, leading to additional processing time that could negate some of Spark's performance advantages.




In [None]:
# how to handle the deployment problem?

agg_reviews = train_df.groupby('business_id')['cleaned_reviews'].apply(' '.join)
agg_sentiments = train_df.groupby('business_id')['sentiment_score'].mean()


def predict_collaborative_filtering(collab_model, user_id, all_restaurant_ids, rating_threshold=4.5):
    predictions = []
    for restaurant_id in all_restaurant_ids:
        # Predict the rating for each restaurant
        prediction = collab_model.predict(user_id, restaurant_id)
        if prediction.est > rating_threshold:
            predictions.append((restaurant_id, prediction.est))

    return predictions


def refine_with_content_model(user_id, potential_restaurants, content_model, restaurant_dict, batch_size=50):
    refined_recommendations = []
    batch_data = []
    batch_restaurants = []

    for restaurant_id, collab_pred_rating in potential_restaurants:
        restaurant_data = restaurant_dict.get(restaurant_id)

        if restaurant_data is not None and not restaurant_data.empty:
            batch_data.append(restaurant_data)
            batch_restaurants.append((restaurant_id, collab_pred_rating))

        # Process the batch when the batch size is reached or at the end of the list
        if len(batch_data) == batch_size or restaurant_id == potential_restaurants[-1][0]:
            if batch_data:
                # Concatenate the batch data
                batch_df = pd.concat(batch_data, ignore_index=True)

                # Predict using the content model for the batch
                batch_predictions = content_model.predict(batch_df)

                # Iterate over the batch predictions and combine with collaborative ratings
                for (restaurant_id, collab_pred_rating), content_pred_rating in zip(batch_restaurants, batch_predictions):
                    final_rating = (collab_pred_rating*0.5 + content_pred_rating*0.5)
                    refined_recommendations.append((restaurant_id, final_rating))

            # Reset batch data and restaurants list for the next batch
            batch_data = []
            batch_restaurants = []

    return refined_recommendations


def get_top_rated_restaurant_per_user(train_df):
    # Group by user and aggregate with a custom function to find the top-rated restaurant
    def get_top_restaurant(group):
        top_rated = group.sort_values(by='rating', ascending=False).head(1)
        return top_rated['business_id'].values[0]

    top_rated_restaurants = train_df.groupby('user_id').apply(get_top_restaurant)

    return top_rated_restaurants.to_dict()


def get_similar_restaurants(business_id, transformed_df, mh_model):
    key = transformed_df.filter(transformed_df['business_id'] == business_id).head()
    if key:
        similar = mh_model.approxSimilarityJoin(transformed_df, transformed_df, 1.0, distCol="JaccardDistance") \
            .filter(col("datasetA.business_id") == key['business_id']) \
            .select(col("datasetB.business_id").alias("similar_business_id"), col("JaccardDistance")) \
            .dropDuplicates(["similar_business_id"]) \
            .orderBy("JaccardDistance", ascending=True)

        # Convert to Pandas DataFrame and then to dictionary
        similar_pd = similar.toPandas()
        similarity_dict = dict(zip(similar_pd['similar_business_id'], similar_pd['JaccardDistance']))
        return similarity_dict
    else:
        print("Business ID not found in the dataset.")
        return None

# Preprocess the data once
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=30000)
cv_model = cv.fit(tokenized_df)
featurized_df = cv_model.transform(tokenized_df)

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
mh_model = mh.fit(featurized_df)
transformed_df = mh_model.transform(featurized_df)
transformed_df.cache()

DataFrame[business_id: string, user_id: string, rating: string, cleaned_reviews: string, sentiment_score: double, user_id_indexed: double, business_id_indexed: double, words: array<string>, features: vector, hashes: array<vector>]

In [None]:
restaurant_dict = {key: group[['cleaned_reviews', 'sentiment_score']] for key, group in train_df.groupby('business_id')}
top_rated_restaurants_all = get_top_rated_restaurant_per_user(train_df)
all_restaurant_ids = train_df['business_id'].unique()

In [None]:
# how to improve precision@k and recall@k?

# train_df['rating'] = train_df['rating'].astype('float')

# # Calculate C, the mean rating across all restaurants
# C = train_df['rating'].mean()

# # Choose a value for m, the minimum reviews required
# m = train_df['business_id'].value_counts().quantile(0.90)

# # Calculate v (number of reviews) and R (average rating) for each restaurant
# v = train_df.groupby('business_id')['rating'].count()
# R = train_df.groupby('business_id')['rating'].mean()

# # Calculate weighted rating
# weighted_rating = (v / (v + m)) * R + (m / (v + m)) * C

# # Convert to a dictionary
# item_popularity = weighted_rating.to_dict()

def re_rank_recommendations(user_id, refined_recommendations, top_rated_restaurants_all,
                            transformed_df, mh_model, alpha=0.2):
    if alpha == 0:
      return refined_recommendations

    re_ranked_recommendations = []

    top_rated_restaurant = top_rated_restaurants_all[user_id]

    # Get similarity scores for the top-rated restaurant
    similarity_scores = get_similar_restaurants(top_rated_restaurant, transformed_df, mh_model)


    for restaurant_id, combined_rating in refined_recommendations:
        similarity_score = similarity_scores.get(restaurant_id, 0)  # Default to 0 if no similarity score

        # Adjust the final score to include similarity
        final_score = (1 - alpha) * combined_rating + (alpha) *  (1 - similarity_score)
        re_ranked_recommendations.append((restaurant_id, final_score))

    return re_ranked_recommendations


In [None]:
def get_als_recommendations(user_id, indexer_model, als_model, k=100):
    # Convert user_id to user_id_indexed
    user_df = spark.createDataFrame([Row(user_id=user_id)])
    indexed_user_df = indexer_model.transform(user_df)
    user_id_indexed = indexed_user_df.select("user_id_indexed").collect()[0]["user_id_indexed"]

    # Get recommendations for the user
    recs = als_model.recommendForUserSubset(spark.createDataFrame([Row(user_id_indexed=user_id_indexed)]), k)

    # Extract the StringIndexerModel for business_id from the indexer_model pipeline
    business_id_indexer_model = [stage for stage in indexer_model.stages if isinstance(stage, StringIndexerModel) and stage.getOutputCol() == "business_id_indexed"][0]

    # Create an IndexToString transformer
    converter = IndexToString(inputCol="business_id_indexed", outputCol="business_id", labels=business_id_indexer_model.labels)

    # Transform the recommendations DataFrame to extract business_id_indexed and convert them to original business IDs
    recs_transformed = recs.select("user_id_indexed", explode("recommendations").alias("recommendation"))
    business_ids_df = recs_transformed.select("user_id_indexed", col("recommendation.business_id_indexed").alias("business_id_indexed"))

    original_ids_df = converter.transform(business_ids_df)

    als_rankings = {row['business_id']: idx + 1 for idx, row in enumerate(original_ids_df.collect())}
    return als_rankings


def precompute_most_popular_items(train_df, k=20):
    # Calculate popularity as number of visits * average rating
    train_df['rating'] = train_df['rating'].astype('float')
    popularity_df = train_df.groupby('business_id').agg(
        num_visits=('business_id', 'count'),
        avg_rating=('rating', 'mean')).reset_index()
    popularity_df['popularity_score'] = popularity_df['num_visits'] * popularity_df['avg_rating']

    # Sort and get top k most popular business IDs
    most_popular_business_ids = popularity_df.sort_values('popularity_score', ascending=False)['business_id'][:k].tolist()
    return most_popular_business_ids

def generate_k_recommendations(user_id, all_restaurant_ids, indexer_model, als_model, collab_model, content_model,
                               train_df, restaurant_dict, transformed_df, mh_model, most_popular_business_ids,
                               k=20, rating_threshold=4.5, alpha=0.2, include_als=False):

    if user_id not in train_df['user_id'].unique():
      # Cold-start scenario: Recommend most popular items
      return most_popular_business_ids[:k]

    if not include_als:
        # Get collaborative filtering recommendations
        potential_restaurants = predict_collaborative_filtering(collab_model, user_id, all_restaurant_ids, rating_threshold)
        # Refine recommendations using the content model
        content_refined_recommendations = refine_with_content_model(user_id, potential_restaurants, content_model, restaurant_dict, batch_size=1000)
        # Use re_rank_recommendations to get refined recommendations
        re_ranked_recommendations = re_rank_recommendations(user_id, content_refined_recommendations, top_rated_restaurants_all, transformed_df, mh_model, alpha)
        return re_ranked_recommendations[:k]

    # Get ALS model recommendations with rankings
    als_rankings = get_als_recommendations(user_id, indexer_model, als_model, 50)

    # Get collaborative filtering recommendations
    potential_restaurants =  predict_collaborative_filtering(collab_model, user_id, all_restaurant_ids, rating_threshold)
    #print(collab_recommendations)

    # Refine recommendations using the content model
    content_refined_recommendations = refine_with_content_model(user_id, potential_restaurants, content_model, restaurant_dict, batch_size=1000)
    #print(collab_recommendations)
    # Use re_rank_recommendations to get refined recommendations
    re_ranked_recommendations = re_rank_recommendations(user_id, content_refined_recommendations, top_rated_restaurants_all, transformed_df, mh_model, alpha)

    # Incorporate ALS rankings into the final score
    final_recommendations = []
    for restaurant_id, combined_score in re_ranked_recommendations:
        als_rank = als_rankings.get(restaurant_id, 51)  # Default to a low rank if not in top 50
        final_score = combined_score / als_rank  # Higher ALS rank results in a higher final score
        final_recommendations.append((restaurant_id, final_score))

    # Sort by final score and return top k recommendations
    final_recommendations.sort(key=lambda x: x[1], reverse=True)
    return final_recommendations[:k]





In [None]:
# Precompute the list once
most_popular_business_ids = precompute_most_popular_items(train_df)
most_popular_business_ids

['604245d6b9a6829e686e8c2a',
 '6043ad17b81264dfa846c9ea',
 '60415f44c6fcf1fddba13088',
 '604153f5c6fcf1fddba12bd2',
 '604242b42e57ebdea29c91f6',
 '604158f57dfa7f1871835dd4',
 '6056030897d555cc6fb0d03b',
 '604bdd2a21f213251c55e437',
 '60424db77dfa7f187183bd4d',
 '60458535af942d7ea319b5f0',
 '604162547cd8bf1303625275',
 '60418e07c6fcf1fddba13f2f',
 '604156f62e57ebdea29c37dc',
 '60418a3b2e57ebdea29c4f21',
 '6040ce24c6fcf1fddba0f0e0',
 '6040c0517cd8bf130361fe41',
 '604222e85fd99145bc6227dc',
 '604c0a7c21f213251c55e58a',
 '6040fe28c6fcf1fddba108dd',
 '604159ca2e57ebdea29c39a3']

## Experiments

In [None]:
user_id = '111581047256694238215'
get_als_recommendations(user_id, indexer_model, als_model, k=20)

In [None]:
generate_k_recommendations(user_id, all_restaurant_ids, indexer_model, als_model, collab_model, content_model,
                               train_df, restaurant_dict, item_popularity, transformed_df, mh_model, most_popular_business_ids,
                               k=20, rating_threshold=4.5, alpha=0.3, beta=0.2)

[]
[]


[]

In [None]:
test_df[test_df['user_id'] == user_id]

Unnamed: 0,business_id,user_id,rating,cleaned_reviews,sentiment_score,user_id_indexed,business_id_indexed
2551,605687e3f69c7b117807008c,115222930312185568212,4.0,wife salad czech caprese salad version main co...,0.999994,7710.0,2637.0


In [None]:
train_df[train_df['user_id'] == user_id]

Unnamed: 0,business_id,user_id,rating,cleaned_reviews,sentiment_score,user_id_indexed,business_id_indexed
27095,604a88b0b1a0aaee3eefbc99,105638162784132957228,2.0,granted breakfast screw country fried steak eg...,0.000309,12924.0,18195.0


In [None]:
als_rankings = get_als_recommendations(user_id, indexer_model, als_model, 50)
# collab_recommendations = predict_collaborative_filtering(collab_model, user_id, all_restaurant_ids, 1)
# content_refined_recommendations = refine_with_content_model(user_id, collab_recommendations, content_model, restaurant_dict, batch_size=1000)
# refined_recommendations = re_rank_recommendations(user_id, content_refined_recommendations, item_popularity, top_rated_restaurants_all, transformed_df, mh_model, 0.3, 0)
# final_recommendations = []
# for restaurant_id, combined_score in refined_recommendations:
#     als_rank = als_rankings.get(restaurant_id, 51)  # Default to a low rank if not in top 50
#     final_score = combined_score / als_rank  # Higher ALS rank results in a higher final score
#     final_recommendations.append((restaurant_id, final_score))

# Sort by final score and return top k recommendations
final_recommendations.sort(key=lambda x: x[1], reverse=True)
final_recommendations = final_recommendations[:50]

In [None]:
# Value to check
value_to_check = '604a88b0b1a0aaee3eefbc99'

# Function to find the tuple containing the value
def find_tuple_with_value(tuples_list, value):
    for item in tuples_list:
        if value in item:
            return item
    return None  # Return None if the value is not found in any tuple

# Find the tuple
result = find_tuple_with_value(final_recommendations, value_to_check)

if result:
    print(f"Found the value in tuple: {result}")
else:
    print("Value not found in any tuple.")


Value not found in any tuple.


## Performance

In [None]:
def precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=False, collab=True, als=False, alpha=0.3, use_min_hash=False):
    '''Return precision and recall at k metrics for each user.'''
    user_est_true = defaultdict(list)
    remove_threshold = False # indicate whether to remove the threshold for the estimated predictions
    if content and not collab:
        # Generate predictions from the content model
        X_test = test_df[['cleaned_reviews', 'sentiment_score']]
        predictions = content_model.predict(X_test)
        for uid, true_r, est in zip(test_df['user_id'], test_df['rating'], predictions):
            user_est_true[uid].append((est,float(true_r)))

    if collab and not content:
        # Generate predictions from the collaborative model
        test_data = Dataset.load_from_df(test_df[['user_id', 'business_id', 'rating']], reader)
        testset = test_data.build_full_trainset().build_testset()
        predictions = collab_model.test(testset)
        for uid, _, true_r, est, _ in predictions:
            user_est_true[uid].append((est, float(true_r)))

    if content and collab and not als:
        if not use_min_hash:
            # Generate predictions from both models and combine them
            # take 30 samples
            X_test = test_df[['cleaned_reviews', 'sentiment_score']]
            content_predictions = content_model.predict(X_test)
            test_data = Dataset.load_from_df(test_df[['user_id', 'business_id', 'rating']], reader)
            testset = test_data.build_full_trainset().build_testset()
            collab_predictions = collab_model.test(testset)
            for uid, true_r, est_content, est_collab in zip(test_df['user_id'], test_df['rating'], content_predictions, [pred.est for pred in collab_predictions]):
                combined_est = 0.5 * est_content + 0.5 * est_collab
                user_est_true[uid].append((combined_est, float(true_r)))
        else:
            remove_threshold = True
            unique_users_train = set(train_df['user_id'].unique())
            unique_businesses_train = set(train_df['business_id'].unique())
            filtered_test_df = test_df[test_df['user_id'].isin(unique_users_train) & test_df['business_id'].isin(unique_businesses_train)]

            X_test = filtered_test_df[['cleaned_reviews', 'sentiment_score']]
            content_predictions = content_model.predict(X_test)
            test_data = Dataset.load_from_df(filtered_test_df[['user_id', 'business_id', 'rating']], reader)
            testset = test_data.build_full_trainset().build_testset()
            collab_predictions = collab_model.test(testset)
            temp_dict = defaultdict(list)
            for (uid, restaurant_id, true_r), est_content, est_collab in zip(filtered_test_df[['user_id', 'business_id', 'rating']].itertuples(index=False), content_predictions, [pred.est for pred in collab_predictions]):
                combined_est = 0.5 * est_content + 0.5 * est_collab
                temp_dict[uid].append((restaurant_id, combined_est, float(true_r)))

            for uid, ratings in temp_dict.items():
                top_rated_restaurant = top_rated_restaurants_all.get(uid)
                if top_rated_restaurant:
                    similarity_scores = get_similar_restaurants(top_rated_restaurant, transformed_df, mh_model)

                    for i, (restaurant_id, combined_rating, true_r) in enumerate(ratings):
                        similarity_score = similarity_scores.get(restaurant_id, 0)  # Default to 0 if no similarity score

                        # Adjust the final score to include similarity
                        final_score = (1 - alpha) * combined_rating + (alpha) *  (1 - similarity_score)

                        # Update the list with the new combined rating
                        ratings[i] = (final_score, true_r)
                # Update the dictionary with the new ratings
                user_est_true[uid] = ratings



    if als and not content and not collab:
        remove_threshold = True
        test_model_df = test_df_spark.withColumn("rating", col("rating").cast("float"))
        # Generate predictions from the ALS model
        predictions = als_model.transform(test_df_spark)
        predictions = predictions.na.drop(subset=["prediction"])
        for row in predictions.collect():
            uid, true_r, est = row['user_id'], row['rating'], row['prediction']
            true_r = float(true_r) if isinstance(true_r, str) else true_r
            est = float(est) if isinstance(est, str) else est
            user_est_true[uid].append((est, true_r))



    precisions = dict()
    recalls = dict()
    for uid, user_ratings in user_est_true.items():

        # Sort user ratings by estimated value
        user_ratings.sort(key=lambda x: x[0], reverse=True)

        # Number of relevant items
        n_rel = sum((true_r >= threshold) for (_, true_r) in user_ratings)

        if remove_threshold:
            # Number of recommended items in top k
            n_rec_k = sum((est >= 0) for (est, _) in user_ratings[:k])

            # Number of relevant and recommended items in top k
            n_rel_and_rec_k = sum(((true_r >= threshold) and (est >= 0))
                              for (est, true_r) in user_ratings[:k])
        else:
            # Number of recommended items in top k
            n_rec_k = sum((est >= threshold) for (est, _) in user_ratings[:k])

            # Number of relevant and recommended items in top k
            n_rel_and_rec_k = sum(((true_r >= threshold) and (est >= threshold))
                                  for (est, true_r) in user_ratings[:k])

        # Precision@K: Proportion of recommended items that are relevant
        precisions[uid] = n_rel_and_rec_k / n_rec_k if n_rec_k != 0 else 1

        # Recall@K: Proportion of relevant items that are recommended
        recalls[uid] = n_rel_and_rec_k / n_rel if n_rel != 0 else 1

    return precisions, recalls


#### ALS Model


In [None]:
prec_to_ave = []
rec_to_ave = []

precisions, recalls = precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=False, collab=False, als=True, alpha=0.3)

# Precision and recall can then be averaged over all users

prec_to_ave.append(sum(prec for prec in precisions.values()) / len(precisions))
rec_to_ave.append(sum(rec for rec in recalls.values()) / len(recalls))


precision_average = sum(prec_to_ave)/len(prec_to_ave)
recall_average = sum(rec_to_ave)/len(prec_to_ave)

print("Precision and Recall averages are {0} and {1}, respectively".format(precision_average, recall_average))

Precision and Recall averages are 0.8817234410194698 and 1.0, respectively


#### Collaborative Model

In [None]:
prec_to_ave = []
rec_to_ave = []

precisions, recalls =  precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=False, collab=True, als=False, alpha=0.3)

# Precision and recall can then be averaged over all users

prec_to_ave.append(sum(prec for prec in precisions.values()) / len(precisions))
rec_to_ave.append(sum(rec for rec in recalls.values()) / len(recalls))


precision_average = sum(prec_to_ave)/len(prec_to_ave)
recall_average = sum(rec_to_ave)/len(prec_to_ave)

print("Precision and Recall averages are {0} and {1}, respectively".format(precision_average, recall_average))

Precision and Recall averages are 0.8949618199172772 and 0.9740080602396862, respectively


#### Content Model

In [None]:
prec_to_ave = []
rec_to_ave = []

precisions, recalls = precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=True, collab=False, als=False, alpha=0.3)

# Precision and recall can then be averaged over all users

prec_to_ave.append(sum(prec for prec in precisions.values()) / len(precisions))
rec_to_ave.append(sum(rec for rec in recalls.values()) / len(recalls))


precision_average = sum(prec_to_ave)/len(prec_to_ave)
recall_average = sum(rec_to_ave)/len(prec_to_ave)

print("Precision and Recall averages are {0} and {1}, respectively".format(precision_average, recall_average))

Precision and Recall averages are 0.9424919927882072 and 0.9865007954183901, respectively


The precision and recall seems higher for the content model. The test is impractical for evaluating a content model, as it depends on user reviews that are only available post-experience. An effective recommendation system should be capable of generating recommendations with minimal user input, typically just the user ID. To achieve this, integrating both collaborative and content-based models is advisable, leveraging the strengths of each to provide more accurate and timely recommendations.

#### Content + Collaborative (hybird Model)

In [None]:
prec_to_ave = []
rec_to_ave = []

precisions, recalls = precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=True, collab=True, als=False, alpha=0)

# Precision and recall can then be averaged over all users

prec_to_ave.append(sum(prec for prec in precisions.values()) / len(precisions))
rec_to_ave.append(sum(rec for rec in recalls.values()) / len(recalls))


precision_average = sum(prec_to_ave)/len(prec_to_ave)
recall_average = sum(rec_to_ave)/len(prec_to_ave)

print("Precision and Recall averages are {0} and {1}, respectively".format(precision_average, recall_average))

Precision and Recall averages are 0.9398832325803378 and 0.9917847067557535, respectively


#### Content + Collaborative (Hybrid Model) + Min-Hash Similarity

In [None]:
prec_to_ave = []
rec_to_ave = []

precisions, recalls = precision_recall_at_k(content_model, collab_model, als_model, test_df, test_df_spark, all_restaurant_ids,
                          indexer_model, transformed_df, mh_model, most_popular_business_ids,
                          k=10, threshold=4, content=True, collab=True, als=False, alpha=0.2, use_min_hash=True)

# Precision and recall can then be averaged over all users

prec_to_ave.append(sum(prec for prec in precisions.values()) / len(precisions))
rec_to_ave.append(sum(rec for rec in recalls.values()) / len(recalls))


precision_average = sum(prec_to_ave)/len(prec_to_ave)
recall_average = sum(rec_to_ave)/len(prec_to_ave)

print("Precision and Recall averages are {0} and {1}, respectively".format(precision_average, recall_average))

Precision and Recall averages are 0.8944723618090452 and 1.0, respectively


Evaluate the common users between train and test dataset.

In [None]:
user_id=train_df['user_id'].unique()
user_id_test=test_df['user_id'].unique()
len(set(user_id_test) & set(user_id))

9479

In [None]:
unique_users_train = set(train_df['user_id'].unique())
unique_businesses_train = set(train_df['business_id'].unique())
filtered_test_df = test_df[test_df['user_id'].isin(unique_users_train) & test_df['business_id'].isin(unique_businesses_train)]


common_user_ids_df = train_df_spark.select('user_id').intersect(test_df_spark.select('user_id'))

# Collecting the user IDs as a list
common_user_ids_list = [row['user_id'] for row in common_user_ids_df.collect()]


## Streaming

In [None]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

# import findspark
# findspark.init()

# from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext


# This function creates SparkContext and StreamingContext
# Do not change this function
# Create a global variable for StreamingContext
ssc = None

# This function creates SparkContext and StreamingContext
def initStreamingContext(sc, batch_duration=1):
    global ssc
    try:
        # Stop the existing StreamingContext if it's not already terminated
        if ssc is not None and not ssc.awaitTerminationOrTimeout(0):
            ssc.stop(stopSparkContext=False)
    except Exception as e:
        print(f"Error stopping existing StreamingContext: {e}")

    # Create a new StreamingContext
    ssc = StreamingContext(sc, batch_duration)
    return ssc

# Call initStreamingContext
ssc = initStreamingContext(sc)

using Spark Streaming to process batches of randomly selected user IDs and generate recommendations for each user in each batch.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
import random
from datetime import datetime
import time


# Randomly choose 30 user IDs every second and create a DStream
import random

ssc = initStreamingContext(sc)
def get_random_user_ids(df, num_samples=30):
    unique_user_ids = df['user_id'].unique()
    num_unique_users = len(unique_user_ids)

    if num_unique_users < num_samples:
        print(f"Warning: Not enough unique user IDs for sampling. Available unique user IDs: {num_unique_users}")
        num_samples = num_unique_users  # Reduce the number of samples to the number of unique users

    if num_unique_users > 0:
        return random.sample(list(unique_user_ids), num_samples)
    else:
        print("Error: No unique user IDs found.")
        return []

# Use the function to create a DStream
stream = ssc.queueStream([get_random_user_ids(test_df, 30) for _ in range(1000)])

# Function to calculate precision and recall for a batch of user IDs
def generate_recommedations(rdd):
    user_ids = rdd.collect()
    for user_id in user_ids:
        print("Generating recommendations for user {0}".format(user_id))
        # Use the recommendation function to get recommended business IDs
        recommended_business_ids = generate_k_recommendations(user_id, all_restaurant_ids, indexer_model, als_model, collab_model, content_model,
                               train_df, restaurant_dict, transformed_df, mh_model, most_popular_business_ids,
                               k=20, rating_threshold=1, alpha=0, include_als=False)
        print("Recommended business IDs: {0}".format(recommended_business_ids))

# Process each batch of user IDs and calculate metrics
stream.foreachRDD(generate_recommedations)

# Start the Spark Streaming Context
ssc.start()
time.sleep(15)
ssc.stop()


Generating recommendations for user 106234677322088610538
Recommended business IDs: [('604092d865e4ba0588bb3ed5', 4.42293193293108), ('604094167cd8bf130361e4c6', 4.58039938378396), ('6040941b65e4ba0588bb3fc2', 4.427734264403102), ('604094449d953d1f97fa098a', 4.642523839534148), ('604094b365e4ba0588bb4004', 4.472968898546133), ('604094fb9d953d1f97fa09ee', 4.703966710314411), ('6040968fc6fcf1fddba0cf97', 4.6545952669206905), ('6040970365e4ba0588bb40ae', 4.3844996858283105), ('604097397cd8bf130361e5e5', 4.672037353372808), ('604097599d953d1f97fa0ac1', 4.544010693089707), ('6040977bc6fcf1fddba0cfed', 4.390187356938998), ('604098967cd8bf130361e755', 4.472438107193817), ('604098b3c6fcf1fddba0d0ec', 4.398122937307589), ('6040995465e4ba0588bb4214', 4.262226068559842), ('60409a03c6fcf1fddba0d1cb', 4.3030675045777365), ('60409b67c6fcf1fddba0d2f3', 4.655207735899008), ('60409c587cd8bf130361e9c9', 4.369906683020638), ('60409c79c6fcf1fddba0d36c', 4.335488841484212), ('60409d467cd8bf130361ea39', 4.4