## Importing Libraries

In [13]:
from pyspark.sql.functions import udf, col, split, explode, trim, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession

## Reading Data

In [14]:
# Start (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("zomatoJaskeerat")
    .getOrCreate()
)

# Read the source CSV with a header row, inferred column types,
# comma separators and double-quote quoting.
csv_reader = spark.read.options(header=True, inferSchema=True, sep=",", quote='"')
df = csv_reader.csv("/home/jaskeerat/zomato_etl/source/csv")

# Preview the first five rows.
df.show(5)

+-------------+--------------------+------------+--------+--------------------+-----------+--------------------+---------------+--------------+--------------------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+-----------+-----+
|Restaurant_ID|     Restaurant_Name|Country_Code|    City|             Address|   Locality|    Locality_Verbose|      Longitude|      Latitude|            Cuisines|Average_Cost_for_two|Currency|Has_Table_booking|Has_Online_delivery|Is_delivering_now|Switch_to_order_menu|Price_range|Aggregate_rating|Rating_text|Votes|
+-------------+--------------------+------------+--------+--------------------+-----------+--------------------+---------------+--------------+--------------------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+-----------+-----+
|     16668008|       Arigato Sushi|       

In [15]:
# Count the rows that were loaded and report the total.
row_count = df.count()
print("Number of Rows:", row_count)

Number of Rows: 29760


## Rating Colour

In [16]:
def rating_colour(text, rating):
    """Map a rating label and its numeric score to a colour name.

    The colour is returned only when the label and the score agree, i.e. the
    score lies inside the inclusive band associated with the label:

        "Poor"       1.9 - 2.4  -> "Red"
        "Average"    2.5 - 3.4  -> "Amber"
        "Good"       3.5 - 3.9  -> "Light Green"
        "Very Good"  4.0 - 4.4  -> "Green"
        "Excellent"  4.5 - 5.0  -> "Gold"

    Args:
        text: Rating label, or None.
        rating: Numeric rating, or None.

    Returns:
        The colour name, or None when either input is missing, the label is
        not one of the five above, or the score is outside the label's band.
    """
    # A missing label or score cannot be classified. Without this guard a
    # matching label with a None score would raise TypeError on comparison.
    if text is None or rating is None:
        return None

    # label -> (inclusive lower bound, inclusive upper bound, colour)
    bands = {
        "Poor": (1.9, 2.4, "Red"),
        "Average": (2.5, 3.4, "Amber"),
        "Good": (3.5, 3.9, "Light Green"),
        "Very Good": (4.0, 4.4, "Green"),
        "Excellent": (4.5, 5.0, "Gold"),
    }

    band = bands.get(text)
    if band is None:
        return None

    low, high, colour = band
    return colour if low <= rating <= high else None

# Wrap rating_colour as a Spark UDF that returns a string column.
rating_colour_udf = udf(f=rating_colour, returnType=StringType())

In [17]:
# Cast the rating to double before handing it to the colour UDF.
rating_as_double = col("Aggregate_rating").cast("double")
df = df.withColumn("Aggregate_rating", rating_as_double)

# Derive the colour from the rating label and the numeric rating.
colour_column = rating_colour_udf(col("Rating_text"), col("Aggregate_rating"))
df = df.withColumn("m_rating_colour", colour_column)

# Show the inputs next to the derived colour for a quick check.
df.select("Rating_text", "Aggregate_rating", "m_rating_colour").show(2)

+-----------+----------------+---------------+
|Rating_text|Aggregate_rating|m_rating_colour|
+-----------+----------------+---------------+
|    Average|             3.3|          Amber|
|  Excellent|             4.6|           Gold|
+-----------+----------------+---------------+
only showing top 2 rows



## Cuisines

In [18]:
# Preview a handful of the distinct raw Cuisines strings.
distinct_cuisines = df.select("Cuisines").distinct()
distinct_cuisines.show(5)

+--------------------+
|            Cuisines|
+--------------------+
|North Indian, Mug...|
|North Indian, Str...|
|           Cafe, Tea|
|Chinese, Japanese...|
|North Indian, Con...|
+--------------------+
only showing top 5 rows



In [19]:
# Split each comma-separated Cuisines string into an array, emit one row per
# array element, then strip the surrounding whitespace from each element.
# explode is kept in its own withColumn because Spark does not allow a
# generator to be nested inside another expression.
cuisine_df = (
    df.withColumn("cuisine_list", split(col("Cuisines"), ","))
    .select("Cuisines", "cuisine_list")
    .withColumn("cuisine", explode(col("cuisine_list")))
    .withColumn("cuisine", trim(col("cuisine")))
)

# Collect the distinct individual cuisine names to the driver as a Python list.
distinct_cuisine_rows = cuisine_df.select("cuisine").distinct()
cuisine_list = distinct_cuisine_rows.rdd.map(lambda record: record[0]).collect()
cuisine_list

