In [0]:
surrogate_key_column = 'Restaurant_seq_ID'
delata_table_name = 'zomato_db.zomato_restaurants_delta'

In [0]:
%run "/Users/nadarpravin001@gmail.com/common_utils/common_utils"

Reading source file

In [0]:
zomato_df = spark.read.format("json")\
                 .option("inferschema","false")\
                 .option("multiline","true")\
                 .load("dbfs:/FileStore/tables/zomato_JSON/*.json")

In [0]:
country_df = (
    spark.read.format("csv")
         .option("inferschema","true")
         .option("header","true")
         .load("/FileStore/tables/zomato_JSON/Country_Code.csv")
         .withColumnRenamed("Country Code", "Country_code")
)

flatten nested JSON

In [0]:
zomato_df = flatten_check(zomato_df)

Transformation the DataFrame

In [0]:
zomato_df_selected_raw = zomato_df.selectExpr(
    "CAST(restaurants_restaurant_id AS bigint) AS restaurant_ID",
    "restaurants_restaurant_name AS restaurant_name",
    "CAST(restaurants_restaurant_location_country_id AS INT) AS country_code",
    "restaurants_restaurant_location_city AS city",
    "restaurants_restaurant_location_address AS Address",
    "restaurants_restaurant_location_locality AS locality",
    "restaurants_restaurant_location_locality_verbose AS locality_verbose",
    "CAST(restaurants_restaurant_location_latitude AS decimal(20,6)) AS restaurant_latitude",
    "CAST(restaurants_restaurant_location_longitude AS decimal(20,6)) AS restaurant_longitude",
    "restaurants_restaurant_cuisines AS cuisines",
    "CAST(restaurants_restaurant_average_cost_for_two  AS INT) AS average_cost_for_two",
    "restaurants_restaurant_currency AS currency",
    "CAST(restaurants_restaurant_has_table_booking AS INT) AS has_table_booking",
    "CAST(restaurants_restaurant_has_online_delivery AS INT) AS has_online_delivery",
    "CAST(restaurants_restaurant_is_delivering_now  AS INT) AS is_delivering_now",
    "CAST(restaurants_restaurant_switch_to_order_menu AS INT) AS switch_to_order_menu",
    "CAST(restaurants_restaurant_price_range AS int) AS price_range",
    "CAST(restaurants_restaurant_user_rating_aggregate_rating AS decimal(2,1)) AS aggregate_rating",
    "restaurants_restaurant_user_rating_rating_color AS rating_color",
    "restaurants_restaurant_user_rating_rating_text AS rating_text",
    "CAST(restaurants_restaurant_user_rating_votes AS bigint) AS rating_votes"
)

Filter records where restaurant ID is NULL

In [0]:
zomato_df_selected_filter = zomato_df_selected_raw.filter((col("restaurant_ID").isNotNull()))

Filter records if we have duplicicate  restaurant ID 

In [0]:
zomato_df_selected_rowNumber = (
    zomato_df_selected_filter.withColumn("row_number", row_number().over(Window.partitionBy(col("restaurant_ID")).orderBy(col ("country_code")))).filter(col("row_number") == 1).drop(col("row_number"))
)

In [0]:
zomato_df_selected = (
    zomato_df_selected_rowNumber.withColumn("has_table_booking", when((col("has_table_booking") == 1), 'Y')
                                                       .when((col("has_table_booking") == 0), 'N')
                                                       .otherwise('null'))
                      .withColumn("has_online_delivery", when((col("has_online_delivery") == 1), 'Y')
                                                       .when((col("has_online_delivery") == 0), 'N')
                                                       .otherwise('null'))
                      .withColumn("is_delivering_now", when((col("is_delivering_now") == 1), 'Y')
                                                       .when((col("is_delivering_now") == 0), 'N')
                                                       .otherwise('null'))
                      .withColumn("switch_to_order_menu", when((col("switch_to_order_menu") == 1), 'Y')
                                                       .when((col("switch_to_order_menu") == 0), 'N')
                                                       .otherwise('null'))
)

In [0]:
zomato_df_selected_final = (
    zomato_df_selected.join(broadcast(country_df), zomato_df_selected["country_code"] == country_df["Country_code"], "left")
    .select(zomato_df_selected["*"],country_df["Country"])

)

In [0]:
zomato_df_selected_final = (
    zomato_df_selected_final.withColumn("Country", when((col("country_code").isNotNull() & col("Country").isNull()),"Dummy")
                            .otherwise(col("Country")))
)

In [0]:
# zomato_df_selected_final.rdd.getNumPartitions()

Caching dataFrame

In [0]:
zomato_df_selected_final.cache()

