In [1]:

## Importing Libraries and creating session

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession as ss
from pyspark.sql.functions import regexp_extract, regexp_replace,avg, udf
from pyspark.sql.functions import round as Round
from pyspark.sql.functions import col, when
from pyspark.sql.functions import to_date
from pyspark.sql.functions import lower
from pyspark.sql.types import IntegerType
import re, glob
from statistics import mean

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    ss.builder
    .appName('ADDO-EXAM')
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext


In [3]:
# Load a single recipe file into a DataFrame; the following cells inspect its schema and rows.
df = spark.read.json('input/recipes-000.json')

In [4]:
# Print the schema Spark inferred from the JSON file.
df.printSchema()


root
 |-- cookTime: string (nullable = true)
 |-- datePublished: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)



In [5]:
# Keep only the six columns that the preprocessing step reads.
df2 = df.select("datePublished","name","cookTime","ingredients","prepTime","recipeYield")

In [6]:
# Print the schema of the reduced column set.
df2.printSchema()

root
 |-- datePublished: string (nullable = true)
 |-- name: string (nullable = true)
 |-- cookTime: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)



In [7]:
# Preview the first 5 rows of the selected columns.
df2.show(5)



+-------------+--------------------+--------+--------------------+--------+-----------+
|datePublished|                name|cookTime|         ingredients|prepTime|recipeYield|
+-------------+--------------------+--------+--------------------+--------+-----------+
|   2010-10-14|Creamy Cheese Gri...|   PT45M|4-1/2 cups Water\...|    PT5M|          8|
|   2010-10-20|     Big Steak Salad|   PT20M|2 whole Rib-eye O...|    PT1M|          4|
|   2010-10-26|My Favorite Turke...|   PT15M|3 cups Apple Juic...|   PT10M|         18|
|   2010-10-27|Spaghetti Squash ...|    PT1H|2 whole Medium Sp...|   PT10M|          8|
|   2010-11-01|Pear Clafouti, Th...|   PT45M|2 whole Pears\n2 ...|   PT15M|         12|
+-------------+--------------------+--------+--------------------+--------+-----------+
only showing top 5 rows



In [8]:
###########################################################
#### Function to extract the Time per dish in minutes #####
###########################################################

def get_time(ISOtime_):
    """Convert an ISO-8601 style duration string (e.g. ``PT1H30M``) to minutes.

    Args:
        ISOtime_: Duration text such as ``"PT45M"`` or ``"PT1H"``. May be
            ``None`` (a null value passed in by Spark when used as a UDF).

    Returns:
        The total number of minutes as an ``int``. Returns ``0`` when the text
        contains neither an hour nor a minute component, and ``None`` when the
        input is not a string, so that missing values stay missing instead of
        raising ``TypeError`` inside ``re.search``.
    """
    if not isinstance(ISOtime_, str):
        return None

    # In ISO-8601 durations the time components follow the "T" designator;
    # an "M" before it denotes months, so only the part after "T" is scanned.
    # Text without a "T" is scanned as a whole.
    _, t_designator, time_part = ISOtime_.partition('T')
    text = time_part if t_designator else ISOtime_

    hours_match = re.search(r'(\d+)H', text)
    minutes_match = re.search(r'(\d+)M', text)

    hours = int(hours_match.group(1)) if hours_match else 0
    minutes = int(minutes_match.group(1)) if minutes_match else 0

    return hours * 60 + minutes

###################################################
#### Function to extract the serving per dish #####
###################################################

def extract_serving_value(s):
    """Extract a whole-number serving count from a free-text yield value.

    Every run of digits in the text is read as a number and the numbers are
    averaged (e.g. ``"4-6 servings"`` -> ``5``).

    Args:
        s: The yield value. Usually a string; an ``int`` is returned as-is.
            ``None`` and other non-string values are treated like text that
            contains no digits.

    Returns:
        An ``int``: the average of the numbers found, rounded half-up, or
        ``1`` when no number is present. The result is always an ``int`` so it
        matches an integer UDF return type even when the average is fractional.
    """
    # Check the type before anything else: calling len() on an int or None
    # raises TypeError.
    if isinstance(s, int):
        return s
    if not isinstance(s, str):
        return 1

    nums = [int(num) for num in re.findall(r'\d+', s)]
    if not nums:
        return 1

    # Half-up rounding of sum/count using integer arithmetic only:
    # floor(total / count + 1/2) == (2 * total + count) // (2 * count).
    total, count = sum(nums), len(nums)
    return (2 * total + count) // (2 * count)



