# Part 5: Content-Based Filtering

### Setup

In [None]:
from operator import add
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml import Pipeline, PipelineModel
import numpy as np
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    # Memory for the driver and for each executor.
    .config("spark.driver.memory", "32g")
    .config("spark.executor.memory", "32g")
    # "0" lifts the cap on the total size of results collected to the driver.
    .config("spark.driver.maxResultSize", "0")
    # "-1" turns automatic broadcast joins off.
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    # Broadcast timeout, in seconds.
    .config("spark.sql.broadcastTimeout", "1200")
    .appName("part5")
    .getOrCreate()
)

In [None]:
# Load the business and review JSON files into DataFrames, in that order.
# Both paths are relative to the working directory.
business, review = (
    spark.read.json(json_path)
    for json_path in (
        "yelp_academic_dataset_business.json",
        "yelp_academic_dataset_review.json",
    )
)

### Join Review Text by Business ID

In [None]:
# Build one text document per business by concatenating all of its reviews.
reviews_text_rdd = review.select("business_id", "text").rdd

# Join consecutive reviews with a space. Plain string addition would glue the
# last word of one review to the first word of the next ("...great foodI
# love..."), which the tokenizer would then emit as one bogus token.
reviews_by_business_rdd = reviews_text_rdd \
    .map(tuple) \
    .reduceByKey(lambda left, right: left + " " + right)

# Name the columns directly instead of renaming the default "_1"/"_2".
reviews_by_business_df = spark.createDataFrame(
    reviews_by_business_rdd, ["business_id", "text"])

### Build Content-Based Filtering Pipeline

In [None]:
# prepare the pipeline
#
# Column flow:
#   text -> token -> non_stop_word -> raw_feature -> idf_vector
#                    non_stop_word -> word_vector
#   [idf_vector, word_vector] -> combined_vector

# gaps=False: the pattern describes the tokens themselves, not the separators.
# The pattern is a raw string so that "\w" is not read as a string escape.
regex_tokenizer = RegexTokenizer(gaps=False, pattern=r"\w+", inputCol="text", outputCol="token")
stop_words_remover = StopWordsRemover(inputCol="token", outputCol="non_stop_word")
count_vectorizer = CountVectorizer(inputCol="non_stop_word", outputCol="raw_feature")
# The IDF input must be the CountVectorizer output column ("raw_feature");
# any other name makes pipeline.fit fail because the column does not exist.
idf = IDF(inputCol="raw_feature", outputCol="idf_vector")
word_2_vector = Word2Vec(vectorSize=100, minCount=5, inputCol="non_stop_word", outputCol="word_vector", seed=1)
vector_assembler = VectorAssembler(inputCols=["idf_vector", "word_vector"], outputCol="combined_vector")

# fit the pipeline

pipeline = Pipeline(stages=[regex_tokenizer, stop_words_remover, count_vectorizer, idf, word_2_vector, vector_assembler])
pipeline_model = pipeline.fit(reviews_by_business_df)

# save the pipeline model (replacing any previously saved copy)

pipeline_model.write().overwrite().save("content_based_pipeline_model")

In [None]:
# Reload the fitted pipeline from disk, so this cell works without refitting,
# and run every business document through it.
pipeline_model = PipelineModel.load("content_based_pipeline_model")

reviews_by_business_transformed_df = pipeline_model.transform(
    reviews_by_business_df
)

# Inspect the resulting columns and a small sample of rows.
reviews_by_business_transformed_df.printSchema()
reviews_by_business_transformed_df.take(3)

In [None]:
# Pull every (business_id, word_vector) pair to the driver as plain tuples;
# the similarity search below runs over this in-memory list.
all_business_vectors = [
    (row[0], row[1])
    for row in reviews_by_business_transformed_df
    .select("business_id", "word_vector")
    .collect()
]

# Peek at one vector as a sanity check.
all_business_vectors[1][1]

In [1]:
# utility functions

def cosine_similarity(vector1, vector2):
    """Return the cosine of the angle between two equal-length vectors.

    Args:
        vector1: First vector (any sequence ``np.dot`` accepts).
        vector2: Second vector, same length as ``vector1``.

    Returns:
        The cosine similarity, or ``0.0`` when either vector has zero length.
        Without that guard the division is 0/0 = NaN, and Spark orders NaN
        above every other number, so a descending sort on the score would
        rank zero-vector entries first.
    """
    norm1 = np.sqrt(np.dot(vector1, vector1))
    norm2 = np.sqrt(np.dot(vector2, vector2))
    if norm1 == 0 or norm2 == 0:
        # The direction of a zero vector is undefined; treat it as "not similar".
        return 0.0
    return np.dot(vector1, vector2) / norm1 / norm2


