In [13]:
import os
from getpass import getpass

# Never keep the database password in the notebook: read it from the PGPASSWORD
# environment variable, or prompt for it (input is not echoed) when it is unset.
# The imports live in this cell because it sits above the notebook's import cell.
password = os.environ.get('PGPASSWORD') or getpass('PostgreSQL password: ')

In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf,col,sum
from pyspark.sql.functions import desc
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.types import StringType

In [2]:
# Location of the PostgreSQL JDBC driver jar. Set POSTGRES_JDBC_JAR to override
# the machine-specific default below.
postgres_jdbc_jar = os.environ.get(
    'POSTGRES_JDBC_JAR',
    '/home/prasag/snap/dbeaver-ce/212/.local/share/DBeaverData/drivers/maven/maven-central/org.postgresql/postgresql-42.5.0.jar',
)

# Create (or reuse) the Spark session with the driver jar registered under
# spark.jars; the JDBC writes in this notebook name org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Apache PySpark Final Project-Zomato")
    .config('spark.jars', postgres_jdbc_jar)
    .getOrCreate()
)

22/11/03 18:25:02 WARN Utils: Your hostname, 1011000011101110 resolves to a loopback address: 127.0.1.1; using 192.168.1.69 instead (on interface wlp3s0)
22/11/03 18:25:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/03 18:25:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/03 18:25:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the cleaned Zomato CSV: the first line is the header and column types are inferred.
zomato_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('data/cleaned_zomato.csv')
)

In [4]:
# List the column names taken from the CSV header.
zomato_df.columns

['name',
 'online_order',
 'book_table',
 'rating',
 'votes',
 'phone',
 'location',
 'type',
 'dish_liked',
 'cuisines',
 'approx_cost_two_people']

# Task 1: Convert "NEW" in rating to "0/5"

In [15]:
def convert(strr):
    """Map the placeholder rating 'NEW' to '0/5'; return any other value unchanged."""
    return '0/5' if strr == 'NEW' else strr


In [16]:
# Expose convert() to Spark as a UDF that returns strings; the function is
# passed directly, since wrapping it in a lambda adds nothing.
convertUDF = udf(convert, returnType=StringType())

In [17]:
# Replace the rating column with its converted values ("NEW" becomes "0/5").
# zomato_df is rebound here, so every later cell sees the converted ratings.
zomato_df = zomato_df.withColumn("rating", convertUDF("rating"))

In [18]:
# Collect the whole DataFrame to the driver as pandas and save it as CSV without the index column.
zomato_df.toPandas().to_csv('data/cleaned_zomato_rate.csv',index= False)

                                                                                

In [19]:
# Preview the first five rows after the rating conversion.
zomato_df.show(5)

+--------------------+------------+----------+------+-----+--------------+------------+-------------------+--------------------+--------------------+----------------------+
|                name|online_order|book_table|rating|votes|         phone|    location|               type|          dish_liked|            cuisines|approx_cost_two_people|
+--------------------+------------+----------+------+-----+--------------+------------+-------------------+--------------------+--------------------+----------------------+
|      Spice Elephant|         Yes|        No| 4.1/5|  787|  080 41714161|Banashankari|      Casual Dining|Momos, Lunch Buff...|Chinese, North In...|                   800|
|     San Churro Cafe|         Yes|        No| 3.8/5|  918|+91 9663487993|Banashankari|Cafe, Casual Dining|Churros, Cannello...|Cafe, Mexican, It...|                   800|
|Addhuri Udupi Bho...|          No|        No| 3.7/5|   88|+91 9620009302|Banashankari|        Quick Bites|         Masala Dosa|South I

In [79]:
# Save the task 1 result to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    zomato_df.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task1_zomato_df')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 2: Suggest where one can open a new restaurant

In [21]:
# Number of rows in zomato_df for each location.
city_res_count = zomato_df.groupBy('location').count()

In [22]:
# Order locations from the fewest rows to the most.
city_res_count = city_res_count.orderBy(col('count').asc())

In [23]:
# Show the least-populated locations first (show() prints the first 20 rows by default).
city_res_count.show()

