# Setting up Colab and Spark environment

In [None]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz

!pip install -q findspark
!pip install pyspark

# Tell the Java and Spark tooling where their installations live
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.3-bin-hadoop3",
    }
)



In [None]:
# Tools to connect to the Spark server
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [None]:
# Spark application settings, defined once and used by the builder below
APP_NAME = "recipes_interactions"
SPARK_URL = "local[*]"  # local mode, using all available cores

# getOrCreate() returns the already-running session if this cell is re-executed
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

In [None]:
# Import module
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Task 1: Read the Data

In [None]:
# Make Google Drive available inside the Colab filesystem
from google.colab import drive, files

drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Download datasets directly to Google Drive

In [None]:
import requests

# Download RAW_recipes_cleaned.csv straight into Google Drive
file_url = "https://raw-recipes-clean-upgrad.s3.amazonaws.com/RAW_recipes_cleaned.csv"

# Stream the body so the whole file is never held in memory. The context manager
# releases the connection, and the timeout keeps a stalled server from hanging
# the notebook indefinitely.
with requests.get(file_url, stream=True, timeout=60) as r:
    # Fail loudly on an HTTP error instead of saving the error page as a CSV
    r.raise_for_status()
    with open("/content/drive/MyDrive/RAW_recipes_cleaned.csv", "wb") as file:
        # 1 MiB chunks need far fewer write calls than 1 KiB blocks
        for block in r.iter_content(chunk_size=1024 * 1024):
            if block:
                file.write(block)

## Load datasets

In [None]:
# Load the recipes CSV into a Spark DataFrame; the first line supplies the column names
RECIPES_CSV_PATH = "/content/drive/MyDrive/RAW_recipes_cleaned.csv"
df_recipes = spark.read.csv(RECIPES_CSV_PATH, header=True)

In [None]:
# Preview the first rows of the recipes data (long values are truncated for display)
df_recipes.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|
|all in the kitche...|112140|    130|        

## Check data type

