In [0]:
%pyspark
from pyspark.sql.functions import *
from pyspark import HiveContext

# Hive-backed SQL context used to read the Yelp tables from the metastore.
hc = HiveContext(sc)

# Source tables, loaded in order: users -> df1, restaurant -> df2, review -> df3.
df1, df2, df3 = (hc.table(table_name) for table_name in ('users', 'restaurant', 'review'))

In [1]:
%pyspark
# Cuisine keywords searched for in the free-text `categories` column.
# Order matters: each pass overwrites `category` on a match, so when a
# restaurant lists several cuisines the LAST matching keyword in this list wins.
keywords = [
    'American', 'Mexican', 'Italian', 'Japanese', 'Chinese', 'Thai', 'Mediterranean',
    'French', 'Vietnamese', 'Greek', 'Indian', 'Korean', 'Hawaiian', 'African', 'Spanish'
]

# df4 = restaurants plus a single-valued `category` column.
# df4 must be bound in BOTH cases; assigning it only when the column is missing
# makes the loop below fail with NameError on a table that already has it.
if 'category' in df2.columns:
    df4 = df2
else:
    # Cast so the column is a string type even if no keyword pass runs
    # (a bare NULL literal has NullType, which JDBC writers cannot map).
    df4 = df2.withColumn('category', lit(None).cast('string'))

for keyword in keywords:
    # contains() is a substring match; rows whose `categories` is NULL make the
    # condition NULL and fall through to otherwise(), keeping the previous value.
    df4 = df4.withColumn(
        'category',
        when(col('categories').contains(keyword), keyword).otherwise(col('category')),
    )


In [2]:
%pyspark

# Rank users by the metric picked in a Zeppelin dropdown, show the top 100 and
# persist them to MySQL.
# NOTE(review): JDBC credentials are hard-coded; consider Zeppelin's credential
# store or environment variables instead.
option1 = [('Average Stars', 'Average Stars'), ('Number Of Fans', 'Number Of Fans')]
# z.select returns the single selected value; compare with == rather than a
# substring `in` test (which also raises TypeError if nothing is returned).
selected_options = z.select("Top Popular User", option1)

top_users = None      # ranking built by whichever branch runs
target_table = None   # MySQL table that ranking is written to

if selected_options == "Average Stars":
    top_user1 = df1.select('user_id', 'user_name', 'user_average_stars') \
        .orderBy(col('user_average_stars').desc()) \
        .limit(100)
    top_users, target_table = top_user1, "top_user_star"
elif selected_options == "Number Of Fans":
    top_user2 = df1.select('user_id', 'user_name', 'user_fans') \
        .orderBy(col('user_fans').desc()) \
        .limit(100)
    top_users, target_table = top_user2, "top_user_fans"
else:
    print("Please select at least one option.")

# Only the ranking built in this run is written: only one of top_user1 /
# top_user2 is defined per run, so writing both unconditionally raises
# NameError (or re-saves a stale frame from an earlier run).
if top_users is not None:
    z.show(top_users)
    top_users.write.format("jdbc").mode("overwrite") \
      .option("url", "jdbc:mysql://node-master:3306/yelp") \
      .option("dbtable", target_table) \
      .option("user", "root") \
      .option("password", "admin") \
      .save()

In [3]:
%pyspark

print('Number of restaurant in each state')
# Restaurants per state, largest first. count(lit(1)) counts rows, so a group
# with a NULL state reports its real size; count('state') would skip the NULLs
# and report 0 for that group.
state = df4.groupBy('state') \
    .agg(count(lit(1)).alias('State counts')) \
    .orderBy(col('State counts').desc())

z.show(state)

state.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "num_res_state") \
  .option("user", "root") \
  .option("password", "admin") \
  .save()


In [4]:
%pyspark

print('Number of restaurant in each city')
# Restaurants per city, largest first. count(lit(1)) counts rows, so a group
# with a NULL city reports its real size; count('city') would skip the NULLs
# and report 0 for that group.
city = df4.groupBy('city') \
    .agg(count(lit(1)).alias('City counts')) \
    .orderBy(col('City counts').desc())

z.show(city)

city.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "num_res_city") \
  .option("user", "root") \
  .option("password", "admin") \
  .save()

In [5]:
%pyspark

print('Distributions of Franchises')
# Number of locations per business name, most locations first (franchises
# surface at the top). count(lit(1)) counts rows so a NULL-name group is not
# reported as 0. The MySQL table keeps its existing name `franchizes`.
result = df4.groupBy(col('name').alias('Business Name')) \
    .agg(count(lit(1)).alias('counts')) \
    .orderBy(col('counts').desc())

z.show(result)

result.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "franchizes") \
  .option("user", "root") \
  .option("password", "admin") \
  .save()

In [6]:
%pyspark