['International',
 'Salad',
 'Raw Meats',
 'Mexican',
 'Kerala',
 'Mangalorean',
 'Bakery',
 'BBQ',
 'European',
 'Tea',
 'Turkish',
 'Tibetan',
 'Armenian',
 'Afghani',
 'Charcoal Grill',
 'Thai',
 'Fusion',
 'Biryani',
 '0',
 'Sushi',
 'Indian',
 'Chinese',
 'Continental',
 'Burger',
 'Assamese',
 'African',
 'Mughlai',
 'Sri Lankan',
 'Coffee and Tea',
 'Gujarati',
 'British',
 'Hyderabadi',
 'Sandwich',
 'Japanese',
 'Australian',
 'North Eastern',
 'Fast Food',
 'Spanish',
 'Vietnamese',
 'Kashmiri',
 'Drinks Only',
 'Pizza',
 'Burmese',
 'Portuguese',
 'Breakfast',
 'Goan',
 'Italian',
 'Lebanese',
 'Parsi',
 'Beverages',
 'Maharashtrian',
 'Modern Indian',
 'Andhra',
 'Awadhi',
 'Bengali',
 'Juices',
 'North Indian',
 'Korean',
 'Modern Australian',
 'Lucknowi',
 'Cuisine Varies',
 'French',
 'Malaysian',
 'Desserts',
 'Southwestern',
 'Arabian',
 'Healthy Food',
 'Naga',
 'Middle Eastern',
 '600',
 'Seafood',
 'Ice Cream',
 'Deli',
 'Finger Food',
 'Greek',
 'South Indian',
 'A

In [20]:
# Cuisine names that classify a restaurant as "Indian".
indian_cuisines = ["Kerala", "Mangalorean", "Biryani", "Indian", "Assamese", "Mughlai", "Gujarati",
                   "Hyderabadi", "North Eastern", "Kashmiri", "Goan", "Parsi", "Maharashtrian",
                   "Modern Indian", "Andhra", "Awadhi", "Bengali", "North Indian", "Lucknowi",
                   "Naga", "South Indian", "Chettinad", "Bihari", "Rajasthani", "Oriya", "Vegetarian"]

def cuisines_function(Cuisines):
    """Classify a raw Cuisines string as "Indian" or "World Cuisines".

    The check is a substring test: the result is "Indian" as soon as any name
    from ``indian_cuisines`` occurs anywhere inside ``Cuisines``.

    Args:
        Cuisines: The raw cuisines string (e.g. "Italian, North Indian"),
            or None when the value is missing.

    Returns:
        "Indian", "World Cuisines", or None when ``Cuisines`` is None.
    """
    # A missing value cannot be classified. Without this guard the ``in``
    # test below raises TypeError for None, failing the whole Spark task.
    if Cuisines is None:
        return None

    if any(cuisine in Cuisines for cuisine in indian_cuisines):
        return "Indian"
    return "World Cuisines"

# Wrap cuisines_function as a string-returning Spark UDF and use it to add
# the m_cuisines column derived from Cuisines.
cuisines_udf = udf(f=cuisines_function, returnType=StringType())
m_cuisines_column = cuisines_udf(col("Cuisines"))
df = df.withColumn("m_cuisines", m_cuisines_column)

In [21]:
# Spot-check a few rows classified as "Indian".
indian_rows = df.where(col("m_cuisines") == "Indian")
indian_rows.select("Cuisines", "m_cuisines").show(5)

+--------------------+----------+
|            Cuisines|m_cuisines|
+--------------------+----------+
|Italian, North In...|    Indian|
|North Indian, Mug...|    Indian|
|North Indian, Mug...|    Indian|
|North Indian, Med...|    Indian|
|Italian, Mexican,...|    Indian|
+--------------------+----------+
only showing top 5 rows



In [22]:
# Spot-check a few rows classified as "World Cuisines".
world_rows = df.where(col("m_cuisines") == "World Cuisines")
world_rows.select("Cuisines", "m_cuisines").show(5)

+--------------------+--------------+
|            Cuisines|    m_cuisines|
+--------------------+--------------+
|               Asian|World Cuisines|
|Cafe, Italian, Co...|World Cuisines|
|Coffee and Tea, M...|World Cuisines|
|               Asian|World Cuisines|
|Cafe, American, I...|World Cuisines|
+--------------------+--------------+
only showing top 5 rows



## Filling Nulls

In [23]:
# Keep only the rows whose Cuisines value is present and non-empty,
# then display how many rows remain.
cuisines_col = col("Cuisines")
has_cuisines = cuisines_col.isNotNull() & (cuisines_col != "")
df = df.where(has_cuisines)
df.count()

29563

In [24]:
# Replace null or empty-string values with the literal "NA" in every column.
#
# All columns are rewritten in a single select rather than one withColumn
# call per column: each withColumn adds another projection to the plan, and
# the Spark documentation recommends select() when many columns are touched.
# Column names and order are preserved.
#
# NOTE(review): for non-string columns (e.g. the double Aggregate_rating) the
# "NA" literal presumably forces the result to a string type, as it would in
# the per-column form - confirm that this is intended.
df = df.select([
    when(col(name).isNull() | (col(name) == ""), "NA")
    .otherwise(col(name))
    .alias(name)
    for name in df.columns
])