In [None]:
# Every column comes back as a string because the CSV was read without a schema
df_recipes.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- minutes: string (nullable = true)
 |-- contributor_id: string (nullable = true)
 |-- submitted: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: string (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: string (nullable = true)



# Task 2: Extract Nutrition Features

In [None]:
# Inspect the raw nutrition strings in full (no truncation): each one is a
# bracketed, comma-separated list of numbers
df_recipes.select("nutrition").show(10, truncate=False)

+---------------------------------------------------+
|nutrition                                          |
+---------------------------------------------------+
|[51.5, 0.0, 13.0, 0.0, 2.0, 0.0, 4.0]              |
|[173.4, 18.0, 0.0, 17.0, 22.0, 35.0, 1.0]          |
|[269.8, 22.0, 32.0, 48.0, 39.0, 27.0, 5.0]         |
|[368.1, 17.0, 10.0, 2.0, 14.0, 8.0, 20.0]          |
|[352.9, 1.0, 337.0, 23.0, 3.0, 0.0, 28.0]          |
|[160.2, 10.0, 55.0, 3.0, 9.0, 20.0, 7.0]           |
|[380.7, 53.0, 7.0, 24.0, 6.0, 24.0, 6.0]           |
|[1109.5, 83.0, 378.0, 275.0, 96.0, 86.0, 36.0]     |
|[4270.8, 254.0, 1306.0, 111.0, 127.0, 431.0, 220.0]|
|[2669.3, 160.0, 976.0, 107.0, 62.0, 310.0, 138.0]  |
+---------------------------------------------------+
only showing top 10 rows



In [None]:
# Strip the surrounding square brackets from the nutrition string.
# The pattern is a raw string so the regex backslashes reach Spark untouched;
# in a plain string literal they form invalid escape sequences, which newer
# Python versions warn about.
df_recipes = df_recipes.withColumn("nutrition", regexp_replace(col("nutrition"), r"\[|\]", ""))

In [None]:
# Confirm the brackets are gone from the nutrition strings
df_recipes.select("nutrition").show(10, truncate=False)

+-------------------------------------------------+
|nutrition                                        |
+-------------------------------------------------+
|51.5, 0.0, 13.0, 0.0, 2.0, 0.0, 4.0              |
|173.4, 18.0, 0.0, 17.0, 22.0, 35.0, 1.0          |
|269.8, 22.0, 32.0, 48.0, 39.0, 27.0, 5.0         |
|368.1, 17.0, 10.0, 2.0, 14.0, 8.0, 20.0          |
|352.9, 1.0, 337.0, 23.0, 3.0, 0.0, 28.0          |
|160.2, 10.0, 55.0, 3.0, 9.0, 20.0, 7.0           |
|380.7, 53.0, 7.0, 24.0, 6.0, 24.0, 6.0           |
|1109.5, 83.0, 378.0, 275.0, 96.0, 86.0, 36.0     |
|4270.8, 254.0, 1306.0, 111.0, 127.0, 431.0, 220.0|
|2669.3, 160.0, 976.0, 107.0, 62.0, 310.0, 138.0  |
+-------------------------------------------------+
only showing top 10 rows



In [None]:
# Split the nutrition string into an array of its individual values.
# The separator is a comma plus any whitespace that follows it, so the elements
# come out without the leading space that a bare "," split leaves behind
# (the raw values are separated by ", ").
df_recipes = df_recipes.withColumn("nutrition_split", split(col("nutrition"), r",\s*"))

In [None]:
# Promote each position of the split nutrition array to its own named column.
# The tuple order is the order of the values inside the nutrition array.
nutrition_fields = (
    "calories",
    "total_fat_PDV",
    "sugar_PDV",
    "sodium_PDV",
    "protein_PDV",
    "saturated_fat_PDV",
    "carbohydrates_PDV",
)
for position, field_name in enumerate(nutrition_fields):
    df_recipes = df_recipes.withColumn(field_name, col("nutrition_split")[position])

In [None]:
# The intermediate array column has served its purpose; remove it
df_recipes = df_recipes.drop("nutrition_split")

# Preview the DataFrame with the new per-nutrient columns
df_recipes.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|51.5, 0.0, 13.0, ...|     11|['make a choice a...|autumn is my favo

# Task 3: Standardize Nutrition Values
Standardize the nutrition values to a per-100-calorie basis to account for serving-size variations.


In [None]:
# Express every nutrient as a proportion of the recipe's calories
nutrient_columns = [
    "total_fat_PDV",
    "sugar_PDV",
    "sodium_PDV",
    "protein_PDV",
    "saturated_fat_PDV",
    "carbohydrates_PDV",
]
for nutrient in nutrient_columns:
    df_recipes = df_recipes.withColumn(f"{nutrient}_per_calorie", col(nutrient) / col("calories"))

In [None]:
# Rescale the per-calorie ratios to a per-100-calorie basis
for nutrient_name in ("total_fat", "sugar", "sodium", "protein", "saturated_fat", "carbohydrates"):
    df_recipes = df_recipes.withColumn(
        f"{nutrient_name}_PDV_per_100_calories",
        col(f"{nutrient_name}_PDV_per_calorie") * 100,
    )

In [None]:
# Drop the intermediate per-calorie ratios now that the per-100-calorie columns exist.
# Only columns that are actually created are listed: the previous list also named
# "total_calories", which never exists in this DataFrame (drop ignores unknown
# names, so the resulting columns are unchanged).
df_recipes = df_recipes.drop(
    "total_fat_PDV_per_calorie",
    "sugar_PDV_per_calorie",
    "sodium_PDV_per_calorie",
    "protein_PDV_per_calorie",
    "saturated_fat_PDV_per_calorie",
    "carbohydrates_PDV_per_calorie",
)

In [None]:
# Round each standardized nutrient value to 3 decimal places.
# `round` here is the Spark column function brought in by the star import,
# not Python's builtin.
standardized_columns = [
    "total_fat_PDV_per_100_calories",
    "sugar_PDV_per_100_calories",
    "sodium_PDV_per_100_calories",
    "protein_PDV_per_100_calories",
    "saturated_fat_PDV_per_100_calories",
    "carbohydrates_PDV_per_100_calories",
]
for column_name in standardized_columns:
    df_recipes = df_recipes.withColumn(column_name, round(col(column_name), 3))

In [None]:
# Preview the DataFrame with the rounded per-100-calorie columns
df_recipes.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+------------------------------+--------------------------+---------------------------+----------------------------+----------------------------------+----------------------------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_PDV_per_100_calories|sugar_PDV_per_100_calories|sodium_PDV_per_100_calories|protein_PDV_per_100_calories|saturated_fat_PDV_per_100_calories|carbohydrates_PDV_per_100_calories|
+--------------------+------+-------+--------------+----------+-

# Task 4: Convert Tags to Array
Convert tags column to an array of strings.

In [None]:
# Strip the surrounding square brackets from the tags string.
# The pattern is a raw string so the regex backslashes reach Spark untouched;
# in a plain string literal they form invalid escape sequences, which newer
# Python versions warn about.
df_recipes = df_recipes.withColumn("tags", regexp_replace(col("tags"), r"\[|\]", ""))

In [None]:
# Build the tags_array column by splitting the tags string on ", ".
# NOTE: only the brackets were removed beforehand, so each element still
# carries its surrounding single quotes.
df_recipes = df_recipes.withColumn("tags_array", split(col("tags"), ", "))

# Print the schema to confirm tags_array is an array of strings
df_recipes.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- minutes: string (nullable = true)
 |-- contributor_id: string (nullable = true)
 |-- submitted: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: string (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: string (nullable = true)
 |-- calories: string (nullable = true)
 |-- total_fat_PDV: string (nullable = true)
 |-- sugar_PDV: string (nullable = true)
 |-- sodium_PDV: string (nullable = true)
 |-- protein_PDV: string (nullable = true)
 |-- saturated_fat_PDV: string (nullable = true)
 |-- carbohydrates_PDV: string (nullable = true)
 |-- total_fat_PDV_per_100_calories: double (nullable = true)
 |-- sugar_PDV_per_100_calories: double (nullable = true)
 |-- sodium_PDV_per_100_calories: double (nullable = true)
 |-- protein_PDV_per_100_ca

# Task 5: Read and Join Interaction Data

In [None]:
# Download RAW_interactions_cleaned.csv straight into Google Drive
file_url = "https://raw-interactions-upgrad.s3.amazonaws.com/RAW_interactions_cleaned.csv"

# Stream the body so the whole file is never held in memory. The context manager
# releases the connection, and the timeout keeps a stalled server from hanging
# the notebook indefinitely.
with requests.get(file_url, stream=True, timeout=60) as r:
    # Fail loudly on an HTTP error instead of saving the error page as a CSV
    r.raise_for_status()
    with open("/content/drive/MyDrive/RAW_interactions_cleaned.csv", "wb") as file:
        # 1 MiB chunks need far fewer write calls than 1 KiB blocks
        for block in r.iter_content(chunk_size=1024 * 1024):
            if block:
                file.write(block)

In [None]:
# Load the interactions CSV into a Spark DataFrame; the first line supplies the column names
INTERACTIONS_CSV_PATH = "/content/drive/MyDrive/RAW_interactions_cleaned.csv"
df_interactions = spark.read.csv(INTERACTIONS_CSV_PATH, header=True)

In [None]:
# Preview the first rows of the interactions data
df_interactions.show()

+----------+---------+----------+------+--------------------+
|   user_id|recipe_id|      date|rating|              review|
+----------+---------+----------+------+--------------------+
|     38094|    40893|2003-02-17|     4|Great with a sala...|
|   1293707|    40893|2011-12-21|     5|So simple  so del...|
|      8937|    44394|2002-12-01|     4|This worked very ...|
|    126440|    85009|2010-02-27|     5|I made the Mexica...|
|     57222|    85009|2011-10-01|     5|Made the cheddar ...|
|     52282|   120345|2005-05-21|     4|very very sweet. ...|
|    124416|   120345|2011-08-06|     0|Just an observati...|
|2000192946|   120345|2015-05-10|     2|This recipe was O...|
|     76535|   134728|2005-09-02|     4|          Very good!|
|    273745|   134728|2005-12-22|     5|Better than the r...|
|    353911|   134728|2006-09-26|     5|Absolutely AWESOM...|
|    190375|   134728|2007-03-09|     5|These taste absol...|
|    468945|   134728|2008-02-20|     0|Made my own butte...|
|    255

In [None]:
# Right join: keep every interaction and attach the recipe whose `id`
# equals the interaction's `recipe_id`
recipe_matches_interaction = df_recipes["id"] == df_interactions["recipe_id"]
joined_df = df_recipes.join(df_interactions, on=recipe_matches_interaction, how='right')

In [None]:
# List the column names and types of the joined data
joined_df.dtypes

[('name', 'string'),
 ('id', 'string'),
 ('minutes', 'string'),
 ('contributor_id', 'string'),
 ('submitted', 'string'),
 ('tags', 'string'),
 ('nutrition', 'string'),
 ('n_steps', 'string'),
 ('steps', 'string'),
 ('description', 'string'),
 ('ingredients', 'string'),
 ('n_ingredients', 'string'),
 ('calories', 'string'),
 ('total_fat_PDV', 'string'),
 ('sugar_PDV', 'string'),
 ('sodium_PDV', 'string'),
 ('protein_PDV', 'string'),
 ('saturated_fat_PDV', 'string'),
 ('carbohydrates_PDV', 'string'),
 ('total_fat_PDV_per_100_calories', 'double'),
 ('sugar_PDV_per_100_calories', 'double'),
 ('sodium_PDV_per_100_calories', 'double'),
 ('protein_PDV_per_100_calories', 'double'),
 ('saturated_fat_PDV_per_100_calories', 'double'),
 ('carbohydrates_PDV_per_100_calories', 'double'),
 ('tags_array', 'array<string>'),
 ('user_id', 'string'),
 ('recipe_id', 'string'),
 ('date', 'string'),
 ('rating', 'string'),
 ('review', 'string')]

# Task 6: Create Time-Based Features

In [None]:
# Number of days from the recipe's submission date to the review date
days_between = datediff(col("date"), col("submitted"))
joined_df = joined_df.withColumn("time_elapsed_days", days_between)

In [None]:
# Spot-check the day difference against the two dates it was derived from
joined_df.select("date", "submitted", "time_elapsed_days").show()

+----------+----------+-----------------+
|      date| submitted|time_elapsed_days|
+----------+----------+-----------------+
|2005-04-17|2005-04-12|                5|
|2006-11-25|2006-11-22|                3|
|2007-07-30|2007-01-11|              200|
|2007-09-04|2007-01-11|              236|
|2008-01-12|2007-01-11|              366|
|2008-08-20|2007-01-11|              587|
|2007-06-20|2007-04-29|               52|
|2008-03-02|2007-11-27|               96|
|2011-07-01|2007-11-27|             1312|
|2011-07-18|2007-11-27|             1329|
|2012-10-01|2007-11-27|             1770|
|2015-08-07|2007-11-27|             2810|
|2010-07-02|2008-08-19|              682|
|2014-03-04|2008-10-30|             1951|
|2009-11-29|2009-05-13|              200|
|2002-11-20|2002-10-27|               24|
|2003-01-04|2002-10-27|               69|
|2004-03-14|2002-10-27|              504|
|2005-02-17|2002-10-27|              844|
|2005-12-04|2002-10-27|             1134|
+----------+----------+-----------

# Task 7: Process Numerical Columns

## Introduce non-linearity to numerical columns by converting them into categorical columns

In [None]:
# Approximate the gap between submission and review in years (may be useful later).
# The year, month and day differences are computed separately and added up,
# counting a month as 1/12 of a year and a day as 1/365 of a year.
year_gap = year(col("date")) - year(col("submitted"))
month_gap = (month(col("date")) - month(col("submitted"))) / 12
day_gap = (dayofmonth(col("date")) - dayofmonth(col("submitted"))) / 365

joined_df = joined_df.withColumn(
    "years_since_submission_on_review_date",
    round(year_gap + month_gap + day_gap, 2),
)

In [None]:
# List the column types again, now including the two time-based features
joined_df.dtypes

[('name', 'string'),
 ('id', 'string'),
 ('minutes', 'string'),
 ('contributor_id', 'string'),
 ('submitted', 'string'),
 ('tags', 'string'),
 ('nutrition', 'string'),
 ('n_steps', 'string'),
 ('steps', 'string'),
 ('description', 'string'),
 ('ingredients', 'string'),
 ('n_ingredients', 'string'),
 ('calories', 'string'),
 ('total_fat_PDV', 'string'),
 ('sugar_PDV', 'string'),
 ('sodium_PDV', 'string'),
 ('protein_PDV', 'string'),
 ('saturated_fat_PDV', 'string'),
 ('carbohydrates_PDV', 'string'),
 ('total_fat_PDV_per_100_calories', 'double'),
 ('sugar_PDV_per_100_calories', 'double'),
 ('sodium_PDV_per_100_calories', 'double'),
 ('protein_PDV_per_100_calories', 'double'),
 ('saturated_fat_PDV_per_100_calories', 'double'),
 ('carbohydrates_PDV_per_100_calories', 'double'),
 ('tags_array', 'array<string>'),
 ('user_id', 'string'),
 ('recipe_id', 'string'),
 ('date', 'string'),
 ('rating', 'string'),
 ('review', 'string'),
 ('time_elapsed_days', 'int'),
 ('years_since_submission_on_revi

In [None]:
# Preview the joined data with the time-based features
joined_df.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+------------------------------+--------------------------+---------------------------+----------------------------+----------------------------------+----------------------------------+--------------------+----------+---------+----------+------+--------------------+-----------------+-------------------------------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_PDV_per_100_calories|sugar_PDV_per_100_calories|sodium_PDV_per_100_calories|protein_PDV_per_100_calorie

In [None]:
# Cast the string-typed measures to integers so they can be discretized.
# NOTE(review): fractional inputs (e.g. calories) presumably lose their decimal
# part in this cast - confirm that is intended.
integer_columns = [
    "minutes",
    "calories",
    "total_fat_PDV",
    "sugar_PDV",
    "sodium_PDV",
    "protein_PDV",
    "saturated_fat_PDV",
    "carbohydrates_PDV",
]
for column_name in integer_columns:
    joined_df = joined_df.withColumn(column_name, col(column_name).cast("integer"))

In [None]:
# Numeric columns that will be bucketed into quantile bins
nutrient_measures = [
    "calories",
    "total_fat_PDV",
    "sugar_PDV",
    "sodium_PDV",
    "protein_PDV",
    "saturated_fat_PDV",
    "carbohydrates_PDV",
]
numerical_cols = ["years_since_submission_on_review_date", "minutes", *nutrient_measures]

In [None]:
# Create one QuantileDiscretizer (5 buckets) per numerical column; each writes
# its result to a new "<column>_binned" column.
from pyspark.ml.feature import QuantileDiscretizer

# The iteration variable is deliberately not called `col`: a top-level `for col in ...`
# loop would rebind the name and hide pyspark.sql.functions.col from later cells.
discretizers = [
    QuantileDiscretizer(numBuckets=5, inputCol=column_name, outputCol=f"{column_name}_binned")
    for column_name in numerical_cols
]

In [None]:
# Chain all discretizers into one Pipeline so they are fitted and applied together
from pyspark.ml.pipeline import Pipeline
pipeline = Pipeline(stages=discretizers)

In [None]:
# Fit the pipeline on joined_df, then apply it to the same data to add the
# *_binned columns
model = pipeline.fit(joined_df)
binned_df = model.transform(joined_df)

In [None]:
# Preview the data with the binned columns appended
binned_df.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+------------------------------+--------------------------+---------------------------+----------------------------+----------------------------------+----------------------------------+--------------------+-------+---------+----------+------+--------------------+-----------------+-------------------------------------+--------------------------------------------+--------------+---------------+--------------------+----------------+-----------------+------------------+------------------------+------------------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calorie

In [None]:
# Keep every original column followed by the newly created *_binned columns
binned_names = [f"{name}_binned" for name in numerical_cols]
selected_cols = list(joined_df.columns) + binned_names
final_df = binned_df.select(selected_cols)

# Task 8: Create User-Level Features

In [None]:
from pyspark.sql import Window

In [None]:
# Window that groups together all rows belonging to the same user
windowSpec = Window.partitionBy("user_id")

In [None]:
# Per-user features describing rating tendencies and the kind of recipes each
# user reviews. Every entry pairs the output column with the aggregate that is
# evaluated over the user's window; list order is the resulting column order.
user_features = [
    ("user_avg_rating", avg("rating")),
    ("user_avg_n_ratings", count("*")),
    ("user_avg_years_betwn_review_and_submission", avg("years_since_submission_on_review_date")),
    ("user_avg_prep_time_recipes_reviewed", avg("minutes")),
    ("user_avg_n_steps_recipes_reviewed", avg("n_steps")),
    ("user_avg_n_ingredients_recipes_reviewed", avg("n_ingredients")),
    ("user_avg_calories_recipes_reviewed", avg("calories")),
    ("user_avg_total_fat_per_100_cal_recipes_reviewed", avg("total_fat_PDV_per_100_calories")),
    ("user_avg_sugar_per_100_cal_recipes_reviewed", avg("sugar_PDV_per_100_calories")),
    ("user_avg_sodium_per_100_cal_recipes_reviewed", avg("sodium_PDV_per_100_calories")),
    ("user_avg_protein_per_100_cal_recipes_reviewed", avg("protein_PDV_per_100_calories")),
    ("user_avg_saturated_fat_per_100_cal_recipes_reviewed", avg("saturated_fat_PDV_per_100_calories")),
    ("user_avg_carbohydrates_per_100_cal_recipes_reviewed", avg("carbohydrates_PDV_per_100_calories")),
]

user_avg_rating = joined_df
for feature_name, aggregate in user_features:
    user_avg_rating = user_avg_rating.withColumn(feature_name, aggregate.over(windowSpec))


In [None]:
user_avg_rating.show()

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+------------------------------+--------------------------+---------------------------+----------------------------+----------------------------------+----------------------------------+--------------------+-------+---------+----------+------+--------------------+-----------------+-------------------------------------+---------------+------------------+------------------------------------------+-----------------------------------+---------------------------------+---------------------------------------+----------------------------------+-----------------------------------------------+-------------------------------------------+--------------------------------------------+---------------------------------------

In [None]:
# Re-import `col` explicitly so the Spark column function is in scope for this cell
from pyspark.sql.functions import col
# Cast rating from string to integer ahead of filtering for high-rated reviews
joined_df = joined_df.withColumn("rating", col("rating").cast("integer"))

In [None]:
# Per-user averages computed over 5-star reviews only
high_rated_windowSpec = Window.partitionBy("user_id")
high_rated_df = joined_df.filter(col("rating") == 5)

# Output column -> source column averaged within each user's high-rated reviews
high_rated_sources = {
    "user_avg_years_betwn_review_and_submission_high_ratings": "years_since_submission_on_review_date",
    "user_avg_prep_time_recipes_reviewed_high_ratings": "minutes",
    "user_avg_n_steps_recipes_reviewed_high_ratings": "n_steps",
    "user_avg_n_ingredients_recipes_reviewed_high_ratings": "n_ingredients",
}
for feature_name, source_column in high_rated_sources.items():
    high_rated_df = high_rated_df.withColumn(
        feature_name, avg(source_column).over(high_rated_windowSpec)
    )

In [None]:
# Show one row per user with their high-rating averages.
# DataFrame.distinct() takes no arguments (passing a column name raises
# TypeError); dropDuplicates() with a column subset de-duplicates on user_id.
# years_since_submission_on_review_date is a per-review value, so the one shown
# for each user comes from whichever of their rows is kept.
high_rated_df.select(
    "user_id",
    "user_avg_years_betwn_review_and_submission_high_ratings",
    "years_since_submission_on_review_date",
    "user_avg_prep_time_recipes_reviewed_high_ratings",
    "user_avg_n_steps_recipes_reviewed_high_ratings",
    "user_avg_n_ingredients_recipes_reviewed_high_ratings"
).dropDuplicates(["user_id"]).show()

TypeError: DataFrame.distinct() takes 1 positional argument but 2 were given

In [None]:
# Join the high-rated user averages back to the original DataFrame.
#
# high_rated_df holds one row per 5-star review, so it is first reduced to a
# single row per user. Joining the un-reduced frame on user_id would pair every
# review of a user with every one of that user's 5-star reviews and multiply
# the row count. The averages are window values that are identical on all of a
# user's rows, so keeping any one row per user loses nothing.
#
# The per-review column years_since_submission_on_review_date is not carried
# over: joined_df already has it, and bringing it in again would create a
# duplicate, ambiguous column.
user_high_rating_stats = high_rated_df.select(
    "user_id",
    "user_avg_years_betwn_review_and_submission_high_ratings",
    "user_avg_prep_time_recipes_reviewed_high_ratings",
    "user_avg_n_steps_recipes_reviewed_high_ratings",
    "user_avg_n_ingredients_recipes_reviewed_high_ratings",
).dropDuplicates(["user_id"])

# Left join: users without any 5-star review keep their rows, with nulls in the
# high-rating columns
joined_df = joined_df.join(user_high_rating_stats, on="user_id", how="left")

# Task 9: Create Tag-Level Features


In [None]:
# Count the rows of final_df (each row carries one tags_array value)
total_tags_array_count = final_df.select("id","tags_array").count()

In [None]:
# Report the row count computed above
print(f'Total tags array count is: {total_tags_array_count}')

Total tags array count is: 1132367


In [None]:
# Preview recipe ids next to their tag arrays
final_df.select("id","tags_array").show()

+------+--------------------+
|    id|          tags_array|
+------+--------------------+
|116422|['time-to-make', ...|
|197160|['30-minutes-or-l...|
|204836|['60-minutes-or-l...|
|204836|['60-minutes-or-l...|
|204836|['60-minutes-or-l...|
|204836|['60-minutes-or-l...|
|225241|['60-minutes-or-l...|
|268290|['time-to-make', ...|
|268290|['time-to-make', ...|
|268290|['time-to-make', ...|
|268290|['time-to-make', ...|
|268290|['time-to-make', ...|
|320366|['30-minutes-or-l...|
|334071|['time-to-make', ...|
|371831|['60-minutes-or-l...|
| 44289|['60-minutes-or-l...|
| 44289|['60-minutes-or-l...|
| 44289|['60-minutes-or-l...|
| 44289|['60-minutes-or-l...|
| 44289|['60-minutes-or-l...|
+------+--------------------+
only showing top 20 rows



In [None]:
# Explode tags_array so that every tag gets its own row in a new `tag` column.
# No cleaning happens here: the tag values are taken from the array as they are.
# NOTE: this cell is not idempotent - joined_df is rebuilt from itself, so
# running it again explodes the already-exploded rows and multiplies the row count.
joined_df = joined_df.withColumn("tag", explode("tags_array"))

In [None]:
# Preview the exploded result: one (id, tag) pair per row
joined_df.select("id","tag").show()

+-----+--------------------+
|   id|                 tag|
+-----+--------------------+
|39087|'30-minutes-or-less'|
|39087|      'time-to-make'|
|39087|            'course'|
|39087|   'main-ingredient'|
|39087|           'cuisine'|
|39087|       'preparation'|
|39087|          'occasion'|
|39087|    'north-american'|
|39087|        'for-1-or-2'|
|39087|         'main-dish'|
|39087|        'eggs-dairy'|
|39087|             'pasta'|
|39087|           'poultry'|
|39087|          'american'|
|39087|             'cajun'|
|39087|'southern-united-...|
|39087|              'easy'|
|39087|      'dinner-party'|
|39087|      'kid-friendly'|
|39087|          'romantic'|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Sanity check on a single recipe: count the exploded rows whose id is 39087
id_count_example = joined_df.filter(col("id") == 39087).count()
print(f'Count for ID 39087: {id_count_example}')

Count for ID 39087: 2224994


In [None]:
# Count the final_df rows (not exploded) for the same recipe, for comparison
count_id_tag = final_df.filter(col("id") == 39087).count()
print(f'Count array for ID 39087: {count_id_tag}')

Count array for ID 39087: 1448


In [None]:
# List a sample of the distinct tag values
joined_df.select("tag").distinct().show()

+--------------------+
|                 tag|
+--------------------+
|   'pressure-cooker'|
|'jams-and-preserves'|
|    'elbow-macaroni'|
|     'peanut-butter'|
|'herb-and-spice-m...|
|         'meatballs'|
|              'fish'|
|    'veggie-burgers'|
|            'cheese'|
|           'carrots'|
|     'rosh-hashanah'|
|           'oysters'|
|        'casseroles'|
|         'cantonese'|
|             'quail'|
|         'artichoke'|
|    'middle-eastern'|
|           'italian'|
|            'spring'|
|  'chinese-new-year'|
+--------------------+
only showing top 20 rows



In [None]:
# Number of exploded rows per tag, most frequent first
tag_counts = joined_df.groupBy("tag").count().orderBy("count", ascending=False)

In [None]:
# Tags whose frequency lies above the approximate 95th percentile of all tag
# counts (0.01 is the relative error allowed for the quantile estimate)
count_p95 = tag_counts.approxQuantile("count", [0.95], 0.01)[0]
top_5_frequent_tags = tag_counts.filter(col("count") > count_p95)

In [None]:
# Sum of all per-tag counts. `sum` here is the Spark aggregate brought in by the
# star import, not Python's builtin; collect()[0][0] pulls out the single value.
total_tag_count = tag_counts.select(sum("count")).collect()[0][0]

In [None]:
# Display the most frequent tags (those above the 95th-percentile count)
top_5_frequent_tags.show()

In [None]:
# Rating correlation: average rating per tag
tag_ratings = joined_df.groupBy("tag").agg(avg("rating").alias("avg_rating"))

# Size of a 5% slice of the tags. Computed once and reused: count() is a Spark
# action that runs the whole aggregation, so calling it per slice doubles the work.
five_percent_of_tags = int(0.05 * tag_ratings.count())

# Best- and worst-rated 5% of tags by average rating
top_5_rated_tags = tag_ratings.orderBy("avg_rating", ascending=False).limit(five_percent_of_tags)
bottom_5_rated_tags = tag_ratings.orderBy("avg_rating").limit(five_percent_of_tags)

In [None]:
# Report the sum of all per-tag counts
print(f'Total tag count is: {total_tag_count}')

In [None]:
# Number of distinct recipe ids among the exploded rows
distinct_id = joined_df.select(countDistinct("id")).collect()[0][0]

In [None]:
# Tag rows per distinct recipe id (true division, so the result is a float)
tag_count_per_id = int(total_tag_count) / int(distinct_id)

In [None]:
# Report the tag-rows-per-recipe ratio
print(f'Tag count per ID is: {tag_count_per_id}')

In [None]:
# Count the rows of final_df for each rating value.
# NOTE(review): this rebinds final_df to the small (rating, count) summary, so
# the binned feature table is no longer reachable under this name, and running
# the cell again would aggregate the summary itself. The result is also not
# displayed in the cells visible here - consider a separate name such as
# rating_counts.
final_df = final_df.groupBy("rating").count()

In [None]:
# Display the tags with the highest average rating (top 5% of tags)
top_5_rated_tags.show()

In [None]:
# Display the tags with the lowest average rating (bottom 5% of tags)
bottom_5_rated_tags.show()