In [1]:
#import statements
import numpy as np
import pandas as pd
import operator as op

from pyspark.sql import HiveContext, SparkSession
hive_context = HiveContext(sc)


from pyspark.sql.functions import col, lit
from pyspark.sql.types import *
from pyspark.ml.linalg import *
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF, Word2Vec
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.enableHiveSupport().appName('ReadWriteData').getOrCreate()
sc = spark.sparkContext

In [3]:
# connect to HDFS
!hdfs dfs -ls /user/vvenkatesan/final_project/

Found 9 items
drwxr-xr-x   - vvenkatesan vvenkatesan          0 2020-08-06 12:47 /user/vvenkatesan/final_project/bus_cat
drwxr-xr-x   - kleindiek   vvenkatesan          0 2020-08-02 12:03 /user/vvenkatesan/final_project/business_json
-rw-r--r--   3 kleindiek   vvenkatesan  248796774 2020-08-04 14:56 /user/vvenkatesan/final_project/review_subset.json
-rw-r--r--   3 vvenkatesan vvenkatesan  449663480 2020-07-09 18:53 /user/vvenkatesan/final_project/yelp_academic_dataset_checkin.json
-rw-r--r--   3 vvenkatesan vvenkatesan 6325565224 2020-07-09 18:54 /user/vvenkatesan/final_project/yelp_academic_dataset_review.json
-rw-r--r--   3 kleindiek   vvenkatesan  175454655 2020-07-28 17:37 /user/vvenkatesan/final_project/yelp_academic_dataset_tip.csv
-rw-r--r--   3 vvenkatesan vvenkatesan  263489322 2020-07-09 18:55 /user/vvenkatesan/final_project/yelp_academic_dataset_tip.json
-rw-r--r--   3 tianjsha    vvenkatesan 3268069927 2020-08-13 16:56 /user/vvenkatesan/final_project/yelp_academic_dataset_u

## Content-Based Recommendation Engine

### 1) Loading Business and Review Data

In [4]:
df_load = spark.read.csv('/user/vvenkatesan/final_project/bus_cat/')

In [5]:
df_load.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [6]:
df_load.show(5)

+--------------------+----------------+--------------------+
|                 _c0|             _c1|                 _c2|
+--------------------+----------------+--------------------+
|f9NumwFMBDn751xgF...|     Active Life|    Gun/Rifle Ranges|
|Yzvjg0SayhoZgCljU...|Health & Medical|Fitness & Instruc...|
|XNoUzKckATkOD1hP6...|            Pets|        Pet Services|
|6OAZjbxqM5ol29BuH...| Hardware Stores|       Home Services|
|51M2Kk903DFYI6gnB...|   Home Services|            Plumbing|
+--------------------+----------------+--------------------+
only showing top 5 rows



In [7]:
bus_cat = df_load.select(col("_c0").alias("business_id"), col("_c1").alias("cat_primary"), col("_c2").alias("cat_secondary"))
bus_cat.show(5)

+--------------------+----------------+--------------------+
|         business_id|     cat_primary|       cat_secondary|
+--------------------+----------------+--------------------+
|f9NumwFMBDn751xgF...|     Active Life|    Gun/Rifle Ranges|
|Yzvjg0SayhoZgCljU...|Health & Medical|Fitness & Instruc...|
|XNoUzKckATkOD1hP6...|            Pets|        Pet Services|
|6OAZjbxqM5ol29BuH...| Hardware Stores|       Home Services|
|51M2Kk903DFYI6gnB...|   Home Services|            Plumbing|
+--------------------+----------------+--------------------+
only showing top 5 rows



In [8]:
bus = spark.read.csv('/user/vvenkatesan/final_project/yelp_business.csv')
bus = (bus.withColumnRenamed('_c0', 'count').withColumnRenamed('_c1', 'business_id')
     .withColumnRenamed('_c2', 'name').withColumnRenamed('_c3', 'address')
     .withColumnRenamed('_c4', 'city').withColumnRenamed('_c5', 'state')
     .withColumnRenamed('_c6', 'postal_code').withColumnRenamed('_c7', 'latitude')
     .withColumnRenamed('_c8', 'longitude').withColumnRenamed('_c9', 'stars')
     .withColumnRenamed('_c10', 'review_count').withColumnRenamed('_c11', 'is_open')
     .withColumnRenamed('_c12', 'attributes').withColumnRenamed('_c13', 'categories')
     .withColumnRenamed('_c14', 'hours'))

