#### Yelp Recommender Dataset Creation 

In [1]:
from pyspark.sql import SparkSession 

# One shared session for the whole notebook; getOrCreate() reuses an existing one.
spark = (
    SparkSession.builder
    .appName('Yelp Restaurant Recommender')
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# read in yelp datasets
# All inputs are Parquet, which is self-describing. The CSV-style reader options
# previously chained here (multiline, quote, escape, header, ...) are not Parquet
# options and had no effect, so they are omitted.
ENGINEERED_DATA_DIR = 'gs://msca-bdp-student-gcs/GroupProject_Gr7/yelp_dataset/engineered_data'

dfBusiness = spark.read.parquet('{}/business.snappy.parquet'.format(ENGINEERED_DATA_DIR))
dfReview = spark.read.parquet('{}/review.snappy.parquet'.format(ENGINEERED_DATA_DIR))
dfUser = spark.read.parquet('{}/user.snappy.parquet'.format(ENGINEERED_DATA_DIR))

# read in Daisuke's sentiment analysis model output (joined on review_id later)
dfSentiment = spark.read.parquet('{}/aspect_sentiment.snappy.parquet'.format(ENGINEERED_DATA_DIR))


                                                                                

In [3]:
from pyspark.sql import functions as F 

# Partition-skew helper (adapted from the professor's course example).
def displayPartitions(df):
    """Print the row count of every non-empty partition of ``df``.

    Rows are ordered by ascending count, so the smallest partitions are listed
    first. The display limit equals the partition count, so every non-empty
    partition is shown.
    """
    partitionTotal = df.rdd.getNumPartitions()
    rowsPerPartition = (
        df.withColumn('partitionId', F.spark_partition_id())
        .groupby('partitionId')
        .count()
        .orderBy(F.asc('count'))
    )
    rowsPerPartition.show(partitionTotal)

In [4]:
from pyspark.sql.functions import col, desc, when, regexp_replace, regexp_extract, explode 
from pyspark.sql.types import StringType, MapType, IntegerType 
from ast import literal_eval 

# clean business data (cont.)

# drop any unnecessary fields
dfBusiness = dfBusiness.drop('AcceptsInsurance', 'HairSpecializesIn')

# Convert different string types to the same value.
# Categorical attribute values may carry Python-2 repr quoting, e.g. "u'free'"
# or "'free'"; strip that wrapper so both collapse to "free". Comparing against a
# u'free' literal does not work for this: in Python 3, u'free' == 'free', so such
# a comparison only ever matches the already-clean value. Values without the
# wrapper (and nulls) pass through unchanged.
quotedCategoricalCols = ['AgesAllowed', 'Alcohol', 'BYOBCorkage', 'NoiseLevel', 'RestaurantsAttire', 'Smoking', 'WiFi']
for colName in quotedCategoricalCols:
    dfBusiness = dfBusiness.withColumn(colName, regexp_replace(col(colName), r"^u?'(.*)'$", '$1'))

# Convert any None or null to unknown.
# 'None' strings become real nulls first. na.fill with a string value only fills
# string columns, so numeric nulls (e.g. stars, is_open) stay null.
dfBusiness = dfBusiness.replace('None', None)
dfBusiness = dfBusiness.na.fill('unknown')

# unnest further nested columns
# Each of these columns holds a Python dict literal as text (parsed with
# ast.literal_eval below). Every distinct key becomes its own column named
# '<column>_<key>'. Cells that were filled with 'unknown' above are rewritten to
# {'unknown': True}, so they surface as a '<column>_unknown' column.
dfBusinessNestedCols = ['Ambience', 'BestNights', 'BusinessParking', 'DietaryRestrictions', 'GoodForMeal', 'Music']

# Build the parsing UDF once rather than once per column. The declared return
# type makes Spark store the parsed dict as map<string,string>.
# No u'...' -> '...' rewrite is applied: Python 3's literal_eval already accepts
# u'' string prefixes.
parseDictLiteral = F.udf(literal_eval, 'map<string,string>')

for column in dfBusinessNestedCols:
    # Anchored so only cells that are exactly 'unknown' are replaced.
    cleaned = regexp_replace(col(column), '^unknown$', "{'unknown': True}")
    dfBusiness = dfBusiness.withColumn(column, parseDictLiteral(cleaned))
    # distinct() + collect() has no ordering guarantee. Sort the keys so the
    # generated columns come out in the same order on every run.
    curKeys = sorted(dfBusiness.select(explode(column)).select('key').distinct().rdd.flatMap(lambda x: x).collect())
    curExpressions = [col(column).getItem(k).alias('{}_{}'.format(column, k)) for k in curKeys]
    dfBusiness = dfBusiness.select('*', *curExpressions).drop(column)

# Create open/close hour columns for each weekday.
# The regexes keep the text before the first ':' as the opening hour and the text
# between '-' and the last ':' as the closing hour. The raw strings are presumably
# shaped like 'H:M-H:M' (TODO confirm). Minutes are discarded, and 'unknown' cells
# yield nulls. Each raw weekday column is replaced by '<day>_start' and '<day>_end'.
dfBusinessHoursCols = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
for weekday in dfBusinessHoursCols:
    hasHours = col(weekday) != 'unknown'
    openingHour = F.when(hasHours, regexp_extract(weekday, r'.*?(?=\:)', 0)).otherwise(None)
    closingHour = F.when(hasHours, regexp_extract(weekday, r'(?<=\-).+(?=\:)', 0)).otherwise(None)
    dfBusiness = (
        dfBusiness
        .withColumn('{}_start'.format(weekday), openingHour.cast(IntegerType()))
        .withColumn('{}_end'.format(weekday), closingHour.cast(IntegerType()))
        .drop(weekday)
    )

dfBusiness.printSchema()

                                                                                

root
 |-- address: string (nullable = false)
 |-- business_id: string (nullable = false)
 |-- categories: string (nullable = false)
 |-- city: string (nullable = false)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = false)
 |-- postal_code: string (nullable = false)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = false)
 |-- AgesAllowed: string (nullable = false)
 |-- Alcohol: string (nullable = false)
 |-- BYOB: string (nullable = false)
 |-- BYOBCorkage: string (nullable = false)
 |-- BikeParking: string (nullable = false)
 |-- BusinessAcceptsBitcoin: string (nullable = false)
 |-- BusinessAcceptsCreditCards: string (nullable = false)
 |-- ByAppointmentOnly: string (nullable = false)
 |-- Caters: string (nullable = false)
 |-- CoatCheck: string (nullable = false)
 |-- Corkage: string (nullable = false)
 |-- DogsAllowed: str