+--------------------+-----+
|            location|count|
+--------------------+-----+
|         Uttarahalli|    1|
|Rajarajeshwari Nagar|    1|
|     Kanakapura Road|    1|
|      West Bangalore|    2|
|         Magadi Road|    2|
|      Sadashiv Nagar|    3|
|        Bommanahalli|    3|
|           Jalahalli|    3|
|   Central Bangalore|    3|
|     North Bangalore|    3|
|     Rammurthy Nagar|    4|
|         City Market|    4|
|            RT Nagar|    5|
|            Majestic|    6|
|        Sanjay Nagar|    8|
|      East Bangalore|    8|
|         Vijay Nagar|    8|
|      Sahakara Nagar|    8|
|          Hosur Road|    8|
|          HBR Layout|    9|
+--------------------+-----+
only showing top 20 rows



In [24]:
# Smallest per-location count. Aggregating inside Spark returns a single row,
# instead of collect()-ing every row to the driver just to read the first one,
# and does not rely on city_res_count having been sorted beforehand.
# The value is None when city_res_count has no rows.
min_count = city_res_count.agg({'count': 'min'}).first()[0]

In [25]:
# Print the column names and types of the per-location counts.
city_res_count.printSchema()

root
 |-- location: string (nullable = true)
 |-- count: long (nullable = false)



In [26]:
# Check the Python type of min_count before comparing it against the count column.
type(min_count)

int

In [27]:
# Locations whose count equals the minimum, i.e. the candidates for a new restaurant.
# Bracket access is required because city_res_count.count is the DataFrame method.
suggested_citis = city_res_count.where(city_res_count['count'] == min_count).select('location')

In [28]:
# Display the locations tied for the lowest count.
suggested_citis.show()

+--------------------+
|            location|
+--------------------+
|     Kanakapura Road|
|Rajarajeshwari Nagar|
|         Uttarahalli|
+--------------------+



In [30]:
# Save the suggested locations to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    suggested_citis.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task2_suggested_cities')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 3: Compare the restaurant to whether it has the facility of an “online order” or not.