In [9]:
def PrePocessing(df2):
    """Clean the raw recipe columns and derive typed, analysis-ready columns.

    Args:
        df2: DataFrame with the string columns ``datePublished``, ``name``,
            ``cookTime``, ``ingredients``, ``prepTime`` and ``recipeYield``.

    Returns:
        A DataFrame in which the six raw columns are replaced by:

        * ``Date Published`` - ``datePublished`` converted with ``to_date``.
        * ``Dish_Name`` / ``Ingridients`` - text with punctuation removed,
          whitespace runs collapsed to one space, and lower-cased.
        * ``Cook_Time`` / ``Prep_Time`` - minutes parsed by ``get_time``;
          nulls are filled with the column mean.
        * ``Serving`` - serving count parsed by ``extract_serving_value``.

    All columns are resolved by name on the frame that was passed in, so the
    function does not depend on any notebook-level DataFrame variable.
    """
    # Operation on datePublished: string -> date.
    df2 = df2.withColumn("Date Published", to_date(col("datePublished")))

    # Operation on name and ingredients: the three cleaning steps are chained
    # so each one works on the result of the previous one.
    df2 = df2.withColumn('Dish_Name', _clean_text_column('name'))
    df2 = df2.withColumn('Ingridients', _clean_text_column('ingredients'))

    # Operation on cook time and prep time: minutes, mean-imputed, rounded.
    df2 = _add_minutes_column(df2, 'cookTime', 'Cook_Time')
    df2 = _add_minutes_column(df2, 'prepTime', 'Prep_Time')

    # Operation on recipeYield: servings per dish.
    serving_func = udf(extract_serving_value, IntegerType())
    df2 = df2.withColumn('Serving', serving_func(col('recipeYield')))
    df2 = df2.withColumn('Serving', Round(col('Serving'), 0))

    # Drop the raw columns now that their cleaned counterparts exist.
    return df2.drop('cookTime', 'prepTime', 'name',
                    'datePublished', 'recipeYield', 'ingredients')


def _clean_text_column(column_name):
    """Build a Column: punctuation stripped, whitespace collapsed, lower-cased."""
    without_punctuation = regexp_replace(col(column_name), r'[^\w\s]+', '')
    single_spaced = regexp_replace(without_punctuation, r'\s+', ' ')
    return lower(single_spaced)


def _add_minutes_column(frame, source_col, target_col):
    """Add ``target_col`` holding ``source_col`` parsed to minutes, nulls mean-filled."""
    minutes_func = udf(get_time, IntegerType())
    frame = frame.withColumn(target_col, minutes_func(col(source_col)))

    # avg() yields None when every value is null; there is nothing to impute
    # with in that case, so the nulls are left in place.
    mean_minutes = frame.select(avg(col(target_col))).collect()[0][0]
    if mean_minutes is not None:
        frame = frame.fillna({target_col: mean_minutes})

    return frame.withColumn(target_col, Round(col(target_col), 0))




In [10]:
def TASK2DF(df2):
    """Add the total cooking time and a difficulty label to each recipe.

    ``Total_cook_time`` is ``Cook_Time + Prep_Time``. ``Difficulty`` is
    ``'Easy'`` below 30, ``'Medium'`` from 30 to 60 inclusive, and ``'Hard'``
    above 60; it has no value when none of the three conditions holds.
    """
    total = col('Total_cook_time')

    # between() is inclusive on both ends, i.e. 30 <= total <= 60.
    difficulty = (
        when(total < 30, 'Easy')
        .when(total.between(30, 60), 'Medium')
        .when(total > 60, 'Hard')
    )

    with_total = df2.withColumn('Total_cook_time', col('Cook_Time') + col('Prep_Time'))
    return with_total.withColumn('Difficulty', difficulty)



In [13]:
 

# Collect every recipe file in the input directory (sorted so the order is
# the same on every run).
files = sorted(glob.glob("input/*.json"))
if not files:
    raise FileNotFoundError("No JSON files found matching 'input/*.json'")

# Read all files into one DataFrame in a single pass. The pipeline below runs
# once on the complete list; it is intentionally NOT inside a per-file loop,
# which would redo the read, the transforms and the write for every file and
# leave one extra cached DataFrame behind per iteration.
df = spark.read.json(files)

# Selecting columns that are useful
df2 = df.select("datePublished","name","cookTime","ingredients","prepTime","recipeYield")

# TASK 1
df2 = PrePocessing(df2)

# Persist the processed dataframe for future processing
df2 = df2.persist()

# TASK 2
df2 = TASK2DF(df2)

# Beef recipe extraction.
# NOTE(review): beef_recipe is built here but nothing in this cell writes or
# aggregates it - confirm whether the averages below should be beef-only.
df2.createOrReplaceTempView('EXAM')
beef_recipe = spark.sql("select * from EXAM where (Ingridients like '%beef%') or (Dish_Name like '%beef%')")

# Calculating average cooking time duration per difficulty level of the whole dataset.
df2.createOrReplaceTempView('AvgCookDiff')
Avg_T_D = spark.sql("select Difficulty , ROUND(AVG(Total_cook_time),0) as AVGCook_Time from AvgCookDiff group by Difficulty")

# Write the average total cooking time per difficulty level to the local
# "output" directory in CSV format, replacing any previous result.
Avg_T_D.write.format("csv").option("header", "true").mode("overwrite").save("output")


In [12]:
# Display the list of input file paths collected by the glob in the pipeline cell.
files

['input\\recipes-000.json',
 'input\\recipes-001.json',
 'input\\recipes-002.json']