In [1]:
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession, DataFrame, Window
from typing import Optional

In [2]:
spark = (SparkSession.builder
                     .appName('Recommendations')
                     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                     .config("spark.kryoserializer.buffer.max", "2047m")
                     .config("spark.driver.memory", "15g")
                     .getOrCreate()
          )

# Set Hadoop configurations to use the service account JSON key
# sc = spark.sparkContext
# sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
# sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
# sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile", f"../keys/{json_key}")

your 131072x1 screen size is bogus. expect trouble
24/04/25 18:26:38 WARN Utils: Your hostname, mainpc resolves to a loopback address: 127.0.1.1; using 172.25.80.73 instead (on interface eth0)
24/04/25 18:26:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/25 18:26:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def preprocessing_business(spark,
                           city_name:str ='Philadelphia',
                           category:str = 'restaurant',
                           min_star_rating: Optional[int]=4,
                           min_review_count:int =10):

    df = spark.read.csv('./dataset_business.csv', header=True)
    df_filter = df.filter(
        (func.col('is_open')==1) &
        (func.lower(func.col('city'))== city_name.lower()) &
        (func.lower(func.col('categories')).contains(category.lower())) &
        (func.col('review_count') >= min_review_count)
    )
    if min_star_rating is not None:
        df_filter = df_filter.filter(func.col('stars') >= min_star_rating)

    df_select = df_filter.select(
        func.col('business_id'),
        func.col('categories'),
        func.col('name'),
        func.col('review_count'),
        func.col('stars').alias('business_stars')
    )

    string_indexer = StringIndexer(inputCol='business_id', outputCol='business_id_encode')
    model = string_indexer.fit(df_select)
    city_business_num_id = model.transform(df_select)

    # Convert encoded business_id to integer type
    city_business_num_id = city_business_num_id.withColumn(
        'business_id_encode',
        func.col('business_id_encode').cast(IntegerType())
    )

    return city_business_num_id

In [4]:
def preprocess_review(spark,
                      min_review_count: int = 10,
                      min_star_rating: Optional[int] = None,
                      ) -> DataFrame:

    df_reviews = spark.read.json('./yelp_academic_dataset_review.json')

    # Group by user_id to calculate review counts per user
    df_user_review_counts = df_reviews.groupBy("user_id").agg(
        func.count("review_id").alias("user_review_count")
    ).filter(func.col("user_review_count") >= min_review_count)

    # Join back to the original reviews to filter users by their review counts
    df_filtered_reviews = df_reviews.join(
        df_user_review_counts, "user_id"
    )

    # Optionally filter by star rating
    if min_star_rating is not None:
        df_filtered_reviews = df_filtered_reviews.filter(
            func.col("stars") >= min_star_rating
        )

    # Select and rename columns to match the SQL query
    df_final = df_filtered_reviews.select(
        func.col("user_id"),
        func.col("business_id"),
        func.col("date"),
        func.col("review_id"),
        func.col("stars").alias("user_stars"),
        func.col("text").alias("user_reviews")
    )

    # Encode the user_id using StringIndexer
    string_indexer = StringIndexer(inputCol='user_id', outputCol='user_id_encode')
    model = string_indexer.fit(df_final)
    user_reviews_num_id = model.transform(df_final)

    # Convert encoded user_id to integer type
    user_reviews_num_id = user_reviews_num_id.withColumn(
        'user_id_encode',
        func.col('user_id_encode').cast(IntegerType())
    )

    return user_reviews_num_id

In [5]:
reviews = preprocess_review(spark=spark)
businesses = preprocessing_business(spark=spark)

business_user_review = reviews.join(businesses,
                                    on='business_id',
                                    how='inner')

                                                                                

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer, HashingTF, IDF, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