option1 = [('NJ','NJ'),('AZ','AZ'),('AB','AB'),('NV','NV'),('PA','PA'),('CA','CA'),('ID','ID'),('DE','DE'),('IL','IL'),('FL','FL'),('MO','MO'),('TN','TN'),('IN','IN'),('LA','LA'),('MT','MT')]
# Read the dropdown once: z.select returns the single selected value (a string),
# so joining it directly would join its characters ("N and J").
selected_state = z.select("State", option1)
valid_states = [code for code, _ in option1]

if selected_state in valid_states:
    selected_states = [selected_state]
    table_suffix = selected_state
else:
    # Nothing (or an unknown value) selected: fall back to every listed state.
    selected_states = valid_states
    table_suffix = 'all'

print("Top-rating Restaurant in " + " and ".join(selected_states))

state_restaurants = df4.filter(df4.state.isin(selected_states))

top_res = state_restaurants.select('name','stars','review_count','city','state','hours','categories','attributes') \
    .orderBy(col('stars').desc()) \
    .limit(1000)

z.show(top_res)

# The table name follows the selection (e.g. top_res_MT), so choosing another
# state no longer overwrites a different state's table.
top_res.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "top_res_" + table_suffix) \
  .option("user", "root") \
  .option("password", "admin") \
  .save()

In [7]:
%pyspark

option2 = [('Chinese','Chinese'),('Korean','Korean'),('Japanese','Japanese'),('American','American'),('Mexican','Mexican'),('Italian','Italian'),('Indian','Indian'),('Vietnamese','Vietnamese'),('Spanish','Spanish'),('Thai','Thai'),('Greek','Greek'),('Mediterranean','Mediterranean'),('French','French'),('Hawaiian','Hawaiian'),('African','African')]
# Read the dropdown once; z.select returns the single selected value.
selected_cuisine = z.select("Cuisine", option2)
valid_cuisines = [cuisine for cuisine, _ in option2]

if selected_cuisine in valid_cuisines:
    selected_cuisines = [selected_cuisine]
    table_suffix = selected_cuisine.lower()
else:
    # Nothing (or an unknown value) selected: include every listed cuisine.
    selected_cuisines = valid_cuisines
    table_suffix = 'all'

print("Top-rating Restaurant of", " and ".join(selected_cuisines), "cuisine")

# `category` is the single cuisine label derived from `categories` in df4.
cuisine_restaurants = df4.filter(df4.category.isin(selected_cuisines))

top_cuisine = cuisine_restaurants.select('name', 'stars') \
    .orderBy(col('stars').desc())

z.show(top_cuisine)

# The table name follows the selection (e.g. top_cuisine_african), so choosing
# another cuisine no longer overwrites a different cuisine's table.
top_cuisine.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "top_cuisine_" + table_suffix) \
  .option("user", "root") \
  .option("password", "admin") \
  .save()

In [8]:
%pyspark
# Re-read the review table so this paragraph can be run on its own.
df3 = hc.table('review')

# Strip whitespace that often comes along with a pasted id; treat a missing
# value as empty.
selected_id = (z.textbox('Please insert your business id: ') or '').strip()
business_reviews = df3.filter(df3.rev_business_id == selected_id)

# take(1) stops at the first match instead of counting every matching review.
if not business_reviews.take(1):
    print('Invalid business id')
else:
    latest_rev = business_reviews.select('rev_user_id', 'rev_stars', 'rev_date', 'rev_text', 'rev_useful', 'rev_funny', 'rev_cool') \
        .orderBy(col('rev_date').desc())

    z.show(latest_rev)

    # Persist only when reviews were found: for an unknown id latest_rev is
    # never built, so an unconditional write raises NameError or re-saves a
    # stale frame from a previous run.
    latest_rev.write.format("jdbc").mode("overwrite") \
      .option("url", "jdbc:mysql://node-master:3306/yelp") \
      .option("dbtable", "latest_rev") \
      .option("user", "root") \
      .option("password", "admin") \
      .save()

In [9]:
%pyspark

# A user counts as elite when `user_elite` is a non-empty value; NULL or ""
# counts as regular. With plain `!= ""` / `== ""` filters, NULL rows match
# neither and disappear from both totals.
# NOTE(review): assumes NULL user_elite means "never elite" — confirm against
# the source data.
is_elite = col('user_elite').isNotNull() & (col('user_elite') != "")

# One aggregation pass instead of two separate count() jobs. count() skips
# NULLs, and when() without otherwise() yields NULL for non-matching rows, so
# each expression counts only the rows that satisfy its condition.
user_type_counts = df1.agg(
    count(when(is_elite, 1)).alias('elite'),
    count(when(~is_elite, 1)).alias('regular'),
).first()
elite_count = user_type_counts['elite']
regular_count = user_type_counts['regular']

EliteVsRegularCount_df = spark.createDataFrame([
    ("Elite Users", elite_count),
    ("Regular Users", regular_count),
], ["User Type", "Count"])

z.show(EliteVsRegularCount_df)

EliteVsRegularCount_df.write.format("jdbc").mode("overwrite") \
  .option("url", "jdbc:mysql://node-master:3306/yelp") \
  .option("dbtable", "elite_regular") \
  .option("user", "root") \
  .option("password", "admin") \
  .save()