In [34]:
# Names of restaurants that accept online orders, numbered 0..n-1 in serial_no.
# monotonically_increasing_id() on its own is unique and increasing but NOT
# consecutive across partitions, so it cannot serve as a positional join key.
# row_number() over that ordering yields consecutive numbers (1-based, hence the -1).
# The window has no partitionBy, so Spark moves all rows to a single partition.
having_online_order = zomato_df.filter(zomato_df.online_order == 'Yes').select(col('name'))\
                                .withColumnRenamed('name','having_online_order')\
                                .withColumn("serial_no", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

In [35]:
# Number of rows with online_order == 'Yes'.
having_online_order.count()

7996

In [36]:
# Preview the first five names together with their serial numbers.
having_online_order.show(5)

+-------------------+---------+
|having_online_order|serial_no|
+-------------------+---------+
|     Spice Elephant|        0|
|    San Churro Cafe|        1|
|       Cafe Shuffle|        2|
|   The Coffee Shack|        3|
|    San Churro Cafe|        4|
+-------------------+---------+
only showing top 5 rows



In [37]:
# Names of restaurants that do not accept online orders, numbered 0..n-1 in serial_no.
# monotonically_increasing_id() on its own is unique and increasing but NOT
# consecutive across partitions, so it cannot serve as a positional join key.
# row_number() over that ordering yields consecutive numbers (1-based, hence the -1).
# The window has no partitionBy, so Spark moves all rows to a single partition.
not_having_online_order = zomato_df.filter(zomato_df.online_order == 'No').select(col('name'))\
                                    .withColumnRenamed('name','not_having_online_order')\
                                    .withColumn("serial_no", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

In [38]:
# Number of rows with online_order == 'No'.
not_having_online_order.count()

3352

In [39]:
# Preview the first five names together with their serial numbers.
not_having_online_order.show(5)

+-----------------------+---------+
|not_having_online_order|serial_no|
+-----------------------+---------+
|   Addhuri Udupi Bho...|        0|
|             Caf-Eleven|        1|
|               T3H Cafe|        2|
|     Gustoes Beer House|        3|
|       The Biryani Cafe|        4|
+-----------------------+---------+
only showing top 5 rows



In [40]:
# Sanity check: this sum equals zomato_df.count() only if online_order is always 'Yes' or 'No'.
having_online_order.count()+not_having_online_order.count()

11348

In [41]:
# Total number of rows, for comparison with the sum of the two subsets.
zomato_df.count()

11348

In [42]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import monotonically_increasing_id

In [43]:
# Put the two name lists side by side by matching rows on serial_no.
# NOTE(review): a left join keeps exactly the rows of having_online_order, so
# names from the other list would be dropped if that list were longer — confirm
# this is intended.
having_and_not_having = (
    having_online_order
    .join(not_having_online_order, on='serial_no', how='left')
    .select('having_online_order', 'not_having_online_order')
)

In [44]:
# Row count after the left join.
having_and_not_having.count()

7996

In [45]:
# Display the side-by-side lists (show() prints the first 20 rows by default).
having_and_not_having.show()

+--------------------+-----------------------+
| having_online_order|not_having_online_order|
+--------------------+-----------------------+
|      Spice Elephant|   Addhuri Udupi Bho...|
|     San Churro Cafe|             Caf-Eleven|
|        Cafe Shuffle|               T3H Cafe|
|    The Coffee Shack|     Gustoes Beer House|
|     San Churro Cafe|       The Biryani Cafe|
|        Woodee Pizza|           Chatar Patar|
|           Redberrys|            Mane Thindi|
|          Foodiction|                 Darbar|
|     Ovenstory Pizza|            Kollapuri's|
|              Faasos|   Harshi Super Sand...|
|   Empire Restaurant|   Sri Venkateshwara...|
|           Chaatimes|     Gustoes Beer House|
|      Kitchen Garden|             Caf-Eleven|
|           FreshMenu|               T3H Cafe|
|            Goa 0 Km|   Iyer's Tiffin Centre|
|         Kabab Magic|                 Darbar|
|       Frozen Bottle|           Taaza Thindi|
|       Meghana Foods|   Sri Laxmi Venkate...|
|          Po

In [46]:
# Collect the comparison to the driver as pandas and save it as CSV without the index column.
having_and_not_having.toPandas().to_csv('data/having_not_having_online.csv',index= False)

In [47]:
# Save the online-order comparison to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    having_and_not_having.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task3_having_and_not_having')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 4: List the top ten restaurants with the highest number of branches.

In [48]:
# Number of rows per restaurant name; each row is treated as one branch.
top_ten_testutants = zomato_df.groupBy('name').count()

In [49]:
# Order by branch count, largest first. Ties are broken by name so that the
# later limit(10) selects the same rows every time the plan is evaluated
# (it is evaluated separately for show(), the JDBC write and the CSV export).
top_ten_testutants = top_ten_testutants.sort(col('count').desc(), col('name').asc())

In [50]:
# Keep only the first ten rows of the sorted counts.
top_ten_testutants = top_ten_testutants.limit(10)

In [51]:
# Display the ten names with the most rows.
top_ten_testutants.show()

+-----------------+-----+
|             name|count|
+-----------------+-----+
|Empire Restaurant|   66|
|              KFC|   59|
|           Faasos|   54|
|       Polar Bear|   48|
|       McDonald's|   45|
|     Krispy Kreme|   39|
|     Mudpipe Cafe|   39|
|       Chai Point|   37|
|           Onesta|   37|
| Lakeview Milkbar|   36|
+-----------------+-----+



In [52]:
# Save the top-ten list to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    top_ten_testutants.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task4_top_ten_resturants')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

In [54]:
# Collect the ten rows to the driver as pandas and save them as CSV without the index column.
top_ten_testutants.toPandas().to_csv('data/top_ten_testurants.csv',index= False)

# Task 5: List restaurants that are either Cafes or Quick Bites

In [55]:
# Keep rows whose type mentions "cafe" or "quick bites" in any letter case.
# The inline (?i) flag makes the whole regex case-insensitive, which replaces
# the hand-listed spellings ('cafe', 'Cafe', 'Quick Bites', ...) and also
# matches variants they missed, such as 'CAFE'.
cafes_or_quick_bites = zomato_df.where(col('type').rlike('(?i)cafe|quick bites'))\
                                .select(col('name'),col('type'))

In [59]:
# Collect the matches to the driver as pandas and save them as CSV without the index column.
cafes_or_quick_bites.toPandas().to_csv('data/cafe_ot_quick_bites.csv',index= False)

In [57]:
# Preview the matching restaurants and their types (first 20 rows).
cafes_or_quick_bites.show()

+--------------------+-------------------+
|                name|               type|
+--------------------+-------------------+
|     San Churro Cafe|Cafe, Casual Dining|
|Addhuri Udupi Bho...|        Quick Bites|
|        Cafe Shuffle|               Cafe|
|    The Coffee Shack|               Cafe|
|          Caf-Eleven|               Cafe|
|     San Churro Cafe|Cafe, Casual Dining|
|            T3H Cafe|               Cafe|
|        Woodee Pizza|               Cafe|
|           Redberrys|               Cafe|
|          Foodiction|        Quick Bites|
|              Faasos|        Quick Bites|
|           Chaatimes|        Quick Bites|
|      Kitchen Garden|        Quick Bites|
|         Kabab Magic|        Quick Bites|
|    The Biryani Cafe|        Quick Bites|
|        Chatar Patar|        Quick Bites|
|         Mane Thindi|        Quick Bites|
|Bengaluru Coffee ...|        Quick Bites|
|         Roll N Rock|        Quick Bites|
|                 KFC|        Quick Bites|
+----------

In [58]:
# Number of rows whose type matched the cafe / quick bites filter.
cafes_or_quick_bites.count()

5090

In [60]:
# Save the cafe / quick bites list to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    cafes_or_quick_bites.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task5_cafes_or_quick_bites')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 6: Count the number of restaurants that allow both online orders and table booking.

In [61]:
# Restaurants that offer both online ordering and table booking, reduced to the
# name and the two flag columns.
res_allows_online_and_book_table = (
    zomato_df
    .where((col('book_table') == 'Yes') & (col('online_order') == 'Yes'))
    .select('name', 'online_order', 'book_table')
)

In [62]:
# Collect the matches to the driver as pandas and save them as CSV without the index column.
res_allows_online_and_book_table.toPandas().to_csv('data/res_allows_online_and_book_table.csv',index= False)

In [63]:
# The answer to task 6: number of rows with both flags set to 'Yes'.
res_allows_online_and_book_table.count()

1564

In [64]:
# Preview the matching rows (first 20); the same name can appear more than once.
res_allows_online_and_book_table.show()

+--------------------+------------+----------+
|                name|online_order|book_table|
+--------------------+------------+----------+
|        Cafe Shuffle|         Yes|       Yes|
|    The Coffee Shack|         Yes|       Yes|
|            Goa 0 Km|         Yes|       Yes|
|  Sri Udupi Food Hub|         Yes|       Yes|
|           Patio 805|         Yes|       Yes|
|             Grazers|         Yes|       Yes|
| Sea Spice by 7 Star|         Yes|       Yes|
|                Subz|         Yes|       Yes|
|        Cafe Shuffle|         Yes|       Yes|
|             Vinny's|         Yes|       Yes|
|        Brew Meister|         Yes|       Yes|
|    The Coffee Shack|         Yes|       Yes|
| Sea Spice by 7 Star|         Yes|       Yes|
|             Vinny's|         Yes|       Yes|
|        Cafe Shuffle|         Yes|       Yes|
|    The Coffee Shack|         Yes|       Yes|
|   Deja Vu Resto Bar|         Yes|       Yes|
|    Melt - Eden Park|         Yes|       Yes|
|        Mudp

In [226]:
# Repeats the earlier count of res_allows_online_and_book_table.
res_allows_online_and_book_table.count()

1564

In [65]:
# Save the task 6 rows to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    res_allows_online_and_book_table.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task6_res_allows_online_and_book_table')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 7: Get the number of restaurants in each city.

In [66]:
# Number of rows in zomato_df for each location.
no_of_resturnats_by_city = zomato_df.groupby('location').count()

In [67]:
# Give the generic 'count' column a descriptive name before exporting.
no_of_resturnats_by_city = no_of_resturnats_by_city.withColumnRenamed('count','no_of_resturants')

In [68]:
# Collect the per-location counts to the driver as pandas and save them as CSV without the index column.
no_of_resturnats_by_city.toPandas().to_csv('data/no_of_resturnats_by_city.csv',index = False)

In [69]:
# Preview the per-location counts (first 20 rows, in no particular order).
no_of_resturnats_by_city.show()

+--------------------+----------------+
|            location|no_of_resturants|
+--------------------+----------------+
|           Bellandur|             213|
|      East Bangalore|               8|
|      West Bangalore|               2|
|         Indiranagar|             617|
|                 BTM|             811|
|        Banashankari|             149|
|      Sahakara Nagar|               8|
|Koramangala 7th B...|             322|
|            JP Nagar|             437|
|        Lavelle Road|             183|
|Koramangala 3rd B...|             102|
|        Kammanahalli|             125|
|Koramangala 2nd B...|              45|
|      St. Marks Road|             113|
|            Majestic|               6|
|         Sankey Road|              10|
|ITPL Main Road, W...|              25|
|      CV Raman Nagar|              10|
|           Jayanagar|             527|
|        Brigade Road|             276|
+--------------------+----------------+
only showing top 20 rows



In [70]:
# Save the per-location counts to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    no_of_resturnats_by_city.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task7_no_of_resturnats_by_city')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)

# Task 8: Find the total number of votes in each city using a window function.

In [71]:
# Window grouping rows that share the same location; with no ordering, an
# aggregate over it spans the whole partition.
windowSpec = Window.partitionBy('location')

In [72]:
# Total votes per location via a window aggregate. Every row of a location
# carries the same total, so distinct() collapses them to one row per location.
# Note: `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
total_votes_by_city = (
    zomato_df
    .select('location', sum('votes').over(windowSpec).alias('total_votes'))
    .distinct()
)

In [73]:
# Preview the vote totals per location (first 20 rows).
total_votes_by_city.show()

+------------------+-----------+
|          location|total_votes|
+------------------+-----------+
|               BTM|     248350|
|      Banashankari|      53715|
|         Banaswadi|      12870|
| Bannerghatta Road|     119575|
|      Basavanagudi|      55228|
|Basaveshwara Nagar|       7123|
|         Bellandur|      82887|
|      Bommanahalli|        602|
|      Brigade Road|     196476|
|       Brookefield|      46651|
|    CV Raman Nagar|       1363|
| Central Bangalore|       1150|
|     Church Street|     261087|
|       City Market|        488|
| Commercial Street|      12276|
|   Cunningham Road|      39254|
|            Domlur|      10604|
|    East Bangalore|       2231|
|           Ejipura|      10400|
|   Electronic City|      38235|
+------------------+-----------+
only showing top 20 rows



In [74]:
# Collect the vote totals to the driver as pandas and save them as CSV without the index column.
total_votes_by_city.toPandas().to_csv('data/total_votes_by_location.csv',index = False)

In [75]:
# Number of distinct (location, total_votes) rows, i.e. one per location.
total_votes_by_city.count()

85

In [76]:
# Repeats the earlier preview of total_votes_by_city.
total_votes_by_city.show()

+------------------+-----------+
|          location|total_votes|
+------------------+-----------+
|               BTM|     248350|
|      Banashankari|      53715|
|         Banaswadi|      12870|
| Bannerghatta Road|     119575|
|      Basavanagudi|      55228|
|Basaveshwara Nagar|       7123|
|         Bellandur|      82887|
|      Bommanahalli|        602|
|      Brigade Road|     196476|
|       Brookefield|      46651|
|    CV Raman Nagar|       1363|
| Central Bangalore|       1150|
|     Church Street|     261087|
|       City Market|        488|
| Commercial Street|      12276|
|   Cunningham Road|      39254|
|            Domlur|      10604|
|    East Bangalore|       2231|
|           Ejipura|      10400|
|   Electronic City|      38235|
+------------------+-----------+
only showing top 20 rows



In [77]:
# Print the column names and types before writing the table to PostgreSQL.
total_votes_by_city.printSchema()

root
 |-- location: string (nullable = true)
 |-- total_votes: long (nullable = true)



In [78]:
# Save the vote totals to PostgreSQL; 'overwrite' replaces the table if it already exists.
(
    total_votes_by_city.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://localhost:5432/pyspark_zomato')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'task8_total_votes_by_city')
    .option('user', 'postgres')
    .option('password', password)
    .mode('overwrite')
    .save()
)