In [7]:
def train_test_split(
    df_reviews: DataFrame,
    train_perc: int = 0.7) -> DataFrame:
    
    #Random shuffle
    shuffled_final = df_reviews.orderBy(func.rand(42))

    # Define window specification partitioned by the user_id column and corresponding row number
    window_spec = Window.partitionBy("user_id").orderBy("user_id")
    final_reviews_with_row_number = shuffled_final.withColumn("row_number", func.row_number().over(window_spec))

    # Calculate the total number of rows for each user
    user_counts = final_reviews_with_row_number.groupBy("user_id").count()

    # Calculate the number of rows to include in the training set for each user
    user_test_counts = user_counts.withColumn("test_count", (func.col("count") * (1-train_perc)).cast("int"))

    #Join row number data with test count
    final_with_test_counts = final_reviews_with_row_number.join(user_test_counts, "user_id", "inner")

    # Create training and testing DataFrames for each user
    train_data = final_with_test_counts.filter(func.col("row_number") <= (func.col("count") - func.col("test_count")))
    test_data = final_with_test_counts.filter(func.col("row_number") > (func.col("count") - func.col("test_count")))

    # Drop the intermediate columns
    train_data = train_data.drop("row_number", "count", "test_count")
    test_data = test_data.drop("row_number", "count", "test_count")
    
    return train_data, test_data

In [8]:
(train, test) = train_test_split(df_reviews=business_user_review)

In [9]:
# initial train test datasets size
train.count(), test.count()

                                                                                

(150861, 44853)

In [10]:
# make ALS model
# best rank: 14
# best regParam: 0.19
als = ALS(userCol='user_id_encode',
          itemCol='business_id_encode',
          ratingCol='user_stars',
          coldStartStrategy='drop',
          nonnegative=True,
          rank=14,
          regParam=0.19
          )

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='user_stars',
    predictionCol='prediction'
)

In [11]:
als_params = ParamGridBuilder().addGrid(als.rank, [12,13,14]) \
                               .addGrid(als.regParam, [0.17,0.18,0.19]) \
                               .build()        
cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=als_params,
        evaluator=evaluator
    )

In [12]:
# model = cv.fit(train)

In [13]:
model = als.fit(train)

24/04/25 18:27:47 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:27:49 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:27:51 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:28:16 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:28:30 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:28:39 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:28:41 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:42 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:46 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:47 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:47 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:48 WARN DAGScheduler: Broadcasting larg

In [14]:
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(rmse)

24/04/25 18:28:58 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:28:58 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 18:30:15 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:30:38 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:30:45 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 18:30:46 WARN DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/04/25 18:30:54 WARN DAGScheduler: Broadcasting large task binary with size 5.9 MiB
                                                                                

1.0145323467387843


In [37]:
user_rec = model.recommendForAllUsers(5)
# https://github.com/apache/spark/blob/master/examples/src/main/python/ml/als_example.py

In [38]:
user_rec.count()

24/04/25 19:03:07 WARN DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/04/25 19:03:14 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
                                                                                

29167