bus = bus.drop("count", "state", "postal_code", "latitude", "longitude", "attributes", "categories", "hours")
bus = bus.filter(bus.business_id != 'business_id')

bus = bus.join(bus_cat, on = ['business_id'], how = 'left')
bus = bus.drop('cat_secondary')
bus.show(5)

+--------------------+--------------------+--------------------+---------------+-----+------------+-------+----------------+
|         business_id|                name|             address|           city|stars|review_count|is_open|     cat_primary|
+--------------------+--------------------+--------------------+---------------+-----+------------+-------+----------------+
|f9NumwFMBDn751xgF...|The Range At Lake...|     10913 Bailey Rd|      Cornelius|  3.5|          36|      1|     Active Life|
|Yzvjg0SayhoZgCljU...|   Carlos Santo, NMD|8880 E Via Linda,...|     Scottsdale|  5.0|           4|      1|Health & Medical|
|XNoUzKckATkOD1hP6...|             Felinus|3554 Rue Notre-Da...|       Montreal|  5.0|           5|      1|            Pets|
|6OAZjbxqM5ol29BuH...|Nevada House of Hose|      1015 Sharp Cir|North Las Vegas|  2.5|           3|      0| Hardware Stores|
|51M2Kk903DFYI6gnB...|USE MY GUY SERVIC...|  4827 E Downing Cir|           Mesa|  4.5|          26|      1|   Home Services|


In [11]:
review = spark.sql('SELECT * FROM big_data_group_2.review')
review.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)



In [12]:
review = review.drop('date', 'useful', 'funny', 'cool')
review.show(5)

+--------------------+--------------------+--------------------+-----+--------------------+
|           review_id|             user_id|         business_id|stars|                text|
+--------------------+--------------------+--------------------+-----+--------------------+
|xQY8N_XvtGbearJ5X...|OwjRMXRC0KyPrIlcj...|-MhfebM0QIsKt87iD...|  2.0|As someone who ha...|
|UmFMZ8PyXZTY2Qcwz...|nIJD_7ZXHq-FX8byP...|lbrU8StCq3yDfr-QM...|  1.0|I am actually hor...|
|LG2ZaYiOgpr2DK_90...|V34qejxNsCbcgD8C0...|HQl28KMwrEKHqhFrr...|  5.0|I love Deagan's. ...|
|i6g_oA9Yf9Y31qt0w...|ofKDkJKXSKZXu5xJN...|5JxlZaqCnk1MnbgRi...|  1.0|Dismal, lukewarm,...|
|6TdNDKywdbjoTkize...|UgMW8bLE0QMJDCkQ1...|IS4cv902ykd8wj1TR...|  4.0|Oh happy day, fin...|
+--------------------+--------------------+--------------------+-----+--------------------+
only showing top 5 rows



In [13]:
review.count()

80211220

#### Build a subset of the data!

In [14]:
review_sub = review.rdd.takeSample(False, 100000, seed = 27)
review_sub = spark.createDataFrame(review_sub)
review_sub.count()

100000

In [15]:
review_sub.show(5)

+--------------------+--------------------+--------------------+-----+--------------------+
|           review_id|             user_id|         business_id|stars|                text|
+--------------------+--------------------+--------------------+-----+--------------------+
|XPQ6NgUIvxE6ycS9e...|_axN6Robwz6nl1_kK...|9lH0gzlZB_uXJ1dc9...|  4.0|Food is great as ...|
|UWYAAnpqqTXedglye...|R6NNz2Zb2yqpRWJNK...|eI1ZHGOr2Pus842Kr...|  4.0|love this place.....|
|aIQjht9qwansdQmBX...|YHdHtrN8PucW-HLE3...|c9SKd0bIR6nNzXOVC...|  5.0|If I could, I'd l...|
|R-7BpQat2JhgC6pym...|o7r9Pra-DJgbptutG...|_34XDbs6WW6qYf8BP...|  1.0|There are a lot o...|
|w62EEqn52DfJ-HnzT...|SAYDfmc9X5nTKv7M5...|BH9z7IJ4zydAqgwsb...|  5.0|This is a must se...|
+--------------------+--------------------+--------------------+-----+--------------------+
only showing top 5 rows



In [16]:
# select just the business id and the text of the review

subset_text_only = review_sub.select(col("business_id"), col("text"))
subset_text_only.show(5)

