In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession.builder.appName("Top 5 Merchants in Each City").enableHiveSupport().getOrCreate()

review_df = spark.read.json('D:/china/intershipBigData/yelp/yelp_academic_dataset_review.json')
checkin_df = spark.read.json('D:/china/intershipBigData/yelp/yelp_academic_dataset_checkin.json')
business_df = spark.read.json('D:/china/intershipBigData/yelp/yelp_academic_dataset_business.json')
rating_freq = review_df.groupBy("business_id").agg(count("*").alias("rating_freq"))
rating_avg = review_df.groupBy("business_id").agg(avg("stars").alias("rating_avg"))
checkin_frequency = checkin_df.groupBy("business_id").agg(count("*").alias("checkin_frequency"))
data = rating_freq.join(rating_avg, "business_id") \
    .join(checkin_frequency, rating_freq["business_id"] == checkin_frequency["business_id"], "inner") \
    .join(business_df.select("business_id", "name", "city"), rating_freq["business_id"] == business_df["business_id"], "inner") \
    .drop(checkin_frequency["business_id"]) \
    .drop(business_df["business_id"])
window_spec = Window.partitionBy("city").orderBy(desc("rating_freq"), desc("rating_avg"), desc("checkin_frequency"))
ranked_merchants = data.withColumn("rank_within_city", rank().over(window_spec))
top_merch = ranked_merchants.filter(col("rank_within_city") <= 5)
top_merch.show()


+--------------------+-----------+------------------+-----------------+--------------------+--------------------+----------------+
|         business_id|rating_freq|        rating_avg|checkin_frequency|                name|                city|rank_within_city|
+--------------------+-----------+------------------+-----------------+--------------------+--------------------+----------------+
|VB5LN92Hfk4A34YtJ...|          6| 4.333333333333333|                1|  Ricci Hair Company|         AB Edmonton|               1|
|X-kWHq70nkI8jN6-w...|         10|               3.0|                1|    Prive Salon - PA|              AMBLER|               1|
|TJcRyJF-DYbXuABEe...|          8|               3.0|                1|     NAPA Auto Parts|             ARDMORE|               1|
|jPFFeyh7RmmxzQmYg...|         24|3.4166666666666665|                1|Jack's Donuts - Avon|                AVON|               1|
|JmzNw0WCPmZPZdq5n...|        394| 2.763959390862944|                1|         Kit