###               Recipe Recommendation Project

#### Importing libraries

In [14]:
### Import libraries for PySpark
from pyspark.sql import SparkSession

import pyspark 
from pyspark.sql import functions as F

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

# Importing required libraries/Functions
from pyspark.sql.functions import split, regexp_replace, col

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Initiating Spark session

In [2]:
spark = SparkSession.builder.appName("RecipeData").getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Reading the datasets

In [3]:
raw_recipes_df = spark.read.csv('s3://recipedata-ravi/RAW_recipes_cleaned.csv',inferSchema=True,header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)

In [6]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.count() == 231637, "There is a mistake in reading the data."
assert len(raw_recipes_df.columns) == 12, "There is a mistake in reading the data."
assert raw_recipes_df.schema["minutes"].dataType == IntegerType(), "The data types have not been read correctly."
assert raw_recipes_df.schema["tags"].dataType == StringType(), "The data types have not been read correctly."
assert raw_recipes_df.schema["n_ingredients"].dataType == IntegerType(), "The data types have not been read correctly."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
raw_ratings_df = (spark.read.csv("s3://recipedata-ravi/RAW_interactions_cleaned.csv", 
                                 inferSchema= True, 
                                 header= True)
                  .withColumn("review_date",  F.col("date"))
                  .drop(F.col("date"))
                  )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
raw_ratings_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)

In [24]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_ratings_df.count() == 1132367, "There is a mistake in reading the data."
assert len(raw_ratings_df.columns) == 5, "There is a mistake in reading the data."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Extract Nutrition Values

In [7]:
# List of nutrition columns

nutrition_column_names = ['calories',
                          'total_fat_PDV',
                          'sugar_PDV',
                          'sodium_PDV',
                          'protein_PDV',
                          'saturated_fat_PDV',
                          'carbohydrates_PDV']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Extract individual features from Nutrition column.

##### The nutrition column is a string column when it should be an array of float values. Each of the row in the nutrition column contain seven values. Each value represents nutrition information in the below order.

##### 1st value: calories
##### 2nd value: total_fat_PDV
##### 3rd value: sugar_PDV
##### 4th value: sodium_PDV
##### 5th value: protein_PDV
##### 6th value: saturated_fat_PDV
##### 7th value: carbohydrates_PDV



In [8]:
# 2.1 - string operations to remove square brakets

raw_recipes_df = (raw_recipes_df
                  .withColumn('nutrition', (F.regexp_replace('nutrition',"[\[\]]",""))
                             ))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# split the nutrition string into seven individial values. 
# Create an object to split the nutrition column

# pyspark function to split values based on a delimiter.  
nutrition_cols_split = pyspark.sql.functions.split(raw_recipes_df['nutrition'],',')
# nutrition_cols_split = raw_recipes_df.withColumn("nutrition", split(raw_recipes_df["nutrition"], ","))

# extract individual values from the nutrition column