+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|9lH0gzlZB_uXJ1dc9...|Food is great as ...|
|eI1ZHGOr2Pus842Kr...|love this place.....|
|c9SKd0bIR6nNzXOVC...|If I could, I'd l...|
|_34XDbs6WW6qYf8BP...|There are a lot o...|
|BH9z7IJ4zydAqgwsb...|This is a must se...|
+--------------------+--------------------+
only showing top 5 rows



In [17]:
# concatenate the review text -- we want to look at all reviews for each business!

# concatentate based on business ID as key. must be in tuple form
subset_conc = subset_text_only.rdd.map(tuple).reduceByKey(op.add)

# create a dataframe of the results to use for tokenization / featurization!
subset_conc_df = spark.createDataFrame(subset_conc).withColumnRenamed('_1', 'business_id').withColumnRenamed('_2', 'text_from_reviews')
subset_conc_df.count()

47250

### 2) Tokenization / Featurization

In [51]:
# build a pipeline

# make each word a token
tokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'text_from_reviews', outputCol = 'tokenized_text')

# remove stopwords like the, etc
stopwords_remove = StopWordsRemover(inputCol = 'tokenized_text', outputCol = 'tokenized_nostopwords')

# vectorize words
word_2_vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = "tokenized_nostopwords", outputCol = "wordVectors")

pipeline = Pipeline(stages = [tokenizer, stopwords_remove, word_2_vec])

In [52]:
# fit model pipeline
model = pipeline.fit(subset_conc_df)

In [53]:
# transform after fitting
review_subset_tansformed = model.transform(subset_conc_df)

review_subset_tansformed.show(2)

