# Problem Statement 🌾
As an ML engineer at food.com, our objective is to develop a recommender system that offers recipe suggestions to users by considering their preferences and the recipe they are currently viewing. A well-functioning recommender system holds the potential to boost user engagement and create additional business prospects. The efficiency of the recommendation engine directly influences the revenue generated by the website. However, constructing a recommender system from the ground up requires a significant amount of time and effort. In this assignment, our task is to analyze the available data and generate relevant features to construct the recommender system.

# By Priya, Simranjeet Kaur, Priyanka Sorate

# Initial Setup(importing libraries)

In [3]:
from pyspark.sql import SparkSession

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basic").getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7fb10a7316d0>

In [6]:
from pyspark.sql import functions as F


# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Importing the required libraries

import pyspark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Task 01 Cell 1 out of 1

raw_recipes_df = spark.read.csv("s3a://raw-recipes-clean-upgrad/RAW_recipes_cleaned.csv",inferSchema=True,header=True)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Task1:Reading the data 🍽️🔍

In [9]:
raw_recipes_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id|          submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16 00:00:00|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17 00:00:00|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|


In [10]:
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)

#### Test cases for Task 01

In [11]:
# Code check cells
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.count() == 231637
assert len(raw_recipes_df.columns) == 12
assert raw_recipes_df.schema["minutes"].dataType == IntegerType()
assert raw_recipes_df.schema["tags"].dataType == StringType()
assert raw_recipes_df.schema["n_ingredients"].dataType == IntegerType()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

All test cases pass, task 01 ends 😊✅

## Extract nutrition values

In [12]:
# List of nutrition columns

nutrition_column_names = ['calories',
                          'total_fat_PDV',
                          'sugar_PDV',
                          'sodium_PDV',
                          'protein_PDV',
                          'saturated_fat_PDV',
                          'carbohydrates_PDV']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Task 02: Extract individual features from the nutrition column. 🍔

As we have observed, when the Spark compiler reads the nutrition column from the raw_recipes_df DataFrame, it is treated as a string column instead of an array of float values. However, each row in the nutrition column contains seven values representing different nutrition information. Our task is to extract these individual values and create seven separate columns named calories, total fat (PDV), sugar (PDV), sodium (PDV), protein (PDV), saturated fat (PDV), and carbohydrates (PDV).

Code Explanation: To accomplish this task, we will use Spark's built-in functions to split the string column into an array column and then extract individual values from the array into separate columns.