In [39]:
test.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_stars: double (nullable = true)
 |-- user_reviews: string (nullable = true)
 |-- user_id_encode: integer (nullable = true)
 |-- categories: string (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: string (nullable = true)
 |-- business_stars: string (nullable = true)
 |-- business_id_encode: integer (nullable = true)



In [40]:
filtered_user_rec = user_rec.withColumn(
    "filtered_recommendations",
    func.expr("filter(recommendations, item -> item.rating > 4.0)")
)

non_empty_user_rec = filtered_user_rec.filter(
    func.size(func.col("filtered_recommendations")) > 0
)
# Display the filtered results
non_empty_user_rec.select("user_id_encode", "filtered_recommendations").show(10, truncate=False)

24/04/25 19:03:29 WARN DAGScheduler: Broadcasting large task binary with size 5.9 MiB

+--------------+--------------------------------------------------------------------------------------------+
|user_id_encode|filtered_recommendations                                                                    |
+--------------+--------------------------------------------------------------------------------------------+
|34            |[{928, 4.7108455}, {1336, 4.666741}, {1248, 4.455908}, {1378, 4.327839}, {1209, 4.302132}]  |
|81            |[{924, 5.427021}, {1336, 5.0612826}, {1044, 5.0513015}, {281, 5.0418916}, {306, 4.99725}]   |
|85            |[{924, 5.2225666}, {1336, 5.1744847}, {577, 4.9288197}, {306, 4.8852763}, {928, 4.8720427}] |
|101           |[{924, 4.3754816}, {1153, 4.1570244}, {1336, 4.0941257}, {1044, 4.0686407}, {205, 4.057283}]|
|211           |[{306, 5.0479345}, {80, 4.727235}, {810, 4.7261477}, {482, 4.695579}, {745, 4.670101}]      |
|321           |[{1336, 5.0151424}, {1248, 4.8526}, {928, 4.796123}, {924, 4.772739}, {1209, 4.686487}]     |
|322      

24/04/25 19:03:43 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
                                                                                

In [44]:
# test.filter((func.col('user_id_encode')==928) & (func.col('business_id_encode')==1209)).show()

24/04/25 19:16:37 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 19:17:19 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 19:18:03 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
24/04/25 19:18:14 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 19:18:17 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/04/25 19:18:19 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB

+-------+-----------+----+---------+----------+------------+--------------+----------+----+------------+--------------+------------------+
|user_id|business_id|date|review_id|user_stars|user_reviews|user_id_encode|categories|name|review_count|business_stars|business_id_encode|
+-------+-----------+----+---------+----------+------------+--------------+----------+----+------------+--------------+------------------+
+-------+-----------+----+---------+----------+------------+--------------+----------+----+------------+--------------+------------------+



                                                                                

In [19]:
# preprocess_review(spark=spark).show(5)

In [20]:
# preprocessing_business(spark=spark, min_review_count=3).show(5)

In [21]:
# process text reviews by user
def review_text_processing(df_train: DataFrame,
                           df_test: DataFrame,
                           reivew_col: str = 'user_reviews',
                           rating_cut_off: float = 3.0) -> DataFrame:

    df_train_lower = df_train.withColumn(reivew_col, func.lower(reivew_col))
    df_test_lower = df_test.withColumn(reivew_col, func.lower(reivew_col))
    
    
    # aggregate reviews by business in train
    df_train_agg = df_train_lower.groupBy('business_id', 'name').agg(
        func.concat_ws(' ', func.collect_list('user_reviews')).alias(reivew_col),
        func.mean('business_stars').alias('business_stars'),
    ).filter(
        func.col('business_stars') >= rating_cut_off
    )
    # aggregate reviews by user
    df_test_agg = df_test_lower.groupBy('user_id').agg(
        func.concat_ws(' ', func.collect_list('user_reviews')).alias(reivew_col)
    )
    # Define the regex tokenizer
    regex_tokenizer = RegexTokenizer(
        inputCol=reivew_col,
        outputCol='words',
        pattern="\\W"  # This regex splits the text at any non-word character
    )
    # Define the stopwords remover
    stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')


    # tf-idf
    hashing_tf = HashingTF(inputCol='filtered_words',
                           outputCol='raw_features', numFeatures=100)
    idf = IDF(inputCol='raw_features', outputCol='features')
    # Define the pipeline with the stages
    pipeline = Pipeline(stages=[regex_tokenizer, stopwords_remover, hashing_tf, idf])

    # Fit the pipeline to the data and transform the data
    model = pipeline.fit(df_train_agg)
    train_transformed = model.transform(df_train_agg)
    test_transformed = model.transform(df_test_agg)

    # remove extra columns
    train_transformed = train_transformed.drop(
        'date',
        'user_id_encode',
        'categories',
        'review_count',
        'business_id_encode',
        'words',
        'filtered_words',
        'raw_features',
        reivew_col
    )
    test_transformed = test_transformed.drop(
        'date',
        'user_id_encode',
        'categories',
        'review_count',
        'business_id_encode',
        'words',
        'filtered_words',
        'raw_features',
        reivew_col
    )

    for col in train_transformed.columns:
        if col != 'features':
            train_transformed = train_transformed.withColumnRenamed(col, 'train_'+col)

    for col in test_transformed.columns:
        if col != 'features':
            test_transformed = test_transformed.withColumnRenamed(col, 'test_'+col)

    return train_transformed, test_transformed

In [22]:
df_train, df_test = review_text_processing(
                        df_train=train,
                        df_test=test)

                                                                                

In [23]:
# df_train.show(5)

In [24]:
def cosine_similarity(features1, features2):
    return float(float(features1.dot(features2)) / (Vectors.norm(features1, 2) * Vectors.norm(features2, 2)))

cosine_similarity_udf = func.udf(cosine_similarity, FloatType())


def cosine_recommendation(train_transformed: DataFrame,
                          test_transformed: DataFrame,
                          sim_cut_off: float = 0.5) -> DataFrame:

    normalizer = Normalizer(inputCol='features', outputCol='norm_features', p=2.0)
    train_normalized = normalizer.transform(train_transformed).withColumnRenamed("norm_features", "train_norm_features")
    test_normalized = normalizer.transform(test_transformed).withColumnRenamed("norm_features", "test_norm_features")


    # Perform a Cartesian join to calculate cosine similarity between every test and train pair
    cartesian_df = test_normalized.crossJoin(train_normalized)
    
    result_df = cartesian_df.withColumn(
        'similarity',
        cosine_similarity_udf(cartesian_df['test_norm_features'], cartesian_df['train_norm_features'])
    )
    result_df = result_df.filter(
        func.col('similarity') >= sim_cut_off
    )
    return result_df

In [25]:
df_train, df_test = review_text_processing(
                        df_train=train,
                        df_test=test,
                        rating_cut_off=4)

                                                                                

In [26]:
df_test.count()

                                                                                

14266

In [27]:
result_df = cosine_recommendation(train_transformed=df_train, test_transformed=df_test, sim_cut_off=0.7)

In [28]:
result_df.select('test_user_id', 'train_name', 'similarity').show(20)

24/04/25 18:40:50 WARN ExtractPythonUDFFromJoinCondition: The join condition:(cosine_similarity(test_norm_features#2547, train_norm_features#2531)#2651 > 0.7) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
[Stage 559:>                                                        (0 + 1) / 1]

+--------------------+--------------------+----------+
|        test_user_id|          train_name|similarity|
+--------------------+--------------------+----------+
|-HYCAMf2ml717YD5Y...|IndeBlue Modern I...|0.90363353|
|-HYCAMf2ml717YD5Y...|            Spice 28|0.92052335|
|-HYCAMf2ml717YD5Y...|     World Cafe Live| 0.8825746|
|-HYCAMf2ml717YD5Y...|    Colombian Bakery|0.78020746|
|-HYCAMf2ml717YD5Y...|   Dawson Street Pub| 0.8617899|
|-HYCAMf2ml717YD5Y...|Linda's Vegetaria...|0.80385995|
|-HYCAMf2ml717YD5Y...|     Zorba's Taverna|  0.882695|
|-HYCAMf2ml717YD5Y...|              Martha| 0.8830395|
|-HYCAMf2ml717YD5Y...|      Federal Donuts| 0.7904856|
|-HYCAMf2ml717YD5Y...|Hue Fusion Food M...| 0.7915747|
|-HYCAMf2ml717YD5Y...|M2O Burgers & Salads| 0.8582055|
|-HYCAMf2ml717YD5Y...|Nori Ramen & Poke...| 0.7444716|
|-HYCAMf2ml717YD5Y...|Ambrosia Ristoran...| 0.8726993|
|-HYCAMf2ml717YD5Y...|Prima Pizza Taque...|0.85604244|
|-HYCAMf2ml717YD5Y...| Gran Caffe L'Aquila|0.89863795|
|-HYCAMf2m

                                                                                