In [5]:
# clean reviews data 

from pyspark.sql.functions import col, to_date, to_timestamp 
from pyspark.sql.functions import year, month, dayofweek, hour 
from pyspark.sql.functions import length 

# Parse the review timestamp (presumably a 'yyyy-MM-dd HH:mm:ss' string).
# The full timestamp is kept in 'datetime', and 'date' is truncated to a calendar date.
dfReview = dfReview.withColumn('datetime', to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss'))
dfReview = dfReview.withColumn('date', to_date(col('datetime')))

# Date-related fields plus review length, appended in this order.
reviewFeatures = {
    'year': year(col('date')),
    'month': month(col('date')),
    'day': dayofweek(col('date')),  # NOTE: day of the week, not day of the month
    'hour': hour(col('datetime')),
    'text_length': length(col('text')),
}
for featureName, featureExpr in reviewFeatures.items():
    dfReview = dfReview.withColumn(featureName, featureExpr)

dfReview.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: date (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- text_length: integer (nullable = true)



In [6]:
from pyspark.sql.functions import size, split 

# Clean user data:
# - parse the signup timestamp,
# - turn 'None' strings into nulls and then fill string nulls with 'unknown',
# - replace the comma-separated friends list with a count (0 when unknown),
# - drop the name and the raw friends list.
friendCount = F.when(col('friends') == 'unknown', 0).otherwise(size(split(col('friends'), r'\,')))

dfUser = (
    dfUser
    .withColumn('yelping_since', to_timestamp(col('yelping_since'), 'yyyy-MM-dd HH:mm:ss'))
    .replace('None', None)
    .na.fill('unknown')
    .withColumn('n_friends', friendCount)
    .drop('name', 'friends')
)

dfUser.show(5)

[Stage 22:>                                                         (0 + 1) / 1]

+--------------------+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+----+-----+----+-----+------------+------+-------------------+---------+
|             user_id|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer|cool|elite|fans|funny|review_count|useful|      yelping_since|n_friends|
+--------------------+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+----+-----+----+-----+------------+------+-------------------+---------+
|--Kwhcbkh7jxkhVVQ...|         3.62|              1|              0|               1|             0|              0|     

                                                                                

In [7]:
# clean sentiment data

# Confirm the sentiment output has one row for every review row.
# The counts must be bound and compared explicitly: a notebook cell only displays
# its last expression, so bare `df.count()` lines are computed and then discarded.
sentimentCount = dfSentiment.count()
reviewCount = dfReview.count()
print('dfSentiment rows:', sentimentCount, '| dfReview rows:', reviewCount)  # last seen: 3773770 each
assert sentimentCount == reviewCount, 'sentiment rows ({}) != review rows ({})'.format(sentimentCount, reviewCount)

dfSentiment.show(5, truncate=False)  # already clean (prepared upstream by Daisuke)



+----------------------+-----------+-------------+----------+----------------+-------------+--------------------+---------------+---------------------+--------------------+--------------------+--------------------+------------+
|review_id             |Price_total|Opinion_total|Food_total|Atmosphere_total|Service_total|Total_sentscore     |Price_sentscore|Opinion_sentscore    |Food_sentscore      |Atmosphere_sentscore|Service_sentscore   |Sentence_num|
+----------------------+-----------+-------------+----------+----------------+-------------+--------------------+---------------+---------------------+--------------------+--------------------+--------------------+------------+
|--1z0MjmPFmNQLpnwwhBOA|1          |3            |12        |1               |3            |0.058823529411764705|0.0            |-0.058823529411764705|0.058823529411764705|0.0                 |0.058823529411764705|17          |
|--6RL5X9PiswVnpKKHDB9g|0          |2            |4         |1               |0         

                                                                                

In [8]:
# Repartition if necessary: report the current partition layout of each input table.

print('Default parallelism:', sc.defaultParallelism, '\n')

for label, frame in [
    ('Number of dfBusiness partitions:', dfBusiness),
    ('Number of dfReview partitions:', dfReview),
    ('Number of dfSentiment partitions:', dfSentiment),
    ('Number of dfUser partitions:', dfUser),
]:
    print(label, frame.rdd.getNumPartitions())

# Good for now, but the final joined table will need repartitioning.

Default parallelism: 4 

Number of dfBusiness partitions: 1
Number of dfReview partitions: 12
Number of dfSentiment partitions: 4
Number of dfUser partitions: 14


In [9]:
import numpy as np 

# Rename the columns in each table and join them together.
# Every non-key column gets its source table's prefix (b_, r_, s_, u_), so
# same-named fields (e.g. business `stars` vs review `stars`) stay distinct after
# the join. Columns ending in '_id' are the join keys and keep their names.

def prefixNonKeyColumns(df, prefix):
    """Return ``df`` with ``prefix`` prepended to every column not ending in '_id'."""
    # A plain conditional expression replaces the old str(np.where(...)) scalar
    # trick, and the loop variable no longer shadows pyspark's `col`.
    return df.toDF(*[name if name.endswith('_id') else prefix + name for name in df.columns])

dfBusiness = prefixNonKeyColumns(dfBusiness, 'b_')
dfReview = prefixNonKeyColumns(dfReview, 'r_')
dfSentiment = prefixNonKeyColumns(dfSentiment, 's_')
dfUser = prefixNonKeyColumns(dfUser, 'u_')

# Reviews drive the table. Left joins keep every review even when the business,
# sentiment or user lookup has no match.
dfFinal = (
    dfReview
    .join(dfBusiness, 'business_id', 'left')
    .join(dfSentiment, 'review_id', 'left')
    .join(dfUser, 'user_id', 'left')
)

In [10]:
# Joined schema: the three join keys come first, followed by the prefixed columns
# in join order (r_ reviews, b_ businesses, s_ sentiment, u_ users).
dfFinal.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- r_cool: long (nullable = true)
 |-- r_date: date (nullable = true)
 |-- r_funny: long (nullable = true)
 |-- r_stars: double (nullable = true)
 |-- r_text: string (nullable = true)
 |-- r_useful: long (nullable = true)
 |-- r_datetime: timestamp (nullable = true)
 |-- r_year: integer (nullable = true)
 |-- r_month: integer (nullable = true)
 |-- r_day: integer (nullable = true)
 |-- r_hour: integer (nullable = true)
 |-- r_text_length: integer (nullable = true)
 |-- b_address: string (nullable = true)
 |-- b_categories: string (nullable = true)
 |-- b_city: string (nullable = true)
 |-- b_is_open: long (nullable = true)
 |-- b_latitude: double (nullable = true)
 |-- b_longitude: double (nullable = true)
 |-- b_name: string (nullable = true)
 |-- b_postal_code: string (nullable = true)
 |-- b_review_count: long (nullable = true)
 |-- b_stars: double (nulla

In [11]:
# Repartition the joined dataset: spread it over a fixed number of partitions,
# then inspect the per-partition row counts for skew.
targetPartitions = 200

print('Default parallelism:', sc.defaultParallelism, '\n')
print('Number of dfFinal partitions:', dfFinal.rdd.getNumPartitions(), '\n')

dfFinal = dfFinal.repartition(targetPartitions)
displayPartitions(dfFinal)

Default parallelism: 4 





Number of dfFinal partitions: 87 





+-----------+-----+
|partitionId|count|
+-----------+-----+
|         41|18866|
|         51|18866|
|         50|18866|
|         45|18866|
|         53|18866|
|         57|18866|
|         60|18866|
|         32|18866|
|         90|18866|
|         62|18866|
|         61|18866|
|         34|18866|
|         88|18866|
|         33|18866|
|         48|18866|
|         56|18866|
|         52|18866|
|         42|18866|
|         58|18866|
|         92|18866|
|         47|18866|
|         35|18866|
|         46|18866|
|         91|18866|
|         43|18866|
|         49|18866|
|         44|18866|
|         59|18866|
|         54|18866|
|         55|18866|
|         36|18866|
|         89|18866|
|         64|18867|
|         86|18867|
|         95|18867|
|         30|18867|
|         93|18867|
|        100|18867|
|         81|18867|
|         97|18867|
|         77|18867|
|         87|18867|
|         76|18867|
|         69|18867|
|         38|18867|
|         82|18867|
|         63|18867|


                                                                                

In [12]:
# TODO: consider engineering additional features in a later pass

In [12]:
# Persist the combined table as Parquet.
# mode('overwrite') keeps the cell re-runnable: the default save mode raises an
# error if the output path already exists.
# NOTE(review): coalesce(1) funnels the whole write through a single task so the
# output holds one part file. It also discards the parallelism of the
# repartition(200) above. Drop it if a multi-file output is acceptable.
dfFinal.coalesce(1).write.mode('overwrite').parquet("gs://msca-bdp-student-gcs/GroupProject_Gr7/yelp_dataset/engineered_data/combined")

23/05/17 18:34:16 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                