+--------------------+--------------------+--------------------+---------------------+--------------------+
|         business_id|   text_from_reviews|      tokenized_text|tokenized_nostopwords|         wordVectors|
+--------------------+--------------------+--------------------+---------------------+--------------------+
|b3vRI8yXNK34hgC0W...|Have been to this...|[have, been, to, ...| [place, several, ...|[-0.0786991763036...|
|2lQVNwzNo-SbVD5lX...|rude rude and rud...|[rude, rude, and,...| [rude, rude, rude...|[-0.0675602243044...|
+--------------------+--------------------+--------------------+---------------------+--------------------+
only showing top 2 rows



In [55]:
# visualize the calculated vectors. we don't care about intermediate columns anymore and we need to see at least one business
# id for the test case

reviews_vectors = review_subset_tansformed.select('business_id', 'wordVectors')
reviews_vectors = reviews_vectors.rdd.map(lambda x: (x[0], x[1])).collect()
reviews_vectors[0:5]

[('KC2YYwIPYocLEVlpLROaYQ',
  DenseVector([0.0349, -0.0293, -0.0052, -0.033, 0.03, 0.1023, -0.0634, 0.0394, -0.075, -0.0421, 0.0969, 0.004, -0.0867, -0.0399, 0.0091, -0.1737, 0.0511, -0.0215, 0.097, -0.0674, 0.1172, 0.1065, 0.005, -0.0303, 0.0093, 0.03, 0.0736, -0.0434, -0.0497, 0.0169, -0.0548, -0.0491, -0.0352, -0.082, 0.129, -0.1255, -0.0076, 0.0051, -0.1422, -0.0204, -0.1354, 0.0008, 0.108, -0.0951, -0.0208, -0.1003, 0.0536, -0.0025, -0.0396, 0.0678, 0.0142, 0.0244, 0.0447, -0.0714, -0.0873, -0.0874, 0.0483, -0.1227, 0.0215, 0.0767, -0.0028, 0.0498, -0.111, 0.0645, 0.0995, -0.0659, 0.0112, 0.015, -0.0105, -0.0204, -0.0369, 0.0321, 0.0057, 0.06, -0.093, -0.0845, -0.0296, 0.016, -0.008, -0.0222, 0.0825, 0.15, -0.0931, -0.0201, -0.0516, -0.0216, 0.0028, 0.0386, 0.1054, 0.0385, -0.0766, 0.0494, 0.0037, -0.1234, -0.0154, 0.1425, -0.043, -0.0802, 0.0351, -0.106])),
 ('gErcz2Kqc-7E6b_JuU1VKg',
  DenseVector([0.0072, 0.0393, 0.0584, -0.0125, -0.0798, 0.0165, -0.0033, -0.0298, 0.0172, 0.008

### 3) Define Recommendation Functions

We need to be able to identify similar business and reviews. What types of content based recommendation engines can we build?

1. similar business
2. recommend based on what user reviewed perviously
3. recommendation based on a key word search

In [56]:
# define cosine similarity. Cosine similarity is a measurement used to gauge how comparative the records 
# are independent of their size. Computes cosine of the edge between two vectors anticipated in a multi-dimensional space.

def CosineSimilarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

In [61]:
# define function to compute the cosine similarity of the businesses in our data

def BusinessScore(business_id):
    
    print('-----Compiling Word Vectors-----')
    
    bus_vector = (review_subset_tansformed.select('wordVectors').filter(review_subset_tansformed['business_id'] == business_id)
                  .collect()[0][0])
    
    print('-----Computing Cosine Similarity-----')
    
    bus_vector_rdd = sc.parallelize((i[0], float(CosineSimilarity(bus_vector, i[1]))) for i in reviews_vectors)
    
    # build a dataframe with business ID and cosine similarity as 'score'
    bus_score = spark.createDataFrame(bus_vector_rdd)
    bus_score = (bus_score.withColumnRenamed('_1', 'business_id').withColumnRenamed('_2', 'score')
                               .orderBy(['score'], ascending=[0]))
    
    # we don't want the business id that we're searching to be in the results or NAs either
    bus_score = bus_score.filter(col("business_id") != business_id).filter(col("score") != 'NaN')
    
    print('-----Similar Businesses Found-----')
                
    return bus_score

In [58]:
def BusinessInformation(bus_score):
    
    print('-----Collecting Business Information-----')
    
    selected_ids = bus_score.select('business_id').rdd.flatMap(lambda x: x).collect()
    
    business_subset = bus.where(bus.business_id.isin(selected_ids))
    business_subset = business_subset.join(bus_score, on = ['business_id'], how = 'left')
    
    print('-----Removing Closed Businesses-----')
    
    business_subset = business_subset.filter('is_open == 1')
    business_subset = business_subset.drop('is_open')
    
    print('-----Removing Businesses With Worse Than 3.5 Rating-----')
    
    business_subset = business_subset.filter('stars > 3.0')
    
    print('-----Filtering Based On Category and Location-----')
    
    category = (bus.select("cat_primary").filter(col("business_id") == lit(bus_id))).head()[0]
    city = (bus.select("city").filter(col("business_id") == lit(bus_id))).head()[0]
    
    print('-----Compiling Final Recommendations-----')
    
    trial = (business_subset.select('name', 'address','city','cat_primary','score').filter(business_subset.city == city)
             .filter(business_subset.cat_primary == category).sort("score").orderBy(["score"], ascending=[0])).limit(5)
    
    return trial.toPandas()
    

In [59]:
def InputBusinessInformation(business_id):
    
    print('-----Pulling Initial Business Information-----')
    
    bus_info = (bus.select('name', 'address','city','cat_primary').filter(bus.business_id == business_id))
    bus_info = bus_info.toPandas()
       
    return bus_info

### 4) Test Case!

In [62]:
bus_id = 'mFGUvyJSdSRjIaN94QW_IQ'

test = BusinessInformation(BusinessScore(bus_id))
input_info = InputBusinessInformation(bus_id)

display(input_info, test)

-----Compiling Word Vectors-----
-----Computing Cosine Similarity-----




-----Similar Businesses Found-----
-----Collecting Business Information-----
-----Removing Closed Businesses-----
-----Removing Businesses With Worse Than 3.5 Rating-----
-----Filtering Based On Category and Location-----
-----Compiling Final Recommendations-----
-----Pulling Initial Business Information-----


Unnamed: 0,name,address,city,cat_primary
0,Anaya's Fresh Mexican Restaurant,5830 W Thunderbird Rd,Glendale,Restaurants


Unnamed: 0,name,address,city,cat_primary,score
0,Tacos Tijuana,"4925 W Bell Rd, Ste D-1",Glendale,Restaurants,0.8505
1,Wildflower,17530 N 75th Ave,Glendale,Restaurants,0.81826
2,Polibertos Taco Shop,4310 W Glendale Ave,Glendale,Restaurants,0.810742
3,Saffron Indian Bistro,4330 W Union Hills Dr,Glendale,Restaurants,0.773366
4,Mi Vegana Madre Restaurant,"5835 W Palmaire Ave, Ste E",Glendale,Restaurants,0.771983
