## Setup

In [1]:
import os
# Give Google Drive the required permission
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Create a folder in your drive and add the name of that folder here. 
# For example, for the code below to run correctly, you need to have a folder named FoodRecSys in 'My Drive'.  
# The said folder will be your home directory for the rest of the project. 
# You will be able to save and read data from the folder. 

os.chdir("/content/drive/MyDrive/FoodRecSys")
os.getcwd()

'/content/drive/MyDrive/FoodRecSys'

In [3]:
os.chdir("/content/drive/MyDrive/food_recsys_project/Code Files")
os.getcwd()

'/content/drive/MyDrive/food_recsys_project/Code Files'

In [4]:
try:
  import pyspark
except ImportError:
  # Install a pinned version only when pyspark is not already available
  !pip install pyspark==3.1.2
  import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880769 sha256=8e974fc775239a3a5efe90acf5fa3f389c7cd8103895bc43c68782e037f03b80
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [5]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [6]:
spark = SparkSession.builder.master("local").config('spark.ui.port', '4050').getOrCreate()

In [7]:
spark

## Imports

In [8]:
import pandas as pd 
import numpy as np

from pyspark.sql import functions as F
# Import for typecasting columns
from pyspark.sql.types import IntegerType, BooleanType, DateType, FloatType, StringType, ArrayType

## Read the data 

In [9]:
!wget https://raw-recipes-clean-upgrad.s3.amazonaws.com/raw_recipies_small.csv -P /content/drive/MyDrive/FoodRecSys

--2022-10-07 03:20:55--  https://raw-recipes-clean-upgrad.s3.amazonaws.com/raw_recipies_small.csv
Resolving raw-recipes-clean-upgrad.s3.amazonaws.com (raw-recipes-clean-upgrad.s3.amazonaws.com)... 52.216.165.251
Connecting to raw-recipes-clean-upgrad.s3.amazonaws.com (raw-recipes-clean-upgrad.s3.amazonaws.com)|52.216.165.251|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 25722029 (25M) [text/csv]
Saving to: ‘/content/drive/MyDrive/FoodRecSys/raw_recipies_small.csv.1’


2022-10-07 03:20:56 (54.6 MB/s) - ‘/content/drive/MyDrive/FoodRecSys/raw_recipies_small.csv.1’ saved [25722029/25722029]



In [10]:
!wget https://raw-recipes-clean-upgrad.s3.amazonaws.com/raw_ratings_small.csv -P /content/drive/MyDrive/FoodRecSys

--2022-10-07 03:20:56--  https://raw-recipes-clean-upgrad.s3.amazonaws.com/raw_ratings_small.csv
Resolving raw-recipes-clean-upgrad.s3.amazonaws.com (raw-recipes-clean-upgrad.s3.amazonaws.com)... 52.216.165.251
Connecting to raw-recipes-clean-upgrad.s3.amazonaws.com (raw-recipes-clean-upgrad.s3.amazonaws.com)|52.216.165.251|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28954423 (28M) [text/csv]
Saving to: ‘/content/drive/MyDrive/FoodRecSys/raw_ratings_small.csv.1’


2022-10-07 03:20:57 (43.6 MB/s) - ‘/content/drive/MyDrive/FoodRecSys/raw_ratings_small.csv.1’ saved [28954423/28954423]
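
The `.csv.1` suffix in the two `wget` logs indicates that a file with the same name already existed in the folder, so every re-run of these cells stores another copy while the read cells still load the original `.csv`. One way to avoid this is sketched below, using a hypothetical helper `download_if_missing` (not part of the original notebook) built on the standard library; `wget`'s `-nc` (no-clobber) flag has a similar effect.

```python
import os
import urllib.request


def download_if_missing(url, dest_dir):
    """Download url into dest_dir unless the file is already there.

    Hypothetical helper for illustration; returns the local file path.
    """
    os.makedirs(dest_dir, exist_ok=True)
    dest_path = os.path.join(dest_dir, os.path.basename(url))
    if not os.path.exists(dest_path):
        # Only hit the network when no local copy exists.
        urllib.request.urlretrieve(url, dest_path)
    return dest_path
```

In this notebook it could be called as `download_if_missing("https://raw-recipes-clean-upgrad.s3.amazonaws.com/raw_ratings_small.csv", "/content/drive/MyDrive/FoodRecSys")`.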



In [11]:
raw_ratings_df = spark.read.csv("/content/drive/MyDrive/FoodRecSys/raw_ratings_small.csv", # modify the path to read the data
                                header=True, 
                                inferSchema=True)

In [12]:
raw_recipes_df = spark.read.csv("/content/drive/MyDrive/FoodRecSys/raw_recipies_small.csv", # modify the path to read the data
                                header=True, 
                                inferSchema=True)

In [13]:
raw_ratings_df.count()

93357

In [14]:
assert (raw_recipes_df.count(), len(raw_recipes_df.columns)) == (20340, 13)
assert (raw_ratings_df.count(), len(raw_ratings_df.columns)) == (93357, 5)

#### Decide a split date based on the ratings dataframe. 

In [15]:
# Find the number of data points in the interaction dataset. 
# You can use the count() method. 
# The output must be an integer.

num_review_int = raw_ratings_df.count()

#### Task 01 - Train Test Split 

Divide the data into train and test sets using an 80-20 split based on review date: the most recent 20% of reviews form the test set and the rest form the training set. Save each set as a parquet file.

In [16]:
test_num_reviews_int = round(num_review_int * 0.2)
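
As a quick check of the arithmetic, using the row count of 93357 reported earlier: 20% of 93357 is 18671.4, which `round` brings to 18671. The variable names below are illustrative and differ from the notebook's.

```python
num_reviews = 93357  # row count reported by raw_ratings_df.count()
test_target = round(num_reviews * 0.2)  # 18671.4 rounds to 18671
train_target = num_reviews - test_target  # nominal size of the training set
print(test_target, train_target)  # 18671 74686
```

These are target sizes only; the final sets depend on how many reviews share the cutoff date.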

In [17]:
# Sort the interactions dataset in descending order of review date. 
# Extract ```test_num_reviews_int``` most recent reviews. 

temp_ratings_df = (raw_ratings_df.sort("review_date", ascending=False)
                                 .limit(test_num_reviews_int)
                  )

In [18]:
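assert temp_ratings_df.count() == 18671
# Check that the most recent reviews are sorted in descending order of review_date (column index 4)
temp_rows = temp_ratings_df.collect()
assert temp_rows[11][4] <= temp_rows[10][4]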

In [19]:
temp_ratings_df.collect()[-1][4]

'2011-07-17'

Split the data into two parts at 2011-07-17.

- Reviews dated on or after 2011-07-17 are excluded from the training set and form the test set.
- For all future predictions, the date will be set at 2011-07-18.
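
One consequence of splitting on a date rather than on a row count: every review that shares the cutoff date lands on the same side of the split, so the test set can come out slightly larger than the 20% target (18684 rows versus 18671 in this notebook). The sketch below uses made-up review dates in plain Python, not Spark, and mirrors the `<` and `>=` filters used for the train and test sets.

```python
# Toy review dates in ISO format (hypothetical data); ISO strings sort chronologically.
review_dates = [
    "2011-07-10", "2011-07-11", "2011-07-12", "2011-07-13", "2011-07-14",
    "2011-07-15", "2011-07-16", "2011-07-17", "2011-07-17", "2011-07-18",
]

target_test_size = round(len(review_dates) * 0.2)  # 2 most recent reviews

# Sort newest first and read the date of the last row inside the target window.
newest_first = sorted(review_dates, reverse=True)
cutoff_date = newest_first[target_test_size - 1]

# Same comparison operators as the train/test filters in the notebook.
train = [d for d in review_dates if d < cutoff_date]
test = [d for d in review_dates if d >= cutoff_date]

print(cutoff_date, len(train), len(test))  # 2011-07-17 7 3
```

Two reviews share the cutoff date here, so the test set holds three rows although the target was two.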

In [20]:
# Join raw_recipes and raw_ratings
# Use recipe_id as the key to join these dataframes 
# The resulting dataframe must have all rows from the raw_ratings dataframe. 

interaction_level_df = raw_ratings_df.join(
                                           raw_recipes_df,# dataframe 2
                                           raw_ratings_df.recipe_id ==  raw_recipes_df.id,# key to join 
                                           "left"# how to join    
                                          )
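
To make the left-join semantics concrete, here is a minimal pure-Python sketch with toy records. The rows and the `left_join` helper are hypothetical and only illustrate the behaviour; they are not how Spark implements the join. Every rating row is kept, and recipe columns are filled with `None` when no recipe matches. The row count stays equal to the number of ratings as long as `id` is unique in the recipes table, which is assumed here.

```python
# Toy stand-ins for the ratings and recipes tables (hypothetical rows).
ratings = [
    {"user_id": 1, "recipe_id": 10, "rating": 5},
    {"user_id": 2, "recipe_id": 10, "rating": 4},
    {"user_id": 3, "recipe_id": 99, "rating": 3},  # no matching recipe
]
recipes = [
    {"id": 10, "name": "toy soup"},
    {"id": 20, "name": "toy salad"},  # never rated, so it is dropped
]


def left_join(left_rows, right_rows, left_key, right_key):
    """Keep every left row; fill right-hand columns with None when unmatched.

    Assumes right_key values are unique in right_rows.
    """
    right_by_key = {row[right_key]: row for row in right_rows}
    right_columns = list(right_rows[0].keys()) if right_rows else []
    joined = []
    for row in left_rows:
        match = right_by_key.get(row[left_key])
        extra = match if match is not None else dict.fromkeys(right_columns)
        joined.append({**row, **extra})
    return joined


interaction_rows = left_join(ratings, recipes, "recipe_id", "id")
print(len(interaction_rows) == len(ratings))  # True
```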

In [21]:
interaction_level_df.count()

93357

In [22]:
interaction_level_df.show()

+----------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|   user_id|recipe_id|rating|              review|review_date|                name|   id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|year_of_review|
+----------+---------+------+--------------------+-----------+--------------------+-----+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------------+
|    152552|    29601|     5|Excellent. It was...| 2004-08-20|zippy cold spicy ...|29601|    270|         30367|2002-05-29|['time-to-make', ...|[277.2, 15.0, 13....|     12|['in a heavy 10-i...|shrimp d

In [23]:
# Use the filter command to separate the datasets. 
# All interactions which were rated BEFORE '2011-07-17' will be train data. 
from pyspark.sql.functions import col

train_interaction_level_df  = (interaction_level_df.filter(  
                                            (col("review_date") < '2011-07-17' )
                                           ))

In [24]:
# Use the filter command to separate the datasets. 
# All interactions which were rated ON OR AFTER '2011-07-17' will be test data. 
from pyspark.sql.functions import col
test_interaction_level_all_recipies_df  = (interaction_level_df.filter(  
                                            (col("review_date") >= '2011-07-17' )
                                           ))

In [25]:
assert (test_interaction_level_all_recipies_df.count(), len(test_interaction_level_all_recipies_df.columns)) == (18684, 18)
assert (train_interaction_level_df.count(), len(train_interaction_level_df.columns)) == (74673, 18)

In [26]:
test_interaction_level_all_recipies_df.count()

18684

In [27]:
len(test_interaction_level_all_recipies_df.columns)

18

In [28]:
train_interaction_level_df.count()

74673

In [29]:
len(train_interaction_level_df.columns)

18

In [30]:
# create data files for modeling 

(train_interaction_level_df.coalesce(1)
                           .write.mode('overwrite')
                           .parquet('/content/drive/MyDrive/food_recsys_project/Code Files/train/train_interaction_level_df.parquet'))  # change the file name and file path

(test_interaction_level_all_recipies_df.coalesce(1)
                                       .write.mode('overwrite')
                                       .parquet('/content/drive/MyDrive/food_recsys_project/Code Files/test/test_interaction_level_df.parquet'))  # change the file name and file path