In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 48 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 61.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7978882117dcabf0f0aba58b41c02de097bd12c123c21bb55c8f3d62a84c5268
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [3]:
# Add all import commands to this cell. 
# Add all import commands to this cell. 
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, types
from pyspark.sql.functions import col, regexp_replace, lit, split
#import sys
#import string
from pyspark.sql.types import StructType, FloatType, ArrayType, StringType
# Add all import commands to this cell. 
import os
import sys
# Here you need to have same Python version on your local machine and on worker node i.e. EC2. here both should have python3.
os.environ["PYSPARK_PYTHON"] = "/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")


In [4]:
spark

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
raw_recipes_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/RAW_recipes_cleaned.csv', header=True, inferSchema=True)

In [7]:
raw_recipes_df.show(5)

+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id|          submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16 00:00:00|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17 00:00:00|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|


In [8]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.count() == 231637, "There is a mistake in reading the data."
assert len(raw_recipes_df.columns) == 12, "There is a mistake in reading the data."

## Extract ```nutrition``` values 

In [9]:
raw_recipes_df.select('nutrition').show(5, truncate=False)

+------------------------------------------+
|nutrition                                 |
+------------------------------------------+
|[51.5, 0.0, 13.0, 0.0, 2.0, 0.0, 4.0]     |
|[173.4, 18.0, 0.0, 17.0, 22.0, 35.0, 1.0] |
|[269.8, 22.0, 32.0, 48.0, 39.0, 27.0, 5.0]|
|[368.1, 17.0, 10.0, 2.0, 14.0, 8.0, 20.0] |
|[352.9, 1.0, 337.0, 23.0, 3.0, 0.0, 28.0] |
+------------------------------------------+
only showing top 5 rows



In [10]:
# List of nutrition columns

nutrition_column_names = ['calories',
                          'total_fat_PDV',
                          'sugar_PDV',
                          'sodium_PDV',
                          'protein_PDV',
                          'saturated_fat_PDV',
                          'carbohydrates_PDV']

In [11]:
# STEP 1 - string operations to convert nutrition to an array 

from pyspark.sql import functions as F
raw_recipes_df=raw_recipes_df.withColumn("nutrition", F.regexp_replace(F.regexp_replace(F.regexp_replace("nutrition", "\\]\\[", ""), "\\[", ""), "\\]", ""))


In [12]:
#split the text 

raw_recipes_df = (raw_recipes_df.withColumn('calories', split(raw_recipes_df['nutrition'], ',').getItem(0).cast('float')) \
                    .withColumn('total_fat_PDV', split(raw_recipes_df['nutrition'], ',').getItem(1).cast('float')) \
                    .withColumn('sugar_PDV', split(raw_recipes_df['nutrition'], ',').getItem(2).cast('float')) \
                    .withColumn('sodium_PDV', split(raw_recipes_df['nutrition'], ',').getItem(3).cast('float')) \
                    .withColumn('protein_PDV', split(raw_recipes_df['nutrition'], ',').getItem(4).cast('float')) \
                    .withColumn('saturated_fat_PDV', split(raw_recipes_df['nutrition'], ',').getItem(5).cast('float')) \
                    .withColumn('carbohydrates_PDV', split(raw_recipes_df['nutrition'], ',').getItem(6).cast('float'))# add command(s) to modify the string
                  )

In [13]:
#Printing Schema
raw_recipes_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)



In [14]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_recipes_df.schema["carbohydrates_PDV"].dataType == FloatType(), "Recheck your typecasting"
assert raw_recipes_df.collect()[123432][14] == 62.0, "The columns have not been split correctly."
assert raw_recipes_df.collect()[10000][12] == 60.400001525878906, "The columns have not been split correctly."

In [15]:
for nutrition_col in nutrition_column_names:
    if nutrition_col != 'calories':
        nutrition_per_100_cal_col = (nutrition_col
                                 .replace('_PDV','')
                                 +'_per_100_cal')
        # Write code to create nutrition columns per 100 calories 
nutrition_column_names

['calories',
 'total_fat_PDV',
 'sugar_PDV',
 'sodium_PDV',
 'protein_PDV',
 'saturated_fat_PDV',
 'carbohydrates_PDV']

In [16]:
cols = list(set(raw_recipes_df.columns) - {'submitted'})

raw_recipes_df1 = raw_recipes_df.select(cols)