def get_business_details(in_business):
    """Attach descriptive business columns to a DataFrame keyed by business_id.

    Inner-joins ``in_business`` with the module-level ``business`` DataFrame
    on ``business_id``, so rows without a matching business are dropped.

    Args:
        in_business: DataFrame with a ``business_id`` column.

    Returns:
        A DataFrame with every column of ``in_business`` followed by
        ``business_name``, ``categories``, ``stars``, ``review_count``,
        ``latitude`` and ``longitude`` taken from ``business``.
    """
    # NOTE(review): this expects `business` to expose a `business_name`
    # column; confirm that the loaded JSON really uses that name.
    left = in_business.alias("a")
    right = business.alias("b")
    same_business = col("a.business_id") == col("b.business_id")

    input_columns = [col('a.'+x) for x in left.columns]
    detail_columns = [
        col('b.business_name'),
        col('b.categories'),
        col('b.stars'),
        col('b.review_count'),
        col('b.latitude'),
        col('b.longitude'),
    ]

    joined = left.join(right, same_business, 'inner')
    return joined.select(input_columns + detail_columns)
    

In [None]:
def get_similar_business(business_ids, limit=10):
    """Find the businesses most similar to each of the given businesses.

    Similarity is the cosine similarity between the vectors held in the
    module-level ``all_business_vectors`` list of ``(business_id, vector)``
    pairs. Also uses the module-level ``spark`` and ``cosine_similarity``.

    Args:
        business_ids: Iterable of business ids to find neighbours for.
        limit: Maximum number of similar businesses returned per input id.

    Returns:
        A DataFrame with columns ``business_id``, ``score`` and
        ``input_business_id``, holding up to ``limit`` rows per input id.
        The input business itself is never listed as its own neighbour.

    Raises:
        IndexError: If an id in ``business_ids`` has no vector in
            ``all_business_vectors``.
    """
    schema = StructType([
        StructField("business_id", StringType(), True),
        # Cosine scores are floating point, so the column is declared as
        # double rather than integer.
        StructField("score", DoubleType(), True),
        StructField("input_business_id", StringType(), True),
    ])
    similar_businesses_df = spark.createDataFrame([], schema)

    # Index the vectors once instead of rescanning the whole list for every
    # input id. setdefault keeps the first entry for an id, which is the one
    # a linear scan would pick.
    vectors_by_id = {}
    for candidate_id, candidate_vec in all_business_vectors:
        vectors_by_id.setdefault(candidate_id, candidate_vec)

    for b_id in business_ids:
        if b_id not in vectors_by_id:
            # Same exception type as a failed positional lookup, but with a
            # message that names the offending id.
            raise IndexError("no vector found for business_id {!r}".format(b_id))
        input_vec = vectors_by_id[b_id]

        scored_pairs = [
            (candidate_id, float(cosine_similarity(input_vec, candidate_vec)))
            for candidate_id, candidate_vec in all_business_vectors
        ]
        similar_business_df = spark.createDataFrame(
                spark.sparkContext.parallelize(scored_pairs),
                ["business_id", "score"]) \
            .orderBy("score", ascending=False)

        # Drop the input business itself, then keep only the best matches.
        similar_business_df = similar_business_df \
            .filter(col("business_id") != b_id) \
            .limit(limit) \
            .withColumn("input_business_id", lit(b_id))

        similar_businesses_df = similar_businesses_df.union(similar_business_df)
    return similar_businesses_df

# test
bids = ['Dl2vgi5W_nbe-A97D0zgfA', 'RtUvSWO_UZ8V3Wpj0n077w']

print('\ninput restaurants details:')
# isin() already yields a boolean column, so it is used as the filter directly.
business.select('business_id','business_name', 'categories') \
    .filter(business.business_id.isin(bids)).show(truncate=False)

# get top 10 similar business
similar_business = get_business_details(get_similar_business(bids))

# The message previously ended with a stray double quote.
print('Top 10 similar restaurants for each input restaurant are:')
similar_business.select('input_business_id','business_name', 'score','categories').toPandas()