In [13]:
raw_recipes_df.select("nutrition").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|           nutrition|
+--------------------+
|[51.5, 0.0, 13.0,...|
|[173.4, 18.0, 0.0...|
|[269.8, 22.0, 32....|
|[368.1, 17.0, 10....|
|[352.9, 1.0, 337....|
+--------------------+
only showing top 5 rows

In [14]:
# Task 02 Cell 1 out of 2
# 2.1 - string operations to remove square brakets

raw_recipes_df = (raw_recipes_df
                  .withColumn('nutrition', (F.regexp_replace("nutrition","[\[\]]",""))))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Task 02 Cell 2 out of 3
# STEP 2.2 - split the neutrition string into seven individial values. 
# Create an object to split the nutrition column

nutrition_cols_split = pyspark.sql.functions.split(raw_recipes_df["nutrition"],",")
# Write a loop to extract individual values from the nutrition column

for col_index, col_name in enumerate(nutrition_column_names):    
    raw_recipes_df = (raw_recipes_df.withColumn(col_name, nutrition_cols_split.getItem(col_index).cast("float")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases for task 02

In [16]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.schema["carbohydrates_PDV"].dataType == FloatType(), "Recheck your typecasting"
assert raw_recipes_df.collect()[123432][14] == 62.0, "The columns have not been split correctly."
assert raw_recipes_df.collect()[10000][12] == 60.400001525878906, "The columns have not been split correctly."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

All test cases passed task 02 ends !!😊✅

## Make nutrition-per-100 calorie columns

By converting the nutrition values from absolute to relative terms, we ensure that portion size is not a factor in the analysis. 

Naming convention: Original column name ```total fat (PDV)```, column name after column ```total_fat_per_100_cal```

## Task 03: Standardize the nutrition values </font>

In [17]:
# Task 03 Cell 1 out of 1

for nutrition_col in nutrition_column_names:
    if nutrition_col != "calories":
        nutrition_per_100_cal_col = (nutrition_col
                                 .replace('_PDV','')
                                 +'_per_100_cal')
        raw_recipes_df = raw_recipes_df.withColumn(nutrition_per_100_cal_col,
                                              raw_recipes_df[nutrition_col]*100/raw_recipes_df["calories"])
        
        # You might end up adding nulls to the data because of our intended transformation. 
        # Perform a fill na operation to fill all the nulls with 0s. 
        # You must limit the scope of the fill na to the current column only. 
        
        raw_recipes_df = raw_recipes_df.fillna(value = 0, subset = [nutrition_per_100_cal_col]) 
        

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases for Task 03

In [18]:
# total fat check for id 28881
assert raw_recipes_df.filter("id == 28881").select('total_fat_per_100_cal').first()[0] == 0, "total_fat_per_100_cal for recipe 28881 should be 0"

# total fat check for id 112140
assert round(raw_recipes_df.filter("id == 112140").select('total_fat_per_100_cal').first()[0]) == 8, "total_fat_per_100_cal for recipe 112140 should be 8"

# checking for nulls
for c in ['total_fat_per_100_cal','sugar_per_100_cal','sodium_per_100_cal','protein_per_100_cal',
                          'saturated_fat_per_100_cal','carbohydrates_per_100_cal']:
    assert raw_recipes_df.select(F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)).collect()[0][0] == 0, "There are Nulls in the data"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-- protein_per_100_cal: double (nullable = false

## Task 04: Convert the tags column from a string to an array of strings </font>

Currently, the tags column is a string column but holds an array of strings. 

Your task is to convert the tags columns from a string to an array of strings. 
   

In [20]:
# Task 04 Cell 1 out of 1

raw_recipes_df = (raw_recipes_df
                  .withColumn('tags', F.regexp_replace("tags","[\\[\\]\\']","")
                             )
                  .withColumn('tags', F.split("tags",", ")
                             ))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-

In [22]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.schema["tags"].dataType == ArrayType(StringType(), True), "You have not split the string into an array."
assert raw_recipes_df.collect()[2][5] == ['time-to-make','course', 'preparation', 'main-dish', 'chili', 'crock-pot-slow-cooker', 'dietary', 'equipment', '4-hours-or-less'], "Recheck your string cleaning and splitting operations."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Join Recipe Data to Review Data

In [23]:
# Reading the second data set. 
# keep this cell unedited

raw_ratings_df = (spark.read.csv("s3a://raw-interactions-upgrad/RAW_interactions_cleaned.csv", 
                                 header=True, 
                                 inferSchema= True)
                  .withColumn("review_date",  F.col("date"))
                  .drop(F.col("date"))
                  )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
raw_ratings_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: timestamp (nullable = true)

In [25]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_ratings_df.count() == 1132367, "There is a mistake in reading the data."
assert len(raw_ratings_df.columns) == 5, "There is a mistake in reading the data."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
raw_ratings_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+------+--------------------+-------------------+
|user_id|recipe_id|rating|              review|        review_date|
+-------+---------+------+--------------------+-------------------+
|  38094|    40893|     4|Great with a sala...|2003-02-17 00:00:00|
|1293707|    40893|     5|So simple  so del...|2011-12-21 00:00:00|
|   8937|    44394|     4|This worked very ...|2002-12-01 00:00:00|
| 126440|    85009|     5|I made the Mexica...|2010-02-27 00:00:00|
|  57222|    85009|     5|Made the cheddar ...|2011-10-01 00:00:00|
+-------+---------+------+--------------------+-------------------+
only showing top 5 rows

## Task 05: Read the second data file

In [27]:
# Task 05 Cell 1 out of 1

interaction_level_df = raw_ratings_df.join(raw_recipes_df, raw_ratings_df.recipe_id == raw_recipes_df.id, "inner")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases for Task 05

In [28]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert (interaction_level_df.count() ,len(interaction_level_df.columns)) == (1132367, 30), "The type of join is incorrect"

list1 = raw_ratings_df.select('recipe_id').collect()
list2 = raw_recipes_df.select('id').collect()
exclusive_set = set(list1)-set(list2)

assert len(exclusive_set) == 0, "There is a mistake in reading one of the two data files."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Task 06:  Create time-based features

Currently, both the date columns, the submitted date, and the review date are in string forms. 
    
First convert the ```submitted``` and ```review_date``` to DateType()

Use review date and submission date to derive new features:
1. ```days_since_submission_on_review_date``` Number of days between the recipe submission and the current review.  
2. ```months_since_submission_on_review_date``` Number of months between the recipe submission and the current review. 
3. ```years_since_submission_on_review_date```Number of years between the recipe submission and the current review. 

In [29]:
# Task 06 Cell 1 out of 2

interaction_level_df = (interaction_level_df
                        .withColumn('submitted', F.col("submitted").cast("date")
                                   )
                        .withColumn('review_date', F.col("review_date").cast("date")
                                   )
                                             
                       )


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
interaction_level_df = (interaction_level_df
                        .withColumn('days_since_submission_on_review_date', F.datediff("review_date", "submitted")
                                     # Pyspark function to find the number of days between two dates              
                                   )
                        .withColumn('months_since_submission_on_review_date', F.months_between("review_date", "submitted")
                                     # Pyspark function to find the number of months between two dates          
                                   )
                        .withColumn('years_since_submission_on_review_date', F.months_between("review_date", "submitted")/12
                                     # Pyspark function to find the number of months between two dates / 12          
                                   )
                         )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Test cases for Task 06

In [31]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert interaction_level_df.schema["days_since_submission_on_review_date"].dataType == IntegerType()

assert (interaction_level_df.filter((interaction_level_df.user_id == 428885) & (interaction_level_df.recipe_id == 335241))
                            .select('days_since_submission_on_review_date').collect()[0][0]) == 77
assert (interaction_level_df.filter((interaction_level_df.user_id == 2025676) & (interaction_level_df.recipe_id == 94265))
                            .select('months_since_submission_on_review_date').collect()[0][0]) == 153.22580645
assert (interaction_level_df.filter((interaction_level_df.user_id == 338588) & (interaction_level_df.recipe_id == 21859))
                            .select('years_since_submission_on_review_date').collect()[0][0]) == 4.564516129166667

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Save the data we have created so far in a parquet file. 

In [32]:
interaction_level_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carboh

In [33]:
assert (interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
## Write the raw_recipes_df
## create a folder named data in you current directry before running this. 

from pyspark.sql import SparkSession
# interaction_level_df.write.parquet("interaction_level_df") # Modify the path as you need


interaction_level_df.write.mode("overwrite").parquet("interaction_level_df")  # Modify the path as you need

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### <center>The 6 task's given has been completed.</center>