# Big Data Platforms - Group Project

## Yelp Graph Network Analysis and Recommender 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import abs, col, to_timestamp, repeat, lit, split,explode
from math import radians, cos, sin, asin, sqrt

In [2]:
spark = SparkSession.builder.enableHiveSupport().appName('Graph').getOrCreate()

sc = spark.sparkContext 

In [3]:
from graphframes import *

## Load and Filter Data

Load the previously filtered Austin businesses dataset along with the business.json dataset for attribute enhancement.

In [4]:
data = spark.read.option("multiLine", "true").csv("big-data/austin_reviews", header = True)
businesses = spark.read.json("big-data/yelp_academic_dataset_business.json")

In [22]:
data.count()

1425227

We'll narrow down our dataset to reviews in Austin, TX from year 2019 to 2021. This will improve recommendation relevance and reduce computational load.

In [5]:
timestamp_type = ["date"]

for c in timestamp_type:
    data = data.withColumn(c,to_timestamp(c, "yyyy-MM-dd HH:mm:ss"))

In [6]:
data = data.filter(data.date > "2019-01-01")

We still have 268,362 reviews present in our dataset. The reviews will represent one type of edge in our graph. We'll build out our edge dataframe later.

In [34]:
data.count()

268362

## Create Node Dataframe

There are two types of nodes in our graph:
    
1. Businesses
2. Users
    
Each node type has unique attributes of interest:

- Business: address, categories, hours, name, geocoordinates
- User: for simplicity, we will not track user attributes, as they are not needed for the current use case

Next, we'll find the unique business_ids and user_ids. Each unique id represents a node in our graph. 

In [7]:
business_id_df = data.select("business_id").distinct()
user_id_df = data.select("user_id").distinct()

In [None]:
#data.select("business_id", "address", "categories", "hours", "name", "latitude", "longitude").distinct()

Let's see how many unique users we have:

In [18]:
user_id_df.count()

126744

And unique businesses: 

In [13]:
business_id_df.count()

15834

To help identify the type of node, we'll add a new column to each dataframe. A 0 will represent a business type node and a 1 will represent a user type node. 

In [8]:
#add column type for business and users
business_id_df = business_id_df.withColumn("business", lit(0))
user_id_df = user_id_df.withColumn("customer", lit(1))

Next, we'll select the specific business attributes we want to map to each business type node. We'll do a left join to map these attributes back to our unique business_id dataset. 

In [9]:
#select attributes to add to business type nodes
businesses = businesses.select("business_id", "address", "categories", "hours", "name", "latitude", "longitude")

In [10]:
business_id_w_attributes = business_id_df.join(businesses, on = 'business_id', how = "left")

In [105]:
business_id_w_attributes.count()

15834

Finally, we need to add the business attribute columns to our user_id dataframe. These attributes will be populated with None. By adding these columns to the user_id dataframe, we can do a union to combine our two node type dataframes into a single dataframe. 

In [41]:
user_id_df.show(5)

+--------------------+--------+
|             user_id|customer|
+--------------------+--------+
|x4Y6DZHI9Ad2R-fmm...|       1|
|vJbLbl9cqKwHGARtD...|       1|
|jOhTdcWUVNgwsuIiP...|       1|
|oBUZgRiHQeN6pgDKy...|       1|
|trq73Ax5yXpCRopNC...|       1|
+--------------------+--------+
only showing top 5 rows



In [11]:
cols = ["address", "categories", "hours", "name", "latitude", "longitude"]   

for c in cols:
    user_id_df = user_id_df.withColumn(c, lit(None))

Let's check that our two node type dataframes have the same columns before we do a union. 

In [15]:
user_id_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- customer: integer (nullable = false)
 |-- address: null (nullable = true)
 |-- categories: null (nullable = true)
 |-- hours: null (nullable = true)
 |-- name: null (nullable = true)
 |-- latitude: null (nullable = true)
 |-- longitude: null (nullable = true)



In [16]:
business_id_w_attributes.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- business: integer (nullable = false)
 |-- address: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- hours: struct (nullable = true)
 |    |-- Friday: string (nullable = true)
 |    |-- Monday: string (nullable = true)
 |    |-- Saturday: string (nullable = true)
 |    |-- Sunday: string (nullable = true)
 |    |-- Thursday: string (nullable = true)
 |    |-- Tuesday: string (nullable = true)
 |    |-- Wednesday: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



Let's create a single dataframe that will contain our nodes for our graph. 

In [12]:
#union users and businesses into a node dataframe
node = business_id_w_attributes.union(user_id_df)

In [13]:
#rename columns
node = node.withColumnRenamed("business_id", "id")
node = node.withColumnRenamed("business", "type")

In [31]:
node.show(5)

+--------------------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+
|                  id|type|             address|          categories|               hours|                name|     latitude|     longitude|
+--------------------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+
|-ZzsPlaAgwO3yt29u...|   0|                    |Professional Serv...|[9:0-17:0, 9:0-17...|      ATX Architects|   30.3911673|   -97.7051165|
|-x9y3f2IXWnWhZyeQ...|   0|      5201 McDade Dr|Arts & Entertainm...|[10:0-19:0, 10:0-...|   Party Boat Austin|    30.255226|    -97.832213|
|1P0gza0EoFe-mQ026...|   0|3201 Bee Caves Rd...|Fitness & Instruc...|[5:15-18:30, 5:15...|F45 Training West...|    30.272058|    -97.800736|
|1m3r_ABHzUvm0H9vQ...|   0|2600 Lake Austin ...|Apartments, Home ...|[9:0-18:0, 0:0-0:...|The Boulevard at ...|    30.280437|    -97.773594|
|2b5S-XLh6gcU

## Create Edges Dataframe

In our graph, there will be two types of edges:

1. User leaves a rating for a business
2. User is connected to another user as a friend

Each edge type will carry with it attributes of interest:

- When a user leaves a rating for a business, we'll want the date of the review and the number of stars. We could also keep additional attributes such as the review text, number of upvotes, etc. We'll only grab the attributes used downstream in our current analysis. 

- A user to user connection as a friend does not have any attribute information available in the dataset. 

To create our edge dataframe, we'll need to pre-process the user.json file, which contains user information (including each user's friends list). 

Our Austin reviews dataset already contains all of the user-business reviews. We just need to rename the appropriate columns as the source/destination and select the attributes to track with each review. 

In [14]:
#create user to business edges dataframe
user_business = data.select("business_id", "date", "review_id", "text", "stars", "user_id")

user_business = user_business.withColumn("src", user_business["user_id"])
user_business = user_business.withColumn("dst", user_business["business_id"])

user_business = user_business.select('dst', 'date', 'review_id', "text", "stars", "src")

Next, we'll pre-process the user.json file to find user to user edges. 

In [40]:
#put user dataset into HDFS
!hdfs dfs -put /home/aleonard2/yelp_academic_dataset_user.json big-data



In [15]:
users = spark.read.json("big-data/yelp_academic_dataset_user.json")

In [22]:
users.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



We already identified our unique users above. We'll do a left join on this dataframe with the user.json data, appending the needed "friends" column to each user.

In [16]:
users_edge = user_id_df.join(users, on = 'user_id', how = "left")

In [37]:
users_edge.count()

126744

In [17]:
users_edge = users_edge.select("user_id", "friends")

In [39]:
users_edge.select("user_id","friends").show(1, truncate = False)

+----------------------+----------------------------------------------------------------------------------------------+
|user_id               |friends                                                                                       |
+----------------------+----------------------------------------------------------------------------------------------+
|-0Ji0nOyFe-4yo8BK4aRLA|EmdUN-cFy25hTj2lcD4Ryw, MXaSUovIC5U8d4GYzi2eoQ, c7sqkbx5Y_KXNGsI3i4GYw, gkXdOxE7TxTPIgon21miUg|
+----------------------+----------------------------------------------------------------------------------------------+
only showing top 1 row



Each friend's user_id is separated by a comma. We'll transform this into an array and explode it to get one user_id to friend_id row for each user to user connection. 

In [18]:
users_friends = users_edge.select("user_id", split(col("friends"),",").alias("FriendArray")) \
    .drop("friends")

In [19]:
#explode the friend array into one row per (user_id, friend_id) pair; rows with a null array are dropped by explode
user_friends_expanded = users_friends.select(users_friends.user_id,explode(users_friends.FriendArray).alias("friend_id"))
user_friends_expanded.show(5, truncate = False)

+----------------------+-----------------------+
|user_id               |friend_id              |
+----------------------+-----------------------+
|-0Ji0nOyFe-4yo8BK4aRLA|EmdUN-cFy25hTj2lcD4Ryw |
|-0Ji0nOyFe-4yo8BK4aRLA| MXaSUovIC5U8d4GYzi2eoQ|
|-0Ji0nOyFe-4yo8BK4aRLA| c7sqkbx5Y_KXNGsI3i4GYw|
|-0Ji0nOyFe-4yo8BK4aRLA| gkXdOxE7TxTPIgon21miUg|
|-25JZ2VjrGZfXR8EaW8vBA|Q-DDwor2okiMIuM5mVewRA |
+----------------------+-----------------------+
only showing top 5 rows
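
The output above shows that splitting on a bare comma leaves a leading space on every friend_id after the first, so those ids will not match the node ids exactly. The plain-Python sketch below illustrates the same split-and-explode step with trimming added. The function name `explode_friends` is hypothetical, and treating the string "None" as an empty friends list is an assumption about how the dataset marks users without friends. In PySpark, a similar effect could likely be achieved by passing a regex such as `",\\s*"` as the pattern to `split`.

```python
def explode_friends(user_id, friends):
    """Return (user_id, friend_id) pairs from a comma-separated friends string."""
    # Assumption: a missing value or the placeholder string "None" means no friends.
    if friends is None or friends.strip() == "None":
        return []
    # Split on commas and strip the space that follows each comma.
    friend_ids = [f.strip() for f in friends.split(",")]
    return [(user_id, f) for f in friend_ids if f]


pairs = explode_friends(
    "-0Ji0nOyFe-4yo8BK4aRLA",
    "EmdUN-cFy25hTj2lcD4Ryw, MXaSUovIC5U8d4GYzi2eoQ",
)
print(pairs)
```

Each returned pair corresponds to one row of the exploded dataframe, with no stray whitespace in the friend id.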



We now have a row for each user to user connection. Finally, we create our source/destination columns and add the attribute columns from our user to business edges so that the two edge dataframes can be combined with a union. 

In [20]:
user_friends_expanded= user_friends_expanded.withColumn("src", user_friends_expanded["user_id"])
user_friends_expanded = user_friends_expanded.withColumn("dst", user_friends_expanded["friend_id"])

In [21]:
cols = ["date", "review_id", "text", "stars"]   

for c in cols:
    user_friends_expanded = user_friends_expanded.withColumn(c, lit(None))

In [22]:
#select needed columns and reorder to line up with user-business edge df
user_friends_expanded = user_friends_expanded.select('dst', 'date', 'review_id', "text", "stars", "src")

We'll add a connection type column to each dataframe to label the type of edge: 0 for user to business and 1 for user to user. 

In [23]:
#add type of edge connection
user_business = user_business.withColumn("connection_type", lit(0))
user_friends_expanded = user_friends_expanded.withColumn("connection_type", lit(1))

Let's check that our schemas line up before doing a union.

In [62]:
user_friends_expanded.printSchema()

root
 |-- dst: string (nullable = true)
 |-- date: null (nullable = true)
 |-- review_id: null (nullable = true)
 |-- text: null (nullable = true)
 |-- stars: null (nullable = true)
 |-- src: string (nullable = true)
 |-- connection_type: integer (nullable = false)



In [60]:
user_business.printSchema()

root
 |-- dst: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- src: string (nullable = true)
 |-- connection_type: integer (nullable = false)



In [24]:
edge_df = user_business.union(user_friends_expanded)

In [63]:
edge_df.dtypes

[('dst', 'string'),
 ('date', 'timestamp'),
 ('review_id', 'string'),
 ('text', 'string'),
 ('stars', 'string'),
 ('src', 'string'),
 ('connection_type', 'int')]

In [77]:
edge_df.show(5)

+--------------------+-------------------+--------------------+--------------------+-----+--------------------+
|                 dst|               date|           review_id|                text|stars|                 src|
+--------------------+-------------------+--------------------+--------------------+-----+--------------------+
|-0wZIJnbYSstEGj3u...|2019-11-14 22:39:24|rrL1-bh6tMNSMKvx5...|This agent doesn'...|  1.0|gjZ-HBBJ2sNKMwL8x...|
|-LqsFXfZDziytsnVq...|2019-02-04 22:35:18|TKlL65VgqGGVpGnvz...|The Purple Fig ha...|  5.0|707raSOaJBZpof1UB...|
|-LqsFXfZDziytsnVq...|2020-12-02 17:29:38|pYAlyWozWYpTA5NqS...|Still loving the ...|  5.0|m98Y2CbHYTjSoPheE...|
|-LqsFXfZDziytsnVq...|2021-01-07 23:13:08|bwGQKkJOOTM4JGx-b...|I have been sitti...|  2.0|WTpA4GEULakoL_NLn...|
|-LqsFXfZDziytsnVq...|2020-09-15 20:07:24|cg2NLaXFycGGGk1VZ...|Id like to start ...|  1.0|8kUlGcK7l2Yc7sKX0...|
+--------------------+-------------------+--------------------+--------------------+-----+--------------

## Create Graph - Basic Analysis

In [25]:
#build Graph - GraphFrame(vertices, edges)
graph = GraphFrame(node, edge_df)

In [81]:
graph.vertices.show(5)

+--------------------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+
|                  id|type|             address|          categories|               hours|                name|     latitude|     longitude|
+--------------------+----+--------------------+--------------------+--------------------+--------------------+-------------+--------------+
|-ZzsPlaAgwO3yt29u...|   0|                    |Professional Serv...|[9:0-17:0, 9:0-17...|      ATX Architects|   30.3911673|   -97.7051165|
|-x9y3f2IXWnWhZyeQ...|   0|      5201 McDade Dr|Arts & Entertainm...|[10:0-19:0, 10:0-...|   Party Boat Austin|    30.255226|    -97.832213|
|1P0gza0EoFe-mQ026...|   0|3201 Bee Caves Rd...|Fitness & Instruc...|[5:15-18:30, 5:15...|F45 Training West...|    30.272058|    -97.800736|
|1m3r_ABHzUvm0H9vQ...|   0|2600 Lake Austin ...|Apartments, Home ...|[9:0-18:0, 0:0-0:...|The Boulevard at ...|    30.280437|    -97.773594|
|2b5S-XLh6gcU

In [30]:
graph.edges.show(5)

+--------------------+-------------------+--------------------+--------------------+-----+--------------------+---------------+
|                 dst|               date|           review_id|                text|stars|                 src|connection_type|
+--------------------+-------------------+--------------------+--------------------+-----+--------------------+---------------+
|-0wZIJnbYSstEGj3u...|2019-11-14 22:39:24|rrL1-bh6tMNSMKvx5...|This agent doesn'...|  1.0|gjZ-HBBJ2sNKMwL8x...|              0|
|-LqsFXfZDziytsnVq...|2019-02-04 22:35:18|TKlL65VgqGGVpGnvz...|The Purple Fig ha...|  5.0|707raSOaJBZpof1UB...|              0|
|-LqsFXfZDziytsnVq...|2020-12-02 17:29:38|pYAlyWozWYpTA5NqS...|Still loving the ...|  5.0|m98Y2CbHYTjSoPheE...|              0|
|-LqsFXfZDziytsnVq...|2021-01-07 23:13:08|bwGQKkJOOTM4JGx-b...|I have been sitti...|  2.0|WTpA4GEULakoL_NLn...|              0|
|-LqsFXfZDziytsnVq...|2020-09-15 20:07:24|cg2NLaXFycGGGk1VZ...|Id like to start ...|  1.0|8kUlGcK7l2Yc7s

In [50]:
#How many nodes?
graph.vertices.count()

142578

In [51]:
graph.edges.count()

268362

In [94]:
#Most connected nodes
graph.degrees.orderBy("degree", ascending=False).show(10, truncate = False)

+----------------------+------+
|id                    |degree|
+----------------------+------+
|None                  |54543 |
|hizGc5W1tBHPghM5YKCAtg|9233  |
|djxnI8Ux8ZYQJhiOQkrRhA|8786  |
|3zxy3LVBV3ttxoYbY4rQ8A|7008  |
|NfU0zDaTMEQ4-X9dbQWd9A|6851  |
|8I7DLn7dPu76HQImdoqzQg|6795  |
|3mNz5nQFTIBQm0oU5mBR0w|6778  |
|AHRrG3T1gJpHvtpZ-K0G_g|6612  |
|DOj9NanlJP3xntULCy5Uow|6423  |
|GFyA9ULGAeD-xZEPto2y7A|6250  |
+----------------------+------+
only showing top 10 rows
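
The top row has the id None, which is not a real user or business. A plausible explanation, not verified here, is that users without friends carry the literal string "None" in their friends field, so every such user ends up with an edge to a single placeholder node. The toy sketch below (plain Python, hypothetical ids) counts degree as in-degree plus out-degree and shows how the placeholder becomes a false hub until it is filtered out.

```python
from collections import Counter


def degrees(edges):
    """Count in-degree plus out-degree for every id that appears in an edge."""
    counts = Counter()
    for src, dst in edges:
        counts[src] += 1
        counts[dst] += 1
    return counts


# Hypothetical edges; "None" stands in for the placeholder friend id.
edges = [
    ("user_a", "biz_1"),
    ("user_b", "biz_1"),
    ("user_a", "None"),
    ("user_b", "None"),
    ("user_c", "None"),
]

deg = degrees(edges)
clean = degrees([(s, d) for s, d in edges if d != "None"])
print(deg.most_common(1))
print(clean.most_common(1))
```

With the placeholder edges removed, the most connected node is a genuine business rather than the placeholder.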



In [95]:
users.select("user_id", col("name").alias("user_name")).filter(users.user_id == "hizGc5W1tBHPghM5YKCAtg").show(truncate = False)

+----------------------+---------+
|user_id               |user_name|
+----------------------+---------+
|hizGc5W1tBHPghM5YKCAtg|Katie    |
+----------------------+---------+



In [86]:
#most frequently repeated src/dst pairs
connectionCount = graph.edges.groupBy("src", "dst").count().orderBy("count", ascending = False)

connectionCount.show(5, truncate= False)

+----------------------+----------------------+-----+
|src                   |dst                   |count|
+----------------------+----------------------+-----+
|AqzHptL1OCi4ELlXKCfEpg|07vsMfjupodO4am9KE46xQ|12   |
|RpwQrvDNsKcc2WTVR9wtrA|ezOGxXnOE4ZoBnEkDTAvww|8    |
|Lu4-NKrpJbSBpUcZPogovg|u5ztKk_mmNKyhRHBFRgZuQ|8    |
|wLUTJQJL64IPHI55K39Ykw|v0N7mWzaekhyNi0c93JEVg|7    |
|Lu4-NKrpJbSBpUcZPogovg|YDawSup__BY0EekWbA9T6w|7    |
+----------------------+----------------------+-----+
only showing top 5 rows



In [93]:
users.select("user_id", col("name").alias("user_name")).filter(users.user_id == "AqzHptL1OCi4ELlXKCfEpg").show(truncate = False)

+--------------------+---------+
|             user_id|user_name|
+--------------------+---------+
|AqzHptL1OCi4ELlXK...| Victoria|
+--------------------+---------+



In [90]:
data.select("business_id", "name").filter(data.business_id == "07vsMfjupodO4am9KE46xQ").show(1, truncate = False)

+----------------------+------------------------------+
|business_id           |name                          |
+----------------------+------------------------------+
|07vsMfjupodO4am9KE46xQ|Waterloo Ice House Burnet Road|
+----------------------+------------------------------+
only showing top 1 row



Victoria has actually reviewed Waterloo Ice House 12 times!

https://www.yelp.com/biz/waterloo-ice-house-burnet-road-austin-4

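We attempted to use the PageRank algorithm to find the most influential user, but the job failed repeatedly on the RCC cluster. The stack trace below shows a ClassNotFoundException for org.graphframes.GraphFrame$$anonfun$5 raised on an executor, which suggests the GraphFrames JAR was not available to the executors. 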

In [97]:
#Run PageRank until convergence to tolerance "tol".
results = graph.pageRank(resetProbability=0.15, tol= 0.01)
results.vertices.orderBy('pagerank',ascending=False).show()

Py4JJavaError: An error occurred while calling o861.run.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 148.0 failed 4 times, most recent failure: Lost task 16.3 in stage 148.0 (TID 6554, hd01.rcc.local, executor 481): java.lang.ClassNotFoundException: org.graphframes.GraphFrame$$anonfun$5
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2178)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1035)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1017)
	at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
	at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
	at org.apache.spark.graphx.lib.PageRank$.runUntilConvergenceWithOptions(PageRank.scala:355)
	at org.graphframes.lib.PageRank$.runUntilConvergence(PageRank.scala:152)
	at org.graphframes.lib.PageRank.run(PageRank.scala:102)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.graphframes.GraphFrame$$anonfun$5
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:88)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
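
Since the call above never completed, the sketch below illustrates on a toy graph what a PageRank run computes. It is a textbook power iteration in plain Python, not the GraphFrames implementation: the function name, the even redistribution of rank from nodes without outgoing edges, and the use of the largest per-node change as the stopping test are all assumptions, so the scores would not match GraphFrames output numerically.

```python
from math import fabs


def pagerank(edges, reset_probability=0.15, tol=0.01, max_iter=100):
    """Power-iteration PageRank over a list of (src, dst) edges."""
    nodes = sorted({n for edge in edges for n in edge})
    out_links = {n: [] for n in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    n = len(nodes)
    rank = {node: 1.0 / n for node in nodes}
    for _ in range(max_iter):
        # Rank held by nodes with no outgoing edges is shared evenly.
        dangling = sum(rank[v] for v in nodes if not out_links[v])
        new_rank = {
            v: reset_probability / n + (1 - reset_probability) * dangling / n
            for v in nodes
        }
        # Each node passes its rank along its outgoing edges in equal shares.
        for src in nodes:
            for dst in out_links[src]:
                share = rank[src] / len(out_links[src])
                new_rank[dst] += (1 - reset_probability) * share
        change = max(fabs(new_rank[v] - rank[v]) for v in nodes)
        rank = new_rank
        if change < tol:
            break
    return rank


# Hypothetical user -> business review edges.
toy_edges = [("u1", "b1"), ("u2", "b1"), ("u3", "b1"), ("u3", "b2")]
ranks = pagerank(toy_edges)
top = max(ranks, key=ranks.get)
print(top, round(sum(ranks.values()), 6))
```

In this toy graph the business reviewed by all three users receives the highest score, which is the kind of signal the failed cell was meant to surface.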


In [56]:
data.select("business_id", "user_id").show(5, truncate = False)

+----------------------+----------------------+
|business_id           |user_id               |
+----------------------+----------------------+
|-0wZIJnbYSstEGj3uXTmmA|gjZ-HBBJ2sNKMwL8xudRZA|
|-LqsFXfZDziytsnVqbuS2Q|707raSOaJBZpof1UBJ7pOw|
|-LqsFXfZDziytsnVqbuS2Q|m98Y2CbHYTjSoPheElJguQ|
|-LqsFXfZDziytsnVqbuS2Q|WTpA4GEULakoL_NLnimPlA|
|-LqsFXfZDziytsnVqbuS2Q|8kUlGcK7l2Yc7sKX0vXnfw|
+----------------------+----------------------+
only showing top 5 rows



## Build Recommender

We'll now attempt to create a recommender system leveraging our graph. Two approaches are immediately apparent:

1. Use the user to business edges to map an input user to other users who gave a similar rating to the same business. Then, find the businesses those users have frequented (and rated highly). Recommend these businesses to our input user.
2. Use the user to user edges to map an input user to businesses their friends have rated highly.

For our recommender, we'll take the first path, as our dataset has much richer attribute information for the user to business edges.

We will use a motif to traverse our graph for the businesses of interest. Our motif will:

1. connect an input user to all the businesses the input user has reviewed
2. find new users that have also reviewed the businesses the input user has reviewed
3. find all businesses the new users have reviewed
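
The three steps above can be sketched in plain Python on a toy edge list. This is only an illustration of the traversal pattern, not the GraphFrames motif itself; the function name and the sample users and businesses are hypothetical, and unlike the raw motif this sketch excludes the input user from the set of new users up front.

```python
def find_candidates(edges, input_user):
    """Follow input_user -> business <- new_user -> new_business on (src, dst) edges."""
    reviewed_by = {}  # business -> set of users who reviewed it
    reviews_of = {}   # user -> set of businesses they reviewed
    for user, business in edges:
        reviewed_by.setdefault(business, set()).add(user)
        reviews_of.setdefault(user, set()).add(business)

    matches = set()
    for shared in reviews_of.get(input_user, set()):          # step 1
        for new_user in reviewed_by[shared] - {input_user}:   # step 2
            for new_business in reviews_of[new_user]:         # step 3
                matches.add((shared, new_user, new_business))
    return matches


toy_edges = [
    ("ann", "tacos"),
    ("bob", "tacos"),
    ("bob", "ramen"),
    ("cat", "bbq"),
]
candidates = sorted(find_candidates(toy_edges, "ann"))
print(candidates)
```

Each tuple holds the shared business, the new user, and a business that new user reviewed. The shared business itself also appears as a candidate, so it would still need to be filtered out before recommending.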

From our motif result, we will take the rating our input user gave each reviewed business and compute the absolute difference from the rating every other user gave that same business. The goal is to find users who rated the business the same or almost the same as the input user, the assumption being that these users like similar businesses. 

We now have a list of businesses other users have reviewed that we can potentially recommend to our input user. We'll filter out any businesses that our new users rated 3 stars or fewer, thus keeping only the businesses reviewed positively. 

Finally, we'll sort our data based on our input user's assumed location for relevance and drop duplicate business recs that may have resulted from multiple new users reviewing the same business. 
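
The filtering logic described above can be sketched in plain Python as follows. The function name, the dictionary keys, and the `max_rating_gap` threshold of one star are assumptions made for illustration (the text only says "the same or almost the same"), the ratings are taken to be floats rather than the strings stored in the edge dataframe, and the location-based sort is left out.

```python
from math import fabs


def pick_recommendations(matches, max_rating_gap=1.0, min_stars=3.0):
    """Filter motif matches down to a de-duplicated list of candidate businesses."""
    recommended = []
    seen = set()
    for m in matches:
        # How differently did the two users rate the business they share?
        gap = fabs(m["input_stars"] - m["new_user_stars"])
        if gap > max_rating_gap:
            continue  # the users disagreed, so skip this new user's picks
        if m["candidate_stars"] <= min_stars:
            continue  # keep only businesses rated above 3 stars
        if m["candidate"] not in seen:
            seen.add(m["candidate"])
            recommended.append(m["candidate"])
    return recommended


# Hypothetical motif matches.
matches = [
    {"input_stars": 5.0, "new_user_stars": 4.0, "candidate": "biz_a", "candidate_stars": 4.0},
    {"input_stars": 5.0, "new_user_stars": 4.0, "candidate": "biz_b", "candidate_stars": 1.0},
    {"input_stars": 1.0, "new_user_stars": 5.0, "candidate": "biz_c", "candidate_stars": 5.0},
    {"input_stars": 4.0, "new_user_stars": 4.0, "candidate": "biz_a", "candidate_stars": 5.0},
]
recs = pick_recommendations(matches)
print(recs)
```

Here `biz_b` is dropped for its low rating, `biz_c` is dropped because the two users disagreed on the shared business, and `biz_a` is kept once even though two similar users rated it.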

#### Train Word2Vec

To make more relevant recommendations, we'll take the business category column and create word embeddings with Word2Vec. This will allow our recommender to measure similarity between business types. 

In [26]:
data_no_na = data.dropna(subset=["categories"])

In [27]:
from pyspark.ml.feature import RegexTokenizer

#raw string avoids an invalid escape sequence warning for \w
regexTokenizer = RegexTokenizer(gaps = False, pattern = r'\w+', inputCol = 'categories', outputCol = 'categories_token')

austin_categories_token = regexTokenizer.transform(data_no_na)
austin_categories_token.show(3)

+--------------------+--------------------+------+--------------------+------------+--------------+--------------------+-------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+
|         business_id|             address|  city|                name|review_count|business_stars|          categories|is_open|cool|               date|funny|           review_id|stars|                text|useful|             user_id|    categories_token|
+--------------------+--------------------+------+--------------------+------------+--------------+--------------------+-------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+
|-0wZIJnbYSstEGj3u...|3810 Medical Pkwy...|Austin|Allstate Insuranc...|           5|           2.0|Insurance, Home &...|      1|   0|2019-11-14 22:39:24|    0|rrL1-bh6tMNSMKvx5...|  1.0|This agent doesn'...|     0|gjZ-HBBJ2sNKMwL

In [28]:
from pyspark.ml.feature import Word2Vec
#create an average word vector for each category column
word2vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'categories_token', outputCol = 'categories_vector')
model = word2vec.fit(austin_categories_token)
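
Once each business has a category vector, similarity between two businesses can be measured with cosine similarity. The sketch below is plain Python with made-up two-dimensional embeddings. The helper names and the toy vectors are hypothetical, and the averaging step mirrors the per-row averaging of word vectors mentioned in the comment above.

```python
from math import sqrt


def average_vector(tokens, embeddings):
    """Average the vectors of the tokens that have an embedding."""
    vectors = [embeddings[t] for t in tokens if t in embeddings]
    if not vectors:
        return None
    size = len(vectors[0])
    return [sum(v[i] for v in vectors) / len(vectors) for i in range(size)]


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sqrt(sum(x * x for x in a))
    norm_b = sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Toy embeddings; real vectors would come from the fitted model.
toy_embeddings = {
    "burgers": [1.0, 0.0],
    "bbq": [0.8, 0.2],
    "insurance": [0.0, 1.0],
}
burger_place = average_vector(["burgers", "bbq"], toy_embeddings)
bbq_place = average_vector(["bbq"], toy_embeddings)
insurer = average_vector(["insurance"], toy_embeddings)

print(cosine_similarity(burger_place, bbq_place))
print(cosine_similarity(burger_place, insurer))
```

A candidate whose category vector scores close to 1 against a business the input user liked would be treated as the same type of business, while a score near 0 indicates an unrelated category.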

#### Graph Motif

In [26]:
input_user = "Lu4-NKrpJbSBpUcZPogovg"
filter_user = ("input_user.id = '{}'".format(input_user))
rating_mult = graph.find("(input_user)-[e]->(input_user_business); (new_user)-[e2]->(input_user_business); (new_user)-[e3]->(new_user_business)").dropDuplicates().filter(filter_user)

In [36]:
#select the ratings and business names of interest from the motif result
results = rating_mult.select("input_user", "e.stars", "input_user_business.name", "e2.stars", "new_user", "e3.stars", "new_user_business.name")

In [39]:
#remove cases where the new user is the same as the input user
results.filter(results.new_user.id != "Lu4-NKrpJbSBpUcZPogovg").show()

+--------------------+-----+--------------------+-----+--------------------+-----+--------------------+
|          input_user|stars|                name|stars|            new_user|stars|                name|
+--------------------+-----+--------------------+-----+--------------------+-----+--------------------+
|[Lu4-NKrpJbSBpUcZ...|  5.0|     Texas Roadhouse|  4.0|[3sI5kFZp8lKWohkH...|  4.0|  Vespaio Ristorante|
|[Lu4-NKrpJbSBpUcZ...|  4.0|P. Terry's Burger...|  3.0|[lya2z8lpqWVGD3u4...|  5.0|Perry’s Steakhous...|
|[Lu4-NKrpJbSBpUcZ...|  5.0|Rudy's "Country S...|  4.0|[I2AM0Xh5clFA3iyF...|  1.0|      Ramen Tatsu-Ya|
|[Lu4-NKrpJbSBpUcZ...|  1.0|Eurasia Sushi Bar...|  5.0|[T6K1U65wS7NtR1QX...|  5.0|      Ramen Tatsu-Ya|
|[Lu4-NKrpJbSBpUcZ...|  3.0|Eurasia Sushi Bar...|  5.0|[Kj_MYdysEwQORXOG...|  1.0|  Sandy's Hamburgers|
|[Lu4-NKrpJbSBpUcZ...|  1.0|      Sonic Drive-In|  3.0|[q3cxC9tv3bmPE74i...|  3.0|          Bert's BBQ|
|[Lu4-NKrpJbSBpUcZ...|  3.0|Alamo Drafthouse ...|  5.0|[t6eNIzTh

Below is a user-defined function that uses our motif and applies some basic logic to narrow the results down to businesses we can recommend to the input user.

In [61]:
#function for distance calculations
def haversine(lon1, lat1, lon2, lat2):
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 3956 # Radius of earth in miles. Determines return value units.
    return c * r

def get_recs(input_user, user_lat, user_lng):
    filter_user = ("input_user.id = '{}'".format(input_user))
    #find businesses rated by other users who rated the same business as the input user
    rating_mult = graph.find("(input_user)-[e]->(input_user_business); (new_user)-[e2]->(input_user_business); (new_user)-[e3]->(new_user_business)").dropDuplicates().filter(filter_user)
    
    filtered_businesses = rating_mult.select(col("new_user_business.id").alias("business_id"), col("new_user_business.name").alias("business_name"),
    col("new_user_business.categories").alias("categories"), col("new_user_business.latitude").alias("latitude"), 
    col("new_user_business.longitude").alias("longitude"), col("new_user.id").alias("new_user_id"),
    col("e.stars").alias("input_user_rating"),
    col("e2.stars").alias("new_user_rating"),col("e3.stars").alias("new_user_new_business_rating")).withColumn(
    "abs_of_col", abs((col("input_user_rating")-col("new_user_rating")))).filter(col("new_user.id") != input_user)
    
    #save min abs of difference between input user rating and new user rating as variable for filter
    min_abs = filtered_businesses.agg(F.min("abs_of_col")).first()[0]

    #keep only rows where the rating difference equals that minimum (the users who agree most closely)
    filtered_businesses = filtered_businesses.filter(filtered_businesses.abs_of_col == min_abs)

    #keep only new businesses that the new user rated higher than 3
    filtered_businesses = filtered_businesses.filter(filtered_businesses.new_user_new_business_rating > 3)
    filtered_businesses = filtered_businesses.dropna()

    austin_categories_token = regexTokenizer.transform(filtered_businesses)

    result = model.transform(austin_categories_token)

    result = result.dropDuplicates(subset=["business_id"])
    
    result = result.drop("categories_token")
    
    filtered_to_pandas = result.toPandas()
    
    filtered_to_pandas_duplicates = filtered_to_pandas.drop_duplicates(subset="business_id")
    
    distance_from = []
    for idx in range(len(filtered_to_pandas_duplicates)):
        #use positional indexing so the loop does not depend on the index labels
        lon1 = filtered_to_pandas_duplicates['longitude'].iloc[idx]
        lat1 = filtered_to_pandas_duplicates['latitude'].iloc[idx]

        distance_from.append(haversine(lon1, lat1, user_lng, user_lat))
        
    
    filtered_to_pandas_duplicates['distance_from'] = distance_from

    filtered_to_pandas_duplicates = filtered_to_pandas_duplicates.sort_values(by = "distance_from")
    
    return filtered_to_pandas_duplicates
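
As a quick sanity check of the distance calculation, the sketch below re-implements the same haversine formula with NumPy so it works on whole columns at once. `haversine_np` is a hypothetical helper, not part of the notebook. The first business coordinate pair appears in the notebook's output and the second is the user's own location, so we'd expect a distance of roughly 0.047 miles and exactly 0 miles.

```python
import numpy as np

def haversine_np(lon1, lat1, lon2, lat2, radius=3956.0):
    """Great-circle distance in miles; accepts scalars or NumPy arrays."""
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 2 * radius * np.arcsin(np.sqrt(a))

# Business coordinates (arrays) versus the user's location (scalars)
business_lon = np.array([-97.742975, -97.742895])
business_lat = np.array([30.269877, 30.270555])
distances = haversine_np(business_lon, business_lat, -97.742895, 30.270555)
print(distances)
```

A vectorized version like this could replace the per-row loop in `get_recs` if the recommendation list grows large.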


In [93]:
%%time
input_user = "Lu4-NKrpJbSBpUcZPogovg"
recs = get_recs(input_user, user_lat = 30.270555, user_lng = -97.742895)

CPU times: user 265 ms, sys: 35.8 ms, total: 300 ms
Wall time: 1min 57s
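
To make the filtering logic inside `get_recs` concrete, here is a small pandas sketch of the two filters on made-up ratings. The `candidates` table is hypothetical; only the column names mirror the ones used in the function.

```python
import pandas as pd

# Hypothetical candidate rows: one per (shared business, new user, new business) match
candidates = pd.DataFrame({
    "business_name": ["A", "B", "C", "D"],
    "input_user_rating": [5.0, 4.0, 5.0, 3.0],
    "new_user_rating": [5.0, 2.0, 5.0, 3.0],
    "new_user_new_business_rating": [5.0, 5.0, 2.0, 4.0],
})

# Absolute difference between the two users' ratings of the shared business
candidates["abs_of_col"] = (
    candidates["input_user_rating"] - candidates["new_user_rating"]
).abs()

# Keep the closest-agreeing users, then only businesses they rated above 3
min_abs = candidates["abs_of_col"].min()
kept = candidates[
    (candidates["abs_of_col"] == min_abs)
    & (candidates["new_user_new_business_rating"] > 3)
]
print(kept["business_name"].tolist())
```

Business B is dropped because the two users disagreed on the shared business, and C is dropped because the like-minded user rated it poorly.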


Finally, let's remove any businesses our input user has already left a review for.

In [123]:
input_user_data = data.select("user_id", "business_id", "date", "categories").filter(data.user_id == input_user)

In [124]:
#keep only recommended businesses the input user has not already reviewed
recs = recs[recs['business_id'].isin(new_only)]
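
The cell that builds `new_only` is not shown above. One plausible way to construct such a list, assuming it holds the recommended business ids that are absent from the input user's review history, is a set difference. The names `recs_toy`, `reviewed_ids` and `new_only_toy` below are hypothetical stand-ins on made-up data.

```python
import pandas as pd

# Hypothetical stand-ins for `recs` and the input user's reviewed business ids
recs_toy = pd.DataFrame({
    "business_id": ["b1", "b2", "b3"],
    "business_name": ["A", "B", "C"],
})
reviewed_ids = {"b2"}

# Business ids that are recommended but not yet reviewed by the input user
new_only_toy = set(recs_toy["business_id"]) - reviewed_ids

# Same filtering pattern as the cell above
recs_new = recs_toy[recs_toy["business_id"].isin(new_only_toy)]
print(recs_new)
```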

In [104]:
#how often other customers appear 
import numpy as np

u_users, u_count = np.unique(recs['new_user_id'], return_counts = True)

list(zip(u_users, u_count))

[('-127E2yrTl8VRT8Zh-IzcA', 1),
 ('-745cYuxU-Yzi5wsTTl1zg', 1),
 ('-OY5qbkkKvD1YGaN6UhzsA', 3),
 ('-Xq4VkojyDxOlGQ0Ln-IXw', 1),
 ('-_DkhGsCaWJbN3aB1GmTJg', 5),
 ('-_x2o73mzJ7BgCAydBpM9Q', 1),
 ('-e8XaCNfSHYzIy7YFATXFA', 30),
 ('-r684wP4J3WpDWBfSjwkeg', 10),
 ('-r90xzlwlB_huF8wSL3v1A', 1),
 ('-trl7htF1I1_BH7D3ZYDqg', 1),
 ('-uTleVV7SJJdmTZmQ3PEPg', 19),
 ('-vFl5LV9Aqd494V9F4FBcQ', 47),
 ('-yHEt4nmx6SEWW0bhFAtKg', 3),
 ('0-fkc1VF28tFEYq6TWR45w', 1),
 ('06YeNfSpBoAu3ZAeA7vATg', 2),
 ('07ZsxJOKSRXBZJ02kT6pEA', 1),
 ('09VGyGnKbkgnGoyWUZGOiA', 3),
 ('0NgeMvzs99zI-gkfhP3LSQ', 2),
 ('0P0ivEq2UxllE9UA5WhIug', 2),
 ('0PrV9wgHBnDwb71F-ZhMag', 20),
 ('0QHl8KVCaT1wKfS33v2dUw', 10),
 ('0a-Wbn8KmP5Vu7eRibfH5g', 3),
 ('0cir6cd1hbrelrfYzMPNSw', 21),
 ('0zCuittKrx-eImNva8udaw', 1),
 ('18KRbHUM-uCgR94of218Qg', 2),
 ('19MQQFLbpI5z6PYhHrG2mg', 2),
 ('1LpnOswhApttYj4C1Sr2Jg', 11),
 ('1O638BDK_fWuxgTVJwff-A', 94),
 ('1P8W9bHpiWdM4te1kNo66g', 3),
 ('1hW4ZhhmpnphKwfrYVnDIQ', 1),
 ('1jLGFOjk5zRu6KyMD6WCyA', 2),

## Measure Relevance of Recommendations 

To measure the relevance of our recommendations, we'll compare the types of businesses our input user has visited with the types of businesses we recommend. We can quantify relevance as the cosine similarity between the category vectors of the businesses the input user reviewed and those of the recommended businesses.

In [94]:
# Calculate cosine similarity between two vectors 
import numpy as np
def cossim(v1, v2):
    return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2))
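
As a quick check of how to read the scores, the sketch below applies the same `cossim` function to a few hand-picked vectors (made-up values). Vectors pointing the same way should score about 1, orthogonal vectors 0, and opposite vectors about -1.

```python
import numpy as np

def cossim(v1, v2):
    # Same formula as above: dot product divided by both vector norms
    return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2))

same_direction = cossim(np.array([1.0, 2.0]), np.array([2.0, 4.0]))
orthogonal = cossim(np.array([1.0, 0.0]), np.array([0.0, 3.0]))
opposite = cossim(np.array([1.0, 2.0]), np.array([-1.0, -2.0]))
print(same_direction, orthogonal, opposite)
```

Note that the function divides by the vector norms, so an all-zero vector would produce a division by zero. That case is not handled here.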

In [96]:
#tokenize and run our word2vec model on the input user's business categories column
input_user_categories_token = regexTokenizer.transform(input_user_data)

input_user_vectors = model.transform(input_user_categories_token)

In [97]:
#return Pandas Dataframe
input_user_vectors_pandas = input_user_vectors.toPandas()

#sort the Dataframe by date
input_user_vectors_pandas = input_user_vectors_pandas.sort_values(by = "date", ascending = False)

In [103]:
input_user_vectors_pandas.head()

Unnamed: 0,user_id,business_id,date,categories,categories_token,categories_vector
91,Lu4-NKrpJbSBpUcZPogovg,lOHzmSU3D4Pffc0lcUI3lg,2020-09-13 19:02:44,"American (Traditional), Restaurants, Barbeque,...","[american, traditional, restaurants, barbeque,...","[-0.057390776276588444, -0.32251776829361917, ..."
84,Lu4-NKrpJbSBpUcZPogovg,6Ys2XaZlp5V_TobupYSfcQ,2020-09-08 20:57:59,"Obstetricians & Gynecologists, Health & Medica...","[obstetricians, gynecologists, health, medical...","[-0.1618240050971508, 0.338081955909729, -0.04..."
88,Lu4-NKrpJbSBpUcZPogovg,MG57MDjNvQ1lJ1uxHEAKxg,2020-08-22 19:51:10,"Day Spas, Beauty & Spas, Nail Salons","[day, spas, beauty, spas, nail, salons]","[0.2256332275768121, 0.17422958835959435, 0.08..."
40,Lu4-NKrpJbSBpUcZPogovg,ih_MxWyh3OswxzG857v6TQ,2020-08-08 21:22:47,"Barbeque, Automotive, Fast Food, Restaurants, ...","[barbeque, automotive, fast, food, restaurants...","[0.12878134420939855, -0.4072647733347756, 0.2..."
44,Lu4-NKrpJbSBpUcZPogovg,a6Mp8OCNA8IubxHUxILhwg,2020-08-08 18:41:46,"Tex-Mex, Restaurants, Mexican","[tex, mex, restaurants, mexican]","[-0.12736140144988894, -0.3960172804072499, 0...."


In [101]:
recs.head()

Unnamed: 0,business_id,business_name,categories,latitude,longitude,new_user_id,input_user_rating,new_user_rating,new_user_new_business_rating,abs_of_col,categories_vector,distance_from
1051,-AMbae1vfc9DidQrI9q27w,Stars In Your Eyes Optometry,"Eyewear & Opticians, Laser Eye Surgery/Lasik, ...",30.269877,-97.742975,m2M9qvF1joFKisTeb7NRQw,3.0,3.0,5.0,0.0,"[-0.09221282415091991, 0.10586612783372402, 0....",0.047027
2751,ZJFIDLUiZ2Vs0NTj0pGVfg,Perry's Steakhouse & Grille - Downtown Austin,"Seafood, Steakhouses, Restaurants, American (N...",30.26962,-97.74341,-e8XaCNfSHYzIy7YFATXFA,5.0,5.0,5.0,0.0,"[0.141895323193499, -0.16145237694893563, 0.05...",0.071507
1178,E8aCB-WjiXlTgUJYo3OMHQ,Cuba 512,"Cuban, Caribbean, Restaurants",30.271252,-97.741946,eRPsGOd1VE7lF6pRCMVWuw,1.0,1.0,5.0,0.0,"[-0.02844127764304479, -0.05267608786622683, 0...",0.074263
995,0lgGW93HYeVqrTTx9d3gUA,Chi'Lantro,"Barbeque, Street Vendors, Food, Food Stands, K...",30.270635,-97.741542,NHuLFl_eDxtXV1RLVOV-Tg,4.0,4.0,4.0,0.0,"[-0.23380851248900095, -0.5317708517735202, 0....",0.080864
462,fTqdzwU5A7A5fODHGnBqlQ,Total Men's Primary Care - Downtown,"Beauty & Spas, Health & Medical, Medical Cente...",30.270362,-97.741458,dADe6pGW1OaYBWQWeIHbxg,5.0,5.0,5.0,0.0,"[-0.0032554444144753847, 0.3259258436806062, 0...",0.086711


Let's check that our word embeddings on the categories column are working well by computing the cosine similarity between a couple of sample vectors.

In [111]:
input_user_vectors_pandas['categories'][84]

'Obstetricians & Gynecologists, Health & Medical, Doctors'

In [117]:
recs['categories'][462]

'Beauty & Spas, Health & Medical, Medical Centers, Doctors, Family Practice, Weight Loss Centers, Laser Hair Removal, Hair Removal'

In [116]:
cossim(input_user_vectors_pandas['categories_vector'][84], recs['categories_vector'][462])

0.8179628485252962

In [119]:
input_user_vectors_pandas['categories'][91]

'American (Traditional), Restaurants, Barbeque, Steakhouses'

In [120]:
cossim(input_user_vectors_pandas['categories_vector'][91], recs['categories_vector'][462])

-0.06000090634718217

We see a high cosine similarity between the two health-related businesses and a low cosine similarity between the health-related service and the BBQ restaurant. It looks like our word2vec model has done a nice job of capturing the relationships between the business categories.

To get a feel for the relevance of all of the recommendations, we'll combine the categories of our input user's rated businesses into one large vector, do the same for the recommended businesses, and compare the two.

In [125]:
#create large vector for input user and recommendations on categories
v1 = input_user_vectors_pandas.categories.str.cat(sep=' ')
v2 = recs.categories.str.cat(sep=' ')

In [127]:
#return to PySpark dataframes so we can apply the tokenizer and word2vec model
v1_df = spark.createDataFrame([v1], "string").toDF("categories")
v2_df = spark.createDataFrame([v2], "string").toDF("categories")

In [128]:
#get vector representation of each
v1_word2vec = model.transform(regexTokenizer.transform(v1_df))
v2_word2vec = model.transform(regexTokenizer.transform(v2_df))
v1_word2vec = v1_word2vec.toPandas()
v2_word2vec = v2_word2vec.toPandas()

In [129]:
#compare with cosine similarity
cossim(v1_word2vec['categories_vector'][0], v2_word2vec['categories_vector'][0])

0.8978640496752872

Overall, with a cosine similarity of about 0.90, the recommended businesses appear to align closely with the categories in the input user's review history.

What if we know the type of business a user needs a recommendation for? We can also sort our recommendations by similarity to a given type of business.

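As an example, let's take one of the input user's reviews and find the most similar businesses in their list of recommendations. Note that `['categories'][0]` selects by index label, so after sorting by date it returns the row labeled 0. In this run that is not the most recent review (the most recent row is labeled 91); `.iloc[0]` would select the most recent one.

In [135]:
print("The categories of the input user's selected review: {}".format(input_user_vectors_pandas['categories'][0]))

The categories of the input user's selected review: Thai, Restaurants, Food, Food Trucks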


In [130]:
track_sim = []
v2 = input_user_vectors_pandas['categories_vector'][0]
for vector in recs['categories_vector']:
    track_sim.append(cossim(v1 = vector, v2 = v2))

In [131]:
recs['similarity'] = track_sim

In [133]:
recs.sort_values(by = "similarity", ascending = False).head()

Unnamed: 0,business_id,business_name,categories,latitude,longitude,new_user_id,input_user_rating,new_user_rating,new_user_new_business_rating,abs_of_col,categories_vector,distance_from,similarity
1246,XZb-K_pP8Roz8WIG2hPFEg,Tuk Tuk Thai Cafe,"Thai, Restaurants, Food",30.215908,-97.79667,SneLjEnBq-sZNQrXc0ExlQ,5.0,5.0,5.0,0.0,"[-0.45205954710642493, -0.4202902875840664, 0....",4.952276,0.937396
2960,vuOfLg269Rr4-moMAidLqg,Veracruz All Natural,"Food, Restaurants, Food Trucks, Mexican",30.231436,-97.788077,s9jbQyCn2p_SDc7o55GXRw,4.0,4.0,5.0,0.0,"[-0.24607874490320683, -0.42526403442025185, 0...",3.815392,0.84253
2582,tHv6_4DKOV8sZnlvTrCN9Q,Al Pastor,"Food Trucks, Restaurants, Mexican, Food",30.242096,-97.729973,Zf66IDXGUlRdKS4fwzARIw,4.0,4.0,5.0,0.0,"[-0.24607874490320683, -0.42526403442025185, 0...",2.110721,0.84253
353,btqvmsmX5Phgr1A0jH6j0w,LUV Thai Cuisine,"Restaurants, Thai",30.230298,-97.815657,eRPsGOd1VE7lF6pRCMVWuw,5.0,5.0,5.0,0.0,"[-0.4505927413702011, -0.411759490147233, 0.14...",5.153585,0.842305
362,xFgIiLmJVCKqKX8Ra_ZNQQ,Chi'Lantro,"Korean, Restaurants, Food, Asian Fusion, Barbe...",30.394372,-97.720157,jA8QxEykYOAwJCLJWsV24A,5.0,5.0,5.0,0.0,"[-0.3906800467520952, -0.5517844804562628, 0.2...",8.655685,0.810751


The top 5 recommended businesses are now Thai restaurants, food trucks, and closely related restaurant categories.

This exercise can easily be repeated with a user's past n reviews to find new businesses that are highly relevant to that user's review history.
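
One way this could look, as a rough sketch: average the category vectors of the user's n most recent reviews into a single profile vector and rank the recommendations against it. The helper `rank_by_recent_history` and the toy 2-dimensional vectors below are assumptions for illustration, and averaging is just one possible way to combine the n reviews.

```python
import numpy as np
import pandas as pd

def cossim(v1, v2):
    return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2))

def rank_by_recent_history(history, recs, n=3):
    """Rank recs by cosine similarity to the mean category vector of the n latest reviews."""
    recent = history.sort_values(by="date", ascending=False).head(n)
    profile = np.mean(np.stack(recent["categories_vector"].to_list()), axis=0)
    ranked = recs.copy()
    ranked["similarity"] = [cossim(v, profile) for v in ranked["categories_vector"]]
    return ranked.sort_values(by="similarity", ascending=False)

# Made-up review history and recommendations with 2-dimensional vectors
history_toy = pd.DataFrame({
    "date": pd.to_datetime(["2020-09-13", "2020-09-08", "2019-01-05"]),
    "categories_vector": [[1.0, 0.0], [0.8, 0.2], [0.0, 1.0]],
})
recs_toy = pd.DataFrame({
    "business_name": ["X", "Y"],
    "categories_vector": [[0.0, 1.0], [1.0, 0.1]],
})

ranked = rank_by_recent_history(history_toy, recs_toy, n=2)
print(ranked[["business_name", "similarity"]])
```

With n=2 only the two latest reviews shape the profile, so business Y, whose vector points in nearly the same direction, ranks first.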