for col_index, col_name in enumerate(nutrition_column_names):
    # col_index holds the index number of each column, e.g., calories will be 0
    # col_name holds the name of each column 
    
    raw_recipes_df = (raw_recipes_df.withColumn(col_name, nutrition_cols_split.getItem(col_index).cast("float")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases to check Nutrition column split

In [16]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.schema["carbohydrates_PDV"].dataType == FloatType(), "Recheck your typecasting"
assert raw_recipes_df.collect()[123432][14] == 62.0, "The columns have not been split correctly."
assert raw_recipes_df.collect()[10000][12] == 60.400001525878906, "The columns have not been split correctly."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Transform Nutrition-per-100 Calorie columns

##### By converting the nutrition values from absolute to relative terms, we ensure that portion size is not a factor in the analysis.

In [17]:
### Standardize the nutrition values

### The current values for nutrition columns are not on the same scale. So standardizing the nutrition columns using calories as the base of standardization.


col_name_list = []
for nutrition_col in nutrition_column_names:# loop over each of the newly created nutrition columns    
    if nutrition_col != 'calories': # the calories column should not be a part of the transformation exercise
        # following code will name the new columns 
        nutrition_per_100_cal_col = (nutrition_col
                                 .replace('_PDV','')
                                 +'_per_100_cal')
        col_name_list.append(nutrition_per_100_cal_col)
        raw_recipes_df = raw_recipes_df.withColumn(nutrition_per_100_cal_col,
                                                   col(nutrition_col)*100/ col('calories')
                                               # pyspark code to recreate the intended transformation 
                                                  )
        
        # You might end up adding nulls to the data because of our intended transformation. 
        # Perform a fill na operation to fill all the nulls with 0s. 
        # You must limit the scope of the fill na to the current column only. 
        
        raw_recipes_df = raw_recipes_df.na.fill(value=0,subset= col_name_list) # pyspark code to fill nulls with 0 in only the current nutrition_per_100_cal_col

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Test cases for validating Standardization of Nutrition values

In [18]:
# total fat check for id 28881
assert raw_recipes_df.filter("id == 28881").select('total_fat_per_100_cal').first()[0] == 0, "total_fat_per_100_cal for recipe 28881 should be 0"

# total fat check for id 112140
assert round(raw_recipes_df.filter("id == 112140").select('total_fat_per_100_cal').first()[0]) == 8, "total_fat_per_100_cal for recipe 112140 should be 8"

# checking for nulls
for c in ['total_fat_per_100_cal','sugar_per_100_cal','sodium_per_100_cal','protein_per_100_cal',
                          'saturated_fat_per_100_cal','carbohydrates_per_100_cal']:
    assert raw_recipes_df.select(F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)).collect()[0][0] == 0, "There are Nulls in the data"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Convert Tags column from a string to an array of strings

In [19]:
### Currently the tags column is a string data type column but holds an array of strings.
### Converting the 'tags' column to an array of strings.

### Remove '[', ']' & "'" punctuation marks from the 'tags' column. Split the 'tags' column based on the comma delimitter.

raw_recipes_df = (raw_recipes_df
                  .withColumn('tags', 
                              regexp_replace(regexp_replace(regexp_replace("tags", "\\[", ""), "\\]", ""), "'", "")))

raw_recipes_df = raw_recipes_df.withColumn("tags", split(raw_recipes_df["tags"], ", "))

raw_recipes_df = raw_recipes_df.withColumn("tags", raw_recipes_df["tags"].cast(ArrayType(StringType(), True)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Test cases to check the conversion of Tags column

In [20]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.schema["tags"].dataType == ArrayType(StringType(), True), "You have not split the string into an array."
assert raw_recipes_df.collect()[2][5] == ['time-to-make','course', 'preparation', 'main-dish', 'chili', 'crock-pot-slow-cooker', 'dietary', 'equipment', '4-hours-or-less'], "Recheck your string cleaning and splitting operations."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Join the Recipe Data to Review Data

In [63]:
### Join the Recipe Data to Review Data

interaction_level_df = raw_recipes_df.join(raw_ratings_df, raw_recipes_df.id==raw_ratings_df.recipe_id, how='inner')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
interaction_level_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-- pro

#### Test cases for validating the Joining of two files

In [65]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert (interaction_level_df.count() ,len(interaction_level_df.columns)) == (1132367, 30), "The type of join is incorrect"

list1 = raw_ratings_df.select('recipe_id').collect()
list2 = raw_recipes_df.select('id').collect()
exclusive_set = set(list1)-set(list2)

assert len(exclusive_set) == 0, "There is a mistake in reading one of the two data files."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Creating time based features

##### The date columns are in string format, convert them to date format

In [66]:
### Convert the 'submitted' column to date format

interaction_level_df = interaction_level_df.withColumn('submitted', F.to_date('submitted', 'yyyy-MM-dd'))

interaction_level_df = interaction_level_df.withColumn('review_date', F.to_date('review_date', 'yyyy-MM-dd'))   

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Use review date and submission date to derive new features:

 - days_since_submission_on_review_date:   Number of days between the recipe submission and the current review.
 - months_since_submission_on_review_date: Number of months between the recipe submission and the current review.
 - years_since_submission_on_review_date:  Number of years between the recipe submission and the current review.

In [67]:
interaction_level_df = (interaction_level_df
                        .withColumn('days_since_submission_on_review_date',F.datediff("review_date", "submitted")
                                     # Pyspark function to find the number of days between two dates              
                                   )
                        .withColumn('months_since_submission_on_review_date',F.months_between("review_date", "submitted")
                                     # Pyspark function to find the number of months between two dates          
                                   )
                        .withColumn('years_since_submission_on_review_date',F.months_between("review_date", "submitted")/12
                                     # Pyspark function to find the number of months between two dates / 12          
                                   )
                         )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases for validating the date columns derived above

In [68]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert interaction_level_df.schema["days_since_submission_on_review_date"].dataType == IntegerType()

assert (interaction_level_df.filter((interaction_level_df.user_id == 428885) & (interaction_level_df.recipe_id == 335241))
                            .select('days_since_submission_on_review_date').collect()[0][0]) == 77
assert (interaction_level_df.filter((interaction_level_df.user_id == 2025676) & (interaction_level_df.recipe_id == 94265))
                            .select('months_since_submission_on_review_date').collect()[0][0]) == 153.22580645
assert (interaction_level_df.filter((interaction_level_df.user_id == 338588) & (interaction_level_df.recipe_id == 21859))
                            .select('years_since_submission_on_review_date').collect()[0][0]) == 4.564516129166667

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Save the data created till this part in a parquet file. 

In [70]:
interaction_level_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-- pro

In [71]:
assert (interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [72]:
### Data will be saved in a parquet file in S3.
### More analysis will be continued in another Notebook

interaction_level_df.write.mode("overwrite").parquet("s3://recipedata-ravi/interaction_level_df_processed.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…