Out[127]: DataFrame[restaurant_ID: bigint, restaurant_name: string, country_code: int, city: string, Address: string, locality: string, locality_verbose: string, restaurant_latitude: decimal(20,6), restaurant_longitude: decimal(20,6), cuisines: string, average_cost_for_two: int, currency: string, has_table_booking: string, has_online_delivery: string, is_delivering_now: string, switch_to_order_menu: string, price_range: int, aggregate_rating: decimal(2,1), rating_color: string, rating_text: string, rating_votes: bigint, Country: string]

In [0]:
zomato_df_selected_final.createOrReplaceTempView("zomato_df")

In [0]:
zomato_df_selected_final.count()

Out[129]: 9157

In [0]:
%sql
create table if not exists zomato_db.zomato_restaurants_delta(
  Restaurant_seq_ID bigint NOT NULL,
  restaurant_ID bigint NOT NULL,
  restaurant_name string,
  country_code string,
  Country string,
  city string,
  Address string,
  locality string,
  locality_verbose string,
  restaurant_latitude string,
  restaurant_longitude string,
  cuisines string,
  average_cost_for_two string,
  currency string,
  has_table_booking string,
  has_online_delivery string,
  is_delivering_now string,
  switch_to_order_menu string,
  price_range string,
  aggregate_rating string,
  rating_color string,
  rating_text string,
  rating_votes string
) using parquet

In [0]:
# Fetching maximum records of the table
max_df = fetch_max_count(surrogate_key_column, delata_table_name)

In [0]:
# MaxID = max_df.collect()[0][0]

Insert new records and adding surrogate key

In [0]:
%sql
INSERT INTO zomato_db.zomato_restaurants_delta
SELECT
    row_number() OVER(ORDER BY source.restaurant_ID) + MaxID AS Restaurant_seq_ID,
    source.restaurant_ID AS restaurant_ID,
    source.restaurant_name AS restaurant_name,
    source.country_code AS country_code,
    source.Country AS Country,
    source.city AS city,
    source.Address AS Address,
    source.locality AS locality,
    source.locality_verbose AS locality_verbose,
    source.restaurant_latitude AS restaurant_latitude,
    source.restaurant_longitude AS restaurant_longitude,
    source.cuisines AS cuisines,
    source.average_cost_for_two AS average_cost_for_two,
    source.currency AS currency,
    source.has_table_booking AS has_table_booking,
    source.has_online_delivery AS has_online_delivery,
    source.is_delivering_now AS is_delivering_now,
    source.switch_to_order_menu AS switch_to_order_menu,
    source.price_range AS price_range,
    source.aggregate_rating AS aggregate_rating,
    source.rating_color AS rating_color,
    source.rating_text AS rating_text,
    source.rating_votes AS rating_votes
FROM zomato_df source
LEFT JOIN zomato_db.zomato_restaurants_delta target ON source.restaurant_ID = target.restaurant_ID
CROSS JOIN MaxID
WHERE target.restaurant_ID IS NULL

In [0]:
zomato_df_selected_final.unpersist()

Out[134]: DataFrame[restaurant_ID: bigint, restaurant_name: string, country_code: int, city: string, Address: string, locality: string, locality_verbose: string, restaurant_latitude: decimal(20,6), restaurant_longitude: decimal(20,6), cuisines: string, average_cost_for_two: int, currency: string, has_table_booking: string, has_online_delivery: string, is_delivering_now: string, switch_to_order_menu: string, price_range: int, aggregate_rating: decimal(2,1), rating_color: string, rating_text: string, rating_votes: bigint, Country: string]

In [0]:
df_zomato_restaurants_delta = spark.sql(""" SELECT * FROM zomato_db.zomato_restaurants_delta""")

1. What are the Top10 Indian Cities with Maximum Restaurants?

In [0]:
data_df = (
  df_zomato_restaurants_delta.filter(col("Country") == "India")
                          .groupBy("city")
                          .agg(countDistinct("restaurant_ID").alias("number_of_cities"))
)

data_df_final = (
  data_df.withColumn("Rank", dense_rank().over(Window.orderBy(col("number_of_cities").desc())))
         .filter((col("Rank") <= 10))
)

In [0]:
display(data_df_final)

city,number_of_cities,Rank
New Delhi,5473,1
Gurgaon,1118,2
Noida,1080,3
Faridabad,251,4
Ghaziabad,25,5
Lucknow,21,6
Ahmedabad,21,6
Amritsar,21,6
Bhubaneshwar,21,6
Guwahati,21,6


2. What are the some Cities around the globe with Maximum Restaurants?

In [0]:
data_df_2 = (
  df_zomato_restaurants_delta.groupBy("city")
                          .agg(countDistinct("restaurant_ID").alias("number_of_cities"))
)

data_df_final_2 = (
  data_df_2.withColumn("Rank", dense_rank().over(Window.orderBy(col("number_of_cities").desc())))
         .filter((col("Rank") <= 5))
)

In [0]:
display(data_df_final_2)

city,number_of_cities,Rank
New Delhi,5473,1
Gurgaon,1118,2
Noida,1080,3
Faridabad,251,4
Ghaziabad,25,5


3. What are the Top10 Popular Ratings as per the Counts?

In [0]:
data_df_3 = (
  df_zomato_restaurants_delta.groupBy("aggregate_rating")
                          .agg(countDistinct("restaurant_ID").alias("number_of_cities"))
)

data_df_final_3 = (
  data_df_3.withColumn("Rank", dense_rank().over(Window.orderBy(col("number_of_cities").desc())))
         .filter((col("Rank") <= 10))
)

In [0]:
display(data_df_final_3)

aggregate_rating,number_of_cities,Rank
0.0,2165,1
3.2,520,2
3.1,518,3
3.4,488,4
3.3,477,5
3.0,468,6
3.5,461,7
3.6,427,8
3.7,388,9
2.9,381,10


4. What are the Top20 Cuisines?

In [0]:
cuisines_df = df_zomato_restaurants_delta.select(upper("cuisines").alias("cuisines"))

In [0]:
cuisines = cuisines_df.withColumn("list_of_cuisines", split(col("cuisines"), ","))

In [0]:
display(cuisines)

cuisines,list_of_cuisines
"CHINESE, INDIAN","List(CHINESE, INDIAN)"
"NORTH INDIAN, BIRYANI","List(NORTH INDIAN, BIRYANI)"
"NORTH INDIAN, CHINESE, CAFE","List(NORTH INDIAN, CHINESE, CAFE)"
"NORTH INDIAN, CHINESE, MUGHLAI","List(NORTH INDIAN, CHINESE, MUGHLAI)"
"CHINESE, THAI","List(CHINESE, THAI)"
"NORTH INDIAN, MUGHLAI","List(NORTH INDIAN, MUGHLAI)"
ITALIAN,List(ITALIAN)
"CHINESE, SEAFOOD, THAI","List(CHINESE, SEAFOOD, THAI)"
"NORTH INDIAN, CONTINENTAL, EUROPEAN","List(NORTH INDIAN, CONTINENTAL, EUROPEAN)"
CHINESE,List(CHINESE)


In [0]:
cuisines_list = cuisines.select("list_of_cuisines").rdd.flatMap(lambda x: x).collect()

In [0]:
# whole_list = cuisines_df.select(collect_list("cuisines")).collect()[0][0]

In [0]:
dict_cuisines = {}
for x in cuisines_list:
    for y in x:
        if y in dict_cuisines:
            dict_cuisines[y] += 1
        else:
            dict_cuisines[y] = 1

In [0]:
import pandas as pd

In [0]:
keys = list(dict_cuisines.keys())
values = list(dict_cuisines.values())

# Create DataFrame
Cuisines_df_pd = pd.DataFrame({'Cuisines': keys, 'Count': values})

In [0]:
display(Cuisines_df_pd)

Cuisines,Count
CHINESE,842
INDIAN,21
NORTH INDIAN,2995
BIRYANI,66
CHINESE,1875
CAFE,86
MUGHLAI,780
THAI,192
ITALIAN,210
SEAFOOD,82


In [0]:
cuisines_df_new = spark.createDataFrame(Cuisines_df_pd)

In [0]:
display(cuisines_df_new)

Cuisines,Count
CHINESE,842
INDIAN,21
NORTH INDIAN,2995
BIRYANI,66
CHINESE,1875
CAFE,86
MUGHLAI,780
THAI,192
ITALIAN,210
SEAFOOD,82


In [0]:
cuisines_top_20 = (
    cuisines_df_new.groupBy("Cuisines")
                   .agg(sum(col("Count")).alias("Count"))
                   .withColumn("Rank", dense_rank().over(Window.orderBy(col("Count").desc())))
                   .filter((col("Rank") <= 20))
                   
)

In [0]:
display(cuisines_top_20)

Cuisines,Count,Rank
NORTH INDIAN,2995,1
CHINESE,1875,2
FAST FOOD,1314,3
NORTH INDIAN,973,4
CHINESE,842,5
MUGHLAI,780,6
FAST FOOD,671,7
BAKERY,621,8
CAFE,618,9
ITALIAN,521,10