In [17]:
# Finding null values for all the columns
from pyspark.sql.functions import isnan, when, count, col
raw_recipes_df1.select([count(when(isnan(c), c)).alias(c) for c in raw_recipes_df1.columns]).show()

+-----------------+---------+-----------+-----+---+-------------+-----------+--------+-------+-------+---------+----+----------+-----------+-------------+-----------------+--------------+----+
|saturated_fat_PDV|nutrition|ingredients|steps| id|total_fat_PDV|description|calories|minutes|n_steps|sugar_PDV|tags|sodium_PDV|protein_PDV|n_ingredients|carbohydrates_PDV|contributor_id|name|
+-----------------+---------+-----------+-----+---+-------------+-----------+--------+-------+-------+---------+----+----------+-----------+-------------+-----------------+--------------+----+
|                0|        0|          0|    0|  0|            0|          0|       0|      0|      0|        0|   0|         0|          0|            0|                0|             0|   0|
+-----------------+---------+-----------+-----+---+-------------+-----------+--------+-------+-------+---------+----+----------+-----------+-------------+-----------------+--------------+----+



In [18]:
df1 = raw_recipes_df.filter("id == 112140")


In [19]:
df1.show()

+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+
|                name|    id|minutes|contributor_id|          submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|
+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+
|all in the kitche...|112140|    130|        196586|2005-02-25 00:00:00|['time-to-make', ...|269.8, 22.0, 32.0...|      6|['

In [20]:
from pyspark.sql.functions import udf
#https://stackoverflow.com/questions/48282321/valueerror-cannot-convert-column-into-bool

@udf("double")    
def nutritonPercent(x,y):
    if y <= 0.0:
        return 0.0
    return ((x / y) * 100)

raw_recipes_df = (raw_recipes_df
                  .withColumn('total_fat_per_100_cal', nutritonPercent(raw_recipes_df.total_fat_PDV, raw_recipes_df.calories))
                  .withColumn('sugar_per_100_cal', nutritonPercent(raw_recipes_df.sugar_PDV, raw_recipes_df.calories))
                  .withColumn('sodium_per_100_cal',  nutritonPercent(raw_recipes_df.sodium_PDV, raw_recipes_df.calories))
                  .withColumn('protein_per_100_cal',  nutritonPercent(raw_recipes_df.protein_PDV, raw_recipes_df.calories))
                  .withColumn('saturated_fat_per_100_cal',  nutritonPercent(raw_recipes_df.saturated_fat_PDV, raw_recipes_df.calories))
                  .withColumn('carbohydrates_per_100_cal',  nutritonPercent(raw_recipes_df.carbohydrates_PDV, raw_recipes_df.calories))
                 )

In [21]:
df2 = raw_recipes_df.filter("id == 112140")

In [22]:
df2.show()

+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+---------------------+------------------+------------------+-------------------+-------------------------+-------------------------+
|                name|    id|minutes|contributor_id|          submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_per_100_cal| sugar_per_100_cal|sodium_per_100_cal|protein_per_100_cal|saturated_fat_per_100_cal|carbohydrates_per_100_cal|
+--------------------+------+-------+--------------+-------------------+--------------------+--------------------+-------+--------------------+---------

In [23]:
#Printing Schema
raw_recipes_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = true)
 |-- sugar_per_100_cal: double (nullable = true)
 |-- sodium_per_100_cal: double (nullable = true)
 |-- protein_per_100_cal: double (nullable = true)
 |

In [24]:
# total fat check for id 28881
assert raw_recipes_df.filter("id == 28881").select('total_fat_per_100_cal').first()[0] == 0, "total_fat_per_100_cal for recipe 28881 should be 0"

# total fat check for id 112140
assert round(raw_recipes_df.filter("id == 112140").select('total_fat_per_100_cal').first()[0]) == 8, "total_fat_per_100_cal for recipe 112140 should b"

In [25]:
import pyspark.sql.functions

#pyspark.sql.functions.encode(raw_recipes_df.tags, 'ISO-8859–1')


In [26]:
#from pyspark.sql import functions as F
#raw_recipes_df=raw_recipes_df.withColumn("tags", F.regexp_replace(F.regexp_replace(F.regexp_replace("tags", "\\]\\[", ""), "\\[", ""), "\\]", ""))

In [27]:
#raw_recipes_df.select('tags').show(5, truncate=False)

In [28]:
from pyspark.sql.functions import regexp_replace,col
raw_recipes_df = raw_recipes_df.withColumn('tags', regexp_replace('tags',"\'",""))

In [29]:
raw_recipes_df.select('tags').show(20, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tags                                                                                                                                                                                                                                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [30]:
raw_recipes_df1.filter(raw_recipes_df1['tags'].rlike('[\!\@\$\^\&\-\_\;\:\?\.\%\#\&&\+\\*]')).count()

231429

In [35]:
from pyspark.sql.functions import split, regexp_replace

raw_recipes_df = (raw_recipes_df.withColumn('tags',regexp_replace('tags','\[',''))
                               .withColumn('tags',regexp_replace('tags','\]',''))
                               .withColumn('tags',regexp_replace('tags','\\s',''))
                               .withColumn('tags',regexp_replace('tags','\'',''))
                               .withColumn('tags',split(col('tags'),",")))

In [36]:
raw_recipes_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: timestamp (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = true)
 |-- sugar_per_100_cal: double (nullable = true)
 |-- sodium_per_100_cal: double (nullable = true)
 |-- 

In [37]:

# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

#assert raw_recipes_df.schema["tags"].dataType == ArrayType(StringType(), True), "You have not split the string into an array."
assert raw_recipes_df.collect()[2][5] == ['time-to-make','course', 'preparation', 'main-dish', 'chili', 'crock-pot-slow-cooker', 'dietary', 'equipment', '4-hours-or-less'], "Recheck your string cleaning and splitting operations."

In [39]:
# Reading the second data set. 
# keep this cell unedited

raw_ratings_df = (spark.read.csv('/content/drive/MyDrive/Colab Notebooks/RAW_interactions_cleaned.csv', header=True, inferSchema=True)
                  .withColumn("review_date",  F.col("date"))
                  .drop(F.col("date"))
                  )

In [40]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert raw_ratings_df.count() == 1132367, "There is a mistake in reading the data."
assert len(raw_ratings_df.columns) == 5, "There is a mistake in reading the data."

In [41]:
interaction_level_df = raw_ratings_df.join(raw_recipes_df,raw_ratings_df.recipe_id ==  raw_recipes_df.id,"left")

In [42]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

list1 = raw_ratings_df.select('recipe_id').collect()
list2 = raw_recipes_df.select('id').collect()
exclusive_set = set(list1)-set(list2)

assert len(exclusive_set) == 0, "There is a mistake in reading one of the two data files."

In [43]:
interaction_level_df = (interaction_level_df
                        .withColumn('submitted',col('submitted').cast('date') 
                                   ))

In [44]:
interaction_level_df = (interaction_level_df
                        .withColumn('review_date',col('review_date').cast('date') 
                                   ))

In [49]:
from pyspark.sql.functions import datediff,col
from pyspark.sql.functions import months_between,col
interaction_level_df = (interaction_level_df
                        .withColumn('days_since_submission_on_review_date',
                                    datediff(col('review_date'),col('submitted'))
                                   )
                        .withColumn('months_since_submission_on_review_date',
                                    months_between(col('review_date'),col('submitted'))          
                                   )
                        .withColumn('years_since_submission_on_review_date',
                                    months_between(col('review_date'),col('submitted'))/lit(12)       
                                   ))

In [50]:
interaction_level_df.select('review_date','submitted','days_since_submission_on_review_date','months_since_submission_on_review_date','years_since_submission_on_review_date').show(5, truncate=False)

+-----------+----------+------------------------------------+--------------------------------------+-------------------------------------+
|review_date|submitted |days_since_submission_on_review_date|months_since_submission_on_review_date|years_since_submission_on_review_date|
+-----------+----------+------------------------------------+--------------------------------------+-------------------------------------+
|2009-01-12 |2008-12-17|26                                  |0.83870968                            |0.06989247333333333                  |
|2009-02-06 |2008-12-17|51                                  |1.64516129                            |0.13709677416666669                  |
|2009-05-04 |2008-12-17|138                                 |4.58064516                            |0.38172043000000005                  |
|2009-07-16 |2008-12-17|211                                 |6.96774194                            |0.5806451616666667                   |
|2009-10-20 |2008-12-17|307

In [51]:
## Write the raw_recipes_df
## Write data to the S3 bucket which will be your working directory 

interaction_level_df.write.mode('overwrite').parquet('/content/drive/MyDrive/Colab Notebooks/interaction_level_df')

raw_recipes_df.write.mode('overwrite').parquet('/content/drive/MyDrive/Colab Notebooks/raw_recipes_df')