In [0]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer
# Import module
from pyspark.sql.functions import *
from pyspark.sql.functions import col, split, regexp_extract
# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np


In [0]:
# Create SparkSession
spark = SparkSession.builder.appName("RecipeRecommender").getOrCreate()

### **Task 1**: Read the Data
- Read RAW_recipes_cleaned.csv from the provided link.
- Ensure all fields have the correct data types.

In [0]:
#Task 1: Read the Data
# Read the recipes CSV: the first row is the header and Spark infers each column's type.
Raw_Recipe_Data = (spark.read.csv("dbfs:/FileStore/RAW_recipes_cleaned.csv", header=True,inferSchema=True))

In [0]:
#Show schema
Raw_Recipe_Data.printSchema()

In [0]:
#Show the first 5 rows of data
# Raw_Recipe_Data.toPandas().head(5)

### **Task 2**: Extract Nutrition Features
The nutrition column is currently read as a string. Extract the seven individual nutrition values from each row and create separate columns:
- calories
- total_fat_PDV
- sugar_PDV
- sodium_PDV
- protein_PDV
- saturated_fat_PDV
- carbohydrates_PDV

In [0]:
# Preview the raw 'nutrition' strings (still wrapped in square brackets) before cleaning them
Raw_Recipe_Data.select('nutrition').show(5)

In [0]:
# List of nutrition columns

Nutrition_Col_Names = ['calories',
                          'total_fat_PDV',
                          'sugar_PDV',
                          'sodium_PDV',
                          'protein_PDV',
                          'saturated_fat_PDV',
                          'carbohydrates_PDV']

In [0]:
# STEP 2.1 string operations to remove square brackets in 'nutrition' column
# The pattern is a raw string so the regex escapes \[ and \] are passed to Spark
# as written instead of being treated as (invalid) Python string escapes.
Raw_Recipe_Data = (Raw_Recipe_Data
                  .withColumn('nutrition', F.regexp_replace("nutrition", r"[\[\]]", "")))

In [0]:
# Task 02 Cell 2 out of 3
# STEP 2.2 - split the nutrition string into seven individual values.
# A single split expression is built once; each nutrition column then takes
# the element at its own position, cast to float.

import pyspark

Nutrition_Col_Split = F.split(Raw_Recipe_Data['nutrition'], ',')
for col_index, col_name in enumerate(Nutrition_Col_Names):
    nutrition_value = Nutrition_Col_Split.getItem(col_index).cast("float")
    Raw_Recipe_Data = Raw_Recipe_Data.withColumn(col_name, nutrition_value)

### Test cases for task 02

In [0]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert Raw_Recipe_Data.schema["carbohydrates_PDV"].dataType == FloatType(), "Recheck your typecasting"
assert Raw_Recipe_Data.collect()[123432][14] == 62.0, "The columns have not been split correctly."
assert Raw_Recipe_Data.collect()[10000][12] == 60.400001525878906, "The columns have not been split correctly."

### Task 3: Standardize Nutrition Values
Standardize the nutrition values to a per 100 calorie basis to account for serving size variations.

In [0]:
# Task 03 Cell 1 out of 1

# Re-express every nutrition column except calories on a per-100-calorie basis.
for nutrition_col in Nutrition_Col_Names:
    if nutrition_col == "calories":
        # calories is the denominator of the transformation, not a column to rescale
        continue

    # Name of the new column, e.g. 'total_fat_PDV' -> 'total_fat_per_100_cal'
    nutrition_per_100_cal_col = nutrition_col.replace('_PDV', '') + '_per_100_cal'

    scaled_value = Raw_Recipe_Data[nutrition_col] * 100 / Raw_Recipe_Data["calories"]
    Raw_Recipe_Data = Raw_Recipe_Data.withColumn(nutrition_per_100_cal_col, scaled_value)

    # The transformation may introduce nulls; fill them with 0,
    # limiting the fill to the column that was just created.
    Raw_Recipe_Data = Raw_Recipe_Data.fillna(value=0, subset=[nutrition_per_100_cal_col])

In [0]:
# Raw_Recipe_Data.toPandas().head(3)

### Task 4: Convert Tags to Array
The tags column is currently read as a string. Convert it to an array of strings.

In [0]:
# Task 04 Cell 1 out of 1
# Remove the square brackets and single quotes from the tags string,
# then split the remainder on ", " to obtain an array of tags.
tags_without_brackets = F.regexp_replace("tags", "[\\[\\]\\']", "")
Raw_Recipe_Data = Raw_Recipe_Data.withColumn('tags', F.split(tags_without_brackets, ", "))

In [0]:
Raw_Recipe_Data.printSchema()

In [0]:
Raw_Recipe_Data.schema["tags"].dataType == ArrayType(StringType(), True)

In [0]:
# Raw_Recipe_Data.toPandas().head(3)

### Task 5: Read and Join Interaction Data
- Read RAW_interactions_cleaned.csv from the provided link.
- Join this interaction data with the recipe data using the appropriate key. The resulting dataframe should contain all interactions with corresponding recipe information.

In [0]:
# Read the interactions CSV (header row, inferred column types).
Raw_Ratings_Data = (spark.read.csv('dbfs:/FileStore/RAW_interactions_cleaned.csv', 
                                 header=True, 
                                 inferSchema= True)
                  # Copy 'date' into a new 'review_date' column, then drop the original 'date' column
                  .withColumn("review_date",  F.col("date"))
                  .drop(F.col("date"))
                  )

In [0]:
Raw_Ratings_Data.printSchema()

In [0]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert Raw_Ratings_Data.count() == 1132367, "There is a mistake in reading the data."
assert len(Raw_Ratings_Data.columns) == 5, "There is a mistake in reading the data."

In [0]:
# Raw_Ratings_Data.toPandas().head(5)

In [0]:
# Task 05 Cell 1 out of 1
# Inner-join every interaction to its recipe on interactions.recipe_id == recipes.id.
# Later cells reference both 'recipe_id' and 'id' on the joined dataframe.
interaction_level_df = Raw_Ratings_Data.join(Raw_Recipe_Data,Raw_Ratings_Data.recipe_id == Raw_Recipe_Data.id,"inner")

### Test cases for Task 05

In [0]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert (interaction_level_df.count() ,len(interaction_level_df.columns)) == (1132367, 30), "The type of join is incorrect"

lst1 = Raw_Ratings_Data.select('recipe_id').collect()
lst2 = Raw_Recipe_Data.select('id').collect()
exclusive_set = set(lst1)-set(lst2)

assert len(exclusive_set) == 0, "There is a mistake in reading one of the two data files."

### Task 6: Create Time-Based Features
Create features that capture the time elapsed between a review date and the recipe's submission date. Use the date (from interactions data) and submitted columns.

In [0]:
# Time elapsed between the recipe's submission date and the review date,
# expressed in days, months and years (months / 12).
months_elapsed = F.months_between("review_date", "submitted")

interaction_level_df = (
    interaction_level_df
    .withColumn('days_since_submission_on_review_date', F.datediff("review_date", "submitted"))
    .withColumn('months_since_submission_on_review_date', months_elapsed)
    .withColumn('years_since_submission_on_review_date', months_elapsed / 12)
)

### Test cases for Task 06

In [0]:

assert interaction_level_df.schema["days_since_submission_on_review_date"].dataType == IntegerType()

assert (interaction_level_df.filter((interaction_level_df.user_id == 428885) & (interaction_level_df.recipe_id == 335241))
                            .select('days_since_submission_on_review_date').collect()[0][0]) == 77
assert (interaction_level_df.filter((interaction_level_df.user_id == 2025676) & (interaction_level_df.recipe_id == 94265))
                            .select('months_since_submission_on_review_date').collect()[0][0]) == 153.22580645
assert (interaction_level_df.filter((interaction_level_df.user_id == 338588) & (interaction_level_df.recipe_id == 21859))
                            .select('years_since_submission_on_review_date').collect()[0][0]) == 4.564516129166667

In [0]:
interaction_level_df.printSchema()

In [0]:
# interaction_level_df.toPandas().head(5)

In [0]:
# Inspect the schema of the data created so far, before it is saved to a parquet file
interaction_level_df.printSchema()

In [0]:
assert (interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)
(interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)


In [0]:
## Write interaction_level_df to parquet
## The write uses overwrite mode so that re-running the notebook from the top
## replaces the previous output instead of failing because the path already exists.

from pyspark.sql import SparkSession
(interaction_level_df
 .write
 .mode("overwrite")
 .parquet("dbfs:/FileStore/interaction_level_df"))  # Modify the path as you need

In [0]:
interaction_level_df.show(5)


In [0]:
# Initial Setup
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [0]:
# Run this every time you create a new Spark instance.

# Use %pip to install the required libraries
%pip install plotly==5.5.0
%pip install pandas==0.25.1
%pip install numpy==1.14.5
%pip install matplotlib==3.1.1



In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

In [0]:
# Defining Custom Functions
def get_quantiles(df, col_name, quantiles_list = [0.01, 0.25, 0.5, 0.75, 0.99]):
    """
    Takes a numerical column and returns column values at requested quantiles

    Inputs 
    Argument 1: Dataframe
    Argument 2: Name of the column
    Argument 3: A list of quantiles you want to find. Default value [0.01, 0.25, 0.5, 0.75, 0.99]

    Output 
    Returns a dictionary with quantiles as keys and column quantile values as values.
    The dictionary always contains the keys 0.0 (column minimum) and 1.0 (column maximum).
    If quantiles_list itself contains 0.0 or 1.0, those keys appear only once:
    0.0 holds the value returned by approxQuantile and 1.0 holds the column maximum.
    """
    # Minimum and maximum are computed together in a single aggregation pass
    extremes = df.agg(F.min(col_name), F.max(col_name)).first()
    min_val, max_val = extremes[0], extremes[1]

    # A relative error of 0 asks approxQuantile for exact quantiles
    quantiles_vals = df.approxQuantile(col_name, quantiles_list, 0)

    # Store min, quantiles and max in the output dict, in that order
    quantiles_dict = {0.0: min_val}
    quantiles_dict.update(zip(quantiles_list, quantiles_vals))
    quantiles_dict[1.0] = max_val
    return quantiles_dict

In [0]:
def plot_bucketwise_statistics(summary, bucketizer):
    """
    Takes in a dataframe and a bucketizer object and plots the summary statistics for each bucket in the dataframe. 

    Inputs
    Argument 1: Pandas dataframe obtained from bucket_col_print_summary function.
                It must contain the columns avg_rating, stddev_rating and n_ratings.
                If it also contains the bucketizer's output column, each row is
                labelled from its own bucket index; otherwise rows are labelled
                by position.
    Argument 2: Bucketizer object obtained from bucket_col_print_summary function

    Output
    Displays a plot of bucketwise average ratings and number of ratings for a parameter.
    Returns nothing. Adds the columns Scaled_number and Bucket_Names to the
    summary dataframe in place and sets the global matplotlib figure size / autolayout.
    """
    # One "lower-upper" label per bucket defined by the splits
    classlist = bucketizer.getSplits()
    number_of_classes = len(classlist) - 1
    class_labels = [str(classlist[i]) + "-" + str(classlist[i + 1])
                    for i in range(number_of_classes)]

    # Marker size scaled to the range [1.5, 2.5]; when every bucket has the same
    # count the range is zero, so a constant size is used instead of dividing by zero.
    n_min = summary["n_ratings"].min()
    n_range = summary["n_ratings"].max() - n_min
    if n_range == 0:
        summary["Scaled_number"] = 1.5
    else:
        summary["Scaled_number"] = (summary["n_ratings"] - n_min) / n_range + 1.5

    def label_for(bucket):
        # Bucket indices outside the split-defined buckets (for example the extra
        # bucket created by handleInvalid="keep", or a missing value) get an
        # explicit "invalid" label rather than shifting the other labels.
        if bucket is not None and bucket == bucket and 0 <= int(bucket) < number_of_classes:
            return class_labels[int(bucket)]
        return "invalid (" + str(bucket) + ")"

    # Label rows by their bucket index so an empty or extra bucket cannot cause a
    # length mismatch or attach a label to the wrong row.
    bucket_col = bucketizer.getOutputCol()
    if bucket_col in summary.columns:
        summary['Bucket_Names'] = [label_for(bucket) for bucket in summary[bucket_col]]
    else:
        summary['Bucket_Names'] = class_labels

    # Making the plot
    x = summary["Bucket_Names"]
    y1 = summary["avg_rating"]
    y2 = summary["n_ratings"]
    err = summary["stddev_rating"]

    # Setting up the figure: width grows with the number of buckets
    plt.rcParams["figure.figsize"] = [summary.shape[0] + 2, 6.0]
    plt.rcParams["figure.autolayout"] = True
    fig, ax1 = plt.subplots()

    # Average rating as dark blue bars with standard-deviation error bars
    ax1.bar(x, y1, color="#00008B")
    ax1.errorbar(x, y1, yerr=err, fmt="o", color="#FFD700")
    ax1.set(ylim=(0, 7))

    # Value labels just above each bar
    for i, avg_rating in enumerate(summary["avg_rating"].round(2).tolist()):
        ax1.text(i, avg_rating + 0.2, avg_rating, ha='center',
                 fontdict=dict(size=10),
                 bbox=dict(facecolor='#00008B', alpha=0.2))

    # Number of ratings as scatter points on a secondary y-axis
    ax2 = ax1.twinx()
    ax2.scatter(x, y2, s=summary["Scaled_number"] * 500, c="#FFD700")
    ax2.set(ylim=(0, summary["n_ratings"].max() * 1.15))

    # Count labels above each scatter point (fixed offset of 15000 ratings)
    for i, n_ratings in enumerate(summary["n_ratings"].tolist()):
        ax2.text(i, n_ratings + 15000, n_ratings, ha='center',
                 fontdict=dict(size=10),
                 bbox=dict(facecolor='#FFD700', alpha=0.5))

    # Giving labels to the axes
    ax1.set_xlabel(bucketizer.getOutputCol(), fontdict=dict(size=14))
    ax1.set_ylabel("Average Ratings", fontdict=dict(size=14))

    # Secondary y-axis label
    ax2.set_ylabel('Number of Ratings', fontdict=dict(size=14))

    # Plot title
    plt.title('Average ratings for each bucket and the quantity of ratings for \n' + bucketizer.getInputCol(),
              fontdict=dict(size=14))


In [0]:
def bucket_col_print_summary(df, splits, inputCol, outputCol):
    """
    Adds a bucketized copy of a numerical column to a data frame and reports,
    for every bucket, summary statistics of the ratings that fall in it.

    Inputs 
    Argument 1: Data Frame 
    Argument 2: Values at which the column will be split
    Argument 3: Name of the input column (numerical column)
    Argument 4: Name of the output column (bucketized numerical column)

    Output: 
    1) New dataframe with the output column added
    2) Bucketizer object configured with the given splits and columns
    3) Pandas dataframe with the average, standard deviation and count of ratings per bucket
    Also prints the bucketing details and plots the per-bucket statistics
    """
    # If the output column is already there, start from a frame without it
    base_df = df.drop(outputCol) if outputCol in df.columns else df

    # Configure the bucketizer and apply it, keeping invalid values
    bucketizer = Bucketizer(splits = splits, inputCol = inputCol, outputCol = outputCol)
    bucketed_df = bucketizer.setHandleInvalid("keep").transform(base_df)

    # Meta information on the buckets created
    report_lines = (
        "Added bucketized column {}".format(outputCol),
        "",
        "Bucketing done for split definition: {}".format(splits),
        "",
        "Printing summary statistics for ratings in buckets below:",
    )
    for line in report_lines:
        print(line)

    # Per-bucket rating statistics, ordered by bucket, collected into pandas
    rating_stats = [F.avg('rating').alias('avg_rating'),
                    F.stddev('rating').alias('stddev_rating'),
                    F.count('rating').alias('n_ratings')]
    per_bucket = bucketed_df.groupBy(outputCol).agg(*rating_stats)
    summary = per_bucket.sort(outputCol).toPandas()

    plot_bucketwise_statistics(summary, bucketizer)

    return bucketed_df, bucketizer, summary

In [0]:
def get_column_distribution_summary(df, col_name):
    """
    Takes a column in a data frame and prints the summary statistics (average, standard deviation, count and distinct count) for all unique values in that column.

    Inputs 
    Argument 1: Dataframe (must contain the columns 'rating' and 'id')
    Argument 2: Name of the column

    Output
    Returns nothing 
    Prints a Dataframe with summary statistics (at most 50 rows, sorted by the column in ascending order)
    """
    # show() prints the table itself and returns None, so it is not wrapped in
    # print(); wrapping it would print an extra "None" after the table.
    (df
     .groupBy(col_name)
     .agg(F.avg('rating').alias('avg_rating'),
          F.stddev('rating').alias('stddev_rating'),
          F.count('rating').alias('n_ratings'),
          F.countDistinct('id').alias('n_recipes'))
     .sort(F.col(col_name).asc())
     .show(50))

In [0]:
def get_n_items_satisfying_condition(df, condition, aggregation_level = "recipe"):
    """
    Given a condition, find the number of recipes / reviews that match the condition.
    Also calculates the percentage of such recipes / reviews as a percentage of all recipes / reviews.

    Inputs 
    Argument 1: Dataframe 
    Argument 2: Logical expression describing a condition, string type. eg: "minutes == 0"
    Argument 3: Aggregation level for determining "items", either  "recipe" or "review". Default value == "recipe"

    Output: Returns no object.
    Prints the following:
    1) Number of recipes / reviews that satisfy the condition
    2) Total number of recipes / reviews in the dataframe
    3) Percentage of recipes / reviews that satisfy the condition
       (reported as 0.0 when the dataframe contains no items)

    Raises
    ValueError if aggregation_level is neither "recipe" nor "review"
    """
    # The built-in round is referenced explicitly: a star import from
    # pyspark.sql.functions replaces the bare name `round` with the Column version.
    import builtins

    # Columns that identify one item at each aggregation level
    key_columns_by_level = {"recipe": ["id"], "review": ["id", "user_id"]}
    if aggregation_level not in key_columns_by_level:
        raise ValueError('aggregation_level must be "recipe" or "review", got {!r}'
                         .format(aggregation_level))
    key_columns = key_columns_by_level[aggregation_level]

    # Distinct items satisfying the condition, and distinct items overall
    number_of_rows_satisfying_condition = (df
                                           .filter(condition)
                                           .agg(F.countDistinct(*key_columns))).first()[0]
    n_rows_total = (df.agg(F.countDistinct(*key_columns))).first()[0]

    # Percentage of items satisfying the condition; an empty dataframe reports 0.0
    if n_rows_total:
        perc_rows = builtins.round(number_of_rows_satisfying_condition * 100 / n_rows_total, 2)
    else:
        perc_rows = 0.0

    print('Condition String                   : "{}"'.format(condition))
    print("Num {}s Satisfying Condition   : {} [{}%]".format(aggregation_level.title(), number_of_rows_satisfying_condition, perc_rows))
    print("Total Num {}s                  : {}".format(aggregation_level.title(), n_rows_total))

In [0]:
# Read the data
# Read interaction_level_df_processed

In [0]:
# Load the saved interaction-level data. The checks and analysis cells that follow
# use the name interaction_level_df, so the loaded frame is bound to that name;
# interaction_level_df_processed is kept as an alias for the same dataframe.
interaction_level_df = spark.read.parquet("dbfs:/FileStore/interaction_level_df")
interaction_level_df_processed = interaction_level_df


In [0]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert interaction_level_df.count() == 1132367, "There is a mistake in reading the data."
assert len(interaction_level_df.columns) == 33, "There is a mistake in reading the data."

In [0]:
interaction_level_df.show(5)


In [0]:
# Preview five rows as a pandas dataframe. The limit is applied in Spark so only
# those rows are brought to the driver, not the whole dataframe.
interaction_level_df.limit(5).toPandas()


### Task 7: Process Numerical Columns (Optional)
- Introduce non-linearity to numerical columns by converting them into categorical columns using binning (percentile-based bucketing).
- Columns to consider:
> - years_since_submission_on_review_date
> - minutes
> - calories
> - total_fat_PDV
> - sugar_PDV
> - sodium_PDV
> - protein_PDV
> - saturated_fat_PDV
> - carbohydrates_PDV
- Analyze the average rating for each bucket to determine the usefulness of the bucketed column.
- Handle any data inconsistencies or edge cases (e.g., reviews before submission date) appropriately.

In [0]:
# Bucketing and Cleaning Numerical Features

# 1. years_since_submission_on_review_date

# [Review Time Since Submission]

# Recipes more than 6 years old are rated low

In [0]:
get_quantiles(df = interaction_level_df,
             col_name = "years_since_submission_on_review_date")

In [0]:

get_n_items_satisfying_condition(df = interaction_level_df,
                                 condition= 'years_since_submission_on_review_date < 0',
                                 aggregation_level= "review")

In [0]:
# Only keep interactions with review dates >= recipe submission date

interaction_level_df = (interaction_level_df
                        .filter('years_since_submission_on_review_date >= 0'))

In [0]:
splits = [ 0, 1, 3, 6, float('Inf')]
inputCol  = "years_since_submission_on_review_date"
outputCol = "years_since_submission_on_review_date_bucket"

(interaction_level_df, submission_time_bucketizer, submission_time_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

In [0]:
%matplot plt


In [0]:
# 2. minutes
# [prep time]

# Somewhat relevant
# Low prep time is preferred

In [0]:
get_quantiles(df = interaction_level_df,
              col_name = "minutes",
              quantiles_list=[0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99])

In [0]:
# Capping prep time at 930 minutes: values above the cap become 930,
# every other value is left as it is.

minutes_capped = (F.when(F.col("minutes") > 930, 930)
                   .otherwise(F.col("minutes")))
interaction_level_df = interaction_level_df.withColumn("minutes", minutes_capped)

In [0]:
# let's look at some examples with 1 step only to see if this makes sense

interaction_level_df.filter('minutes == 0 and n_steps == 1').show(5)

In [0]:
get_n_items_satisfying_condition(df = interaction_level_df,
                                 condition = 'minutes == 0',
                                 aggregation_level = "recipe")

In [0]:
# Remove recipes with cook time zero

interaction_level_df = interaction_level_df.filter("minutes > 0")

In [0]:
get_n_items_satisfying_condition(df = interaction_level_df,
                                 condition = 'minutes == 0',
                                 aggregation_level = "recipe")

In [0]:
splits = [0, 15, 30, 60, float('Inf')]
inputCol  = "minutes"
outputCol = "prep_time_bucket"

(interaction_level_df, prep_time_bucketizer, prep_time_summary_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

In [0]:
%matplot plt


In [0]:
# 3. n_steps
# Clearly relevant

# Recipes with less than 2 steps are rated high

# Recipes with more than 29 steps are rated very low

In [0]:
get_quantiles(df = interaction_level_df,
              col_name = "n_steps")

In [0]:
interaction_level_df.filter('n_steps == 0').show(5, truncate = False)

In [0]:
get_n_items_satisfying_condition(df = interaction_level_df,
                                 condition = 'n_steps == 0',
                                 aggregation_level = "recipe")

In [0]:
# Remove recipes with n_steps zero

interaction_level_df = interaction_level_df.filter("n_steps > 0")

In [0]:
splits = [0, 2, 6, 8, 12, 29, float('Inf')]
inputCol  = "n_steps"
outputCol = "n_steps_bucket"

(interaction_level_df, n_steps_bucketizer, n_steps_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

In [0]:
%matplot plt


In [0]:
# 4. n_ingredients

# Not relevant



In [0]:
get_quantiles(df = interaction_level_df,
              col_name = "n_ingredients")

In [0]:
splits = [0, 6, 9, 11, float('Inf')]
inputCol  = "n_ingredients"
outputCol = "n_ingredients_bucket"

(interaction_level_df, n_ingredients_bucketizer, n_ingredients_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

In [0]:
%matplot plt


In [0]:
# 5. nutrition columns
# calories - Calories per serving seems irrelevant
# fat (per 100 cal) - Calories per serving seems irrelevant
# sugar (per 100 cal) - Calories per serving seems irrelevant
# sodium (per 100 cal) - Calories per serving seems irrelevant
# protein (per 100 cal) - Calories per serving seems irrelevant
# sat. fat (per 100 cal) - Calories per serving seems irrelevant
# carbs (per 100 cal) - Calories per serving seems irrelevant

In [0]:
interaction_level_df.columns 


In [0]:
nutrition_cols = ['calories', 
                  'total_fat_PDV', 
                  'sugar_PDV', 
                  'sodium_PDV', 
                  'protein_PDV', 
                  'saturated_fat_PDV', 
                  'carbohydrates_PDV', 
                  'total_fat_per_100_cal', 
                  'sugar_per_100_cal', 
                  'sodium_per_100_cal', 
                  'protein_per_100_cal', 
                  'saturated_fat_per_100_cal', 
                  'carbohydrates_per_100_cal']

quantiles_list = [0.00, 0.05, 0.25, 0.5, 0.75, 0.95, 1.00]
nutrition_col_quantiles = pd.DataFrame(index = quantiles_list)

In [0]:
# Fill one column of quantile values for every nutrition column.
# - The loop variable is not named `col`, so it cannot shadow pyspark's `col` function.
# - The quantile dictionary is wrapped in a Series, so values are matched to the
#   quantile index by key rather than by their position in the dictionary.
for nutrition_col in nutrition_cols:
    nutrition_col_quantiles[nutrition_col] = pd.Series(
        get_quantiles(df = interaction_level_df,
                      col_name = nutrition_col,
                      quantiles_list = quantiles_list))

In [0]:
# One row per quantile band and one column per nutrition column;
# each cell is filled with the average rating of the corresponding bucket.
nutrition_col_quantile_summary = pd.DataFrame(index = ["0.00-0.25", "0.25-0.50", "0.50-0.75", "0.75-0.95", "0.95 - 1.00"])

for col in nutrition_cols:
    # Bucket edges: 0, the rounded values at the 0.25 / 0.5 / 0.75 / 0.95 quantiles, then +infinity.
    # NOTE(review): Bucketizer expects strictly increasing splits; rounding could yield
    # a repeated edge or an edge equal to 0 for some columns - confirm for every column.
    splits = ([0]
            + list(nutrition_col_quantiles.loc[[0.25, 0.5, 0.75, 0.95], col].round())
            + [float('Inf')])
    inputCol  = col
    outputCol = col+"_bucket"

    # Drop a bucket column left over from an earlier run so it can be recreated
    if outputCol in interaction_level_df.columns:
        interaction_level_df = interaction_level_df.drop(outputCol)

    # Configure the bucketizer for this column
    bucketizer = Bucketizer(splits = splits,
                          inputCol  = inputCol,
                          outputCol = outputCol)
  
    # Add the bucket column, keeping invalid values
    interaction_level_df = bucketizer.setHandleInvalid("keep").transform(interaction_level_df)
  
    # Average rating per bucket, sorted by bucket index, assigned to the summary by position.
    # NOTE(review): this assumes exactly five buckets come back, one per index label above;
    # an empty bucket or an extra bucket for invalid values would break the assignment - confirm.
    nutrition_col_quantile_summary.loc[:, col] = (interaction_level_df
                                                .groupBy(outputCol)
                                                .agg(F.avg('rating').alias('avg_rating'))
                                                .sort(outputCol)
                                                .select('avg_rating').toPandas().values)

In [0]:
# set the max columns to none
pd.set_option('display.max_columns', None)

In [0]:
nutrition_col_quantile_summary


### Task 8: Create User-Level Features (Optional)
- Create features that capture user preferences and rating tendencies:
> - user_avg_rating
> - user_avg_n_ratings
> - user_avg_years_betwn_review_and_submission
> - user_avg_prep_time_recipes_reviewed
> - user_avg_n_steps_recipes_reviewed
> - user_avg_n_ingredients_recipes_reviewed
> - user_avg_years_betwn_review_and_submission_high_ratings
> - user_avg_calories_recipes_reviewed
> - user_avg_total_fat_per_100_cal_recipes_reviewed
> - user_avg_sugar_per_100_cal_recipes_reviewed
> - user_avg_sodium_per_100_cal_recipes_reviewed
> - user_avg_protein_per_100_cal_recipes_reviewed
> - user_avg_saturated_fat_per_100_cal_recipes_reviewed
> - user_avg_carbohydrates_per_100_cal_recipes_reviewed
> - user_avg_prep_time_recipes_reviewed_high_ratings
> - user_avg_n_steps_recipes_reviewed_high_ratings
> - user_avg_n_ingredients_recipes_reviewed_high_ratings
- "High ratings" refers to reviews with a rating of 5.
- Check for and handle any null values introduced during feature creation.
- Consider bucketing these user-level features.


In [0]:
# 03_FEATURE_EXTRACTION_PART

# Initial Setup (2)


In [0]:
from pyspark.sql import SparkSession


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [0]:
# Run this every time you create a new Spark instance.

spark.sparkContext.install_pypi_package("plotly==5.5.0")
spark.sparkContext.install_pypi_package("pandas==0.25.1")
spark.sparkContext.install_pypi_package("numpy==1.14.5")
spark.sparkContext.install_pypi_package("matplotlib==3.1.1")

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from pyspark.sql.window import Window

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

In [0]:
# Defining Custom Functions (2)

In [0]:
def get_quantiles(df, col_name, quantiles_list = [0.01, 0.25, 0.5, 0.75, 0.99]):
    """
    Takes a numerical column and returns column values at requested quantiles

    Inputs 
    Argument 1: Dataframe
    Argument 2: Name of the column
    Argument 3: A list of quantiles you want to find. Default value [0.01, 0.25, 0.5, 0.75, 0.99]

    Output 
    Returns a dictionary with quantiles as keys and column quantile values as values.
    The dictionary always contains the keys 0.0 (column minimum) and 1.0 (column maximum).
    If quantiles_list itself contains 0.0 or 1.0, those keys appear only once:
    0.0 holds the value returned by approxQuantile and 1.0 holds the column maximum.
    """
    # Minimum and maximum are computed together in a single aggregation pass
    extremes = df.agg(F.min(col_name), F.max(col_name)).first()
    min_val, max_val = extremes[0], extremes[1]

    # A relative error of 0 asks approxQuantile for exact quantiles
    quantiles_vals = df.approxQuantile(col_name, quantiles_list, 0)

    # Store min, quantiles and max in the output dict, in that order
    quantiles_dict = {0.0: min_val}
    quantiles_dict.update(zip(quantiles_list, quantiles_vals))
    quantiles_dict[1.0] = max_val
    return quantiles_dict

In [0]:
def plot_bucketwise_statistics(summary, bucketizer):
    """
    Takes in a dataframe and a bucketizer object and plots the summary statistics for each bucket in the dataframe. 

    Inputs
    Argument 1: Pandas dataframe obtained from bucket_col_print_summary function.
                It must contain the columns avg_rating, stddev_rating and n_ratings.
                If it also contains the bucketizer's output column, each row is
                labelled from its own bucket index; otherwise rows are labelled
                by position.
    Argument 2: Bucketizer object obtained from bucket_col_print_summary function

    Output
    Displays a plot of bucketwise average ratings and number of ratings of a parameter.
    Returns nothing. Adds the columns Scaled_number and Bucket_Names to the
    summary dataframe in place and sets the global matplotlib figure size / autolayout.
    """
    # One "lower-upper" label per bucket defined by the splits
    classlist = bucketizer.getSplits()
    number_of_classes = len(classlist) - 1
    class_labels = [str(classlist[i]) + "-" + str(classlist[i + 1])
                    for i in range(number_of_classes)]

    # Marker size scaled to the range [1.5, 2.5]; when every bucket has the same
    # count the range is zero, so a constant size is used instead of dividing by zero.
    n_min = summary["n_ratings"].min()
    n_range = summary["n_ratings"].max() - n_min
    if n_range == 0:
        summary["Scaled_number"] = 1.5
    else:
        summary["Scaled_number"] = (summary["n_ratings"] - n_min) / n_range + 1.5

    def label_for(bucket):
        # Bucket indices outside the split-defined buckets (for example the extra
        # bucket created by handleInvalid="keep", or a missing value) get an
        # explicit "invalid" label rather than shifting the other labels.
        if bucket is not None and bucket == bucket and 0 <= int(bucket) < number_of_classes:
            return class_labels[int(bucket)]
        return "invalid (" + str(bucket) + ")"

    # Label rows by their bucket index so an empty or extra bucket cannot cause a
    # length mismatch or attach a label to the wrong row.
    bucket_col = bucketizer.getOutputCol()
    if bucket_col in summary.columns:
        summary['Bucket_Names'] = [label_for(bucket) for bucket in summary[bucket_col]]
    else:
        summary['Bucket_Names'] = class_labels

    # making plot
    x = summary["Bucket_Names"]
    y1 = summary["avg_rating"]
    y2 = summary["n_ratings"]
    err = summary["stddev_rating"]

    # Figure width grows with the number of buckets
    plt.rcParams["figure.figsize"] = [summary.shape[0] + 2, 6.0]
    plt.rcParams["figure.autolayout"] = True
    fig, ax1 = plt.subplots()

    # Average rating as bars with standard-deviation error bars
    ax1.bar(x, y1, color="#0000FF")
    ax1.errorbar(x, y1, yerr=err, fmt="o", color="#FF0000")
    ax1.set(ylim=(0, 7))

    # Value labels just above each bar
    for i, avg_rating in enumerate(summary["avg_rating"].round(2).tolist()):
        ax1.text(i, avg_rating + 0.2, avg_rating, ha='center',
                 fontdict=dict(size=10),
                 bbox=dict(facecolor='#00008B', alpha=0.2))

    # Number of ratings as scatter points on a secondary y-axis
    ax2 = ax1.twinx()
    ax2.scatter(x, y2, s=summary["Scaled_number"] * 500, c='#FFA500')
    ax2.set(ylim=(0, summary["n_ratings"].max() * 1.15))

    # Count labels above each scatter point (fixed offset of 15000 ratings)
    for i, n_ratings in enumerate(summary["n_ratings"].tolist()):
        ax2.text(i, n_ratings + 15000, n_ratings, ha='center',
                 fontdict=dict(size=10),
                 bbox=dict(facecolor='#418BFA', alpha=0.5))

    # giving labels to the axes
    ax1.set_xlabel(bucketizer.getOutputCol(), fontdict=dict(size=14))
    ax1.set_ylabel("Average Ratings", fontdict=dict(size=14))

    # secondary y-axis label
    ax2.set_ylabel('Number of Ratings', fontdict=dict(size=14))

    # plot title
    plt.title('Bucketwise average ratings and number of ratings for \n' + bucketizer.getInputCol(),
              fontdict=dict(size=14))

In [0]:
def bucket_col_print_summary(df, splits, inputCol, outputCol):
    """
    Bucketize a numerical column and summarise the ratings observed in each bucket.

    Inputs
    Argument 1 (df)        : Data frame holding `inputCol` and a `rating` column
    Argument 2 (splits)    : Boundary values at which the input column is split
    Argument 3 (inputCol)  : Name of the numerical column to bucketize
    Argument 4 (outputCol) : Name of the bucket column to add; an existing column
                             of that name is dropped first

    Output
    1) Data frame with the bucket column added
    2) The Bucketizer used for the transformation (handleInvalid set to "keep")
    3) Pandas data frame with the average, standard deviation and count of
       ratings per bucket, sorted by bucket
    The summary is also handed to plot_bucketwise_statistics for plotting.
    """
    # Remove a stale bucket column so the function can be re-run on its own output
    base_df = df.drop(outputCol) if outputCol in df.columns else df

    # Configure the bucketizer and apply it
    bucketizer = Bucketizer(splits=splits, inputCol=inputCol, outputCol=outputCol)
    bucketizer.setHandleInvalid("keep")
    bucketed_df = bucketizer.transform(base_df)

    # Meta information on the buckets that were created
    report_lines = (
        "Added bucketized column {}".format(outputCol),
        "",
        "Bucketing done for split definition: {}".format(splits),
        "",
        "Printing summary statistics for ratings in buckets below:",
    )
    for line in report_lines:
        print(line)

    # Per-bucket rating statistics, collected to pandas for plotting
    rating_stats = [
        F.avg('rating').alias('avg_rating'),
        F.stddev('rating').alias('stddev_rating'),
        F.count('rating').alias('n_ratings'),
    ]
    summary = (bucketed_df
               .groupBy(outputCol)
               .agg(*rating_stats)
               .sort(outputCol)
               .toPandas())

    plot_bucketwise_statistics(summary, bucketizer)

    return bucketed_df, bucketizer, summary

In [0]:
def get_column_distribution_summary(df, col_name):
    """
    Displays rating statistics (average, standard deviation, count of ratings and
    distinct count of recipe ids) for every unique value of a column.

    Inputs
    Argument 1: Dataframe (must contain `rating` and `id` columns)
    Argument 2: Name of the column to group by

    Output
    Returns nothing
    Shows up to 50 rows of the summary, sorted ascending by the grouping column
    """
    # DataFrame.show() prints the table itself and returns None, so it must not be
    # wrapped in print() (that would emit a stray "None" line after the table).
    (df
     .groupBy(col_name)
     .agg(F.avg('rating').alias('avg_rating'),
          F.stddev('rating').alias('stddev_rating'),
          F.count('rating').alias('n_ratings'),
          F.countDistinct('id').alias('n_recipes'))
     .sort(F.col(col_name).asc())
     .show(50))

In [0]:
def get_n_items_satisfying_condition (df, condition, aggregation_level = "recipe"):
    """
    Given a condition, find the number of recipes / reviews that match the condition.
    Also calculates the percentage of such recipes / reviews as a percentage of all recipes / reviews.

    Inputs
    Argument 1: Dataframe
    Argument 2: Logical expression describing a condition, string type. eg: "minutes == 0"
    Argument 3: Aggregation level for determining "items", either "recipe" or "review". Default value == "recipe"
                A recipe is identified by `id`, a review by the pair (`id`, `user_id`).

    Output: Returns no object.
    Prints the following:
    1) Number of recipes / reviews that satisfy the condition
    2) Total number of recipes / reviews in the dataframe
    3) Percentage of recipes / reviews that satisfy the condition (0.0 when the dataframe has no items)

    Raises
    ValueError if aggregation_level is neither "recipe" nor "review"
    """
    # The notebook wildcard-imports pyspark.sql.functions, which rebinds the name
    # `round` to the Spark column function; use the Python builtin explicitly.
    import builtins

    # Columns whose distinct combinations identify one "item" at each level
    key_columns_by_level = {"recipe": ("id",), "review": ("id", "user_id")}
    if aggregation_level not in key_columns_by_level:
        raise ValueError(
            'aggregation_level must be "recipe" or "review", got {!r}'.format(aggregation_level))
    key_columns = key_columns_by_level[aggregation_level]

    def count_distinct_items(frame):
        # Number of distinct items (by key columns) in the given frame
        return frame.agg(F.countDistinct(*key_columns)).first()[0]

    number_of_rows_satisfying_condition = count_distinct_items(df.filter(condition))
    n_rows_total = count_distinct_items(df)

    # Find out % rows satisfying the condition; guard against an empty dataframe
    if n_rows_total:
        perc_rows = builtins.round(number_of_rows_satisfying_condition * 100 / n_rows_total, 2)
    else:
        perc_rows = 0.0

    # Print a properly formatted output
    print('Condition String                   : "{}"'.format(condition))
    print("Number of {}s Satisfying Condition   : {} [{}%]".format(aggregation_level.title(), number_of_rows_satisfying_condition, perc_rows))
    print("Total Number of {}s                  : {}".format(aggregation_level.title(), n_rows_total))

In [0]:
def add_OHE_columns (df, n_name_list):
    """
    Adds one indicator column per tag in the given list.

    Input
    Argument 1: Dataframe with an array column `tags`
    Argument 2: list of tags

    Output
    Prints the name of every column that has been added
    Returns the dataframe with a `has_tag_<tag>` column per tag, holding 1 when
    `tags` contains the tag and 0 otherwise
    """
    for tag in n_name_list:
        tag_is_present = F.array_contains(df["tags"], tag)
        indicator = F.when(tag_is_present, 1).otherwise(0)
        df = df.withColumn("has_tag_"+tag, indicator)
        print ("added column: has_tag_"+tag)

    return df

In [0]:
# Read the data (2)

# Load the interaction-level data frame stored as parquet under dbfs:/FileStore
interaction_level_df = spark.read.parquet("dbfs:/FileStore/interaction_level_df")


In [0]:
# Adding user level average features

In [0]:
# All user-level features are window aggregates over the rows of one user
partition = Window.partitionBy("user_id")

# (new column, aggregate) pairs, added in this order
user_level_features = [
    ("user_avg_rating", F.avg("rating")),
    ("user_n_ratings", F.count("rating")),
    ("user_avg_years_betwn_review_and_submission", F.avg("years_since_submission_on_review_date")),
    ("user_avg_prep_time_recipes_reviewed", F.avg("minutes")),
    ("user_avg_n_steps_recipes_reviewed", F.avg("n_steps")),
    ("user_avg_n_ingredients_recipes_reviewed", F.avg("n_ingredients")),
]

for feature_name, aggregate in user_level_features:
    interaction_level_df = interaction_level_df.withColumn(feature_name,
                                                           aggregate.over(partition))

In [0]:
# Nutrition columns for which a per-user average is added
nutrition_cols = [
    'calories',
    'total_fat_per_100_cal',
    'sugar_per_100_cal',
    'sodium_per_100_cal',
    'protein_per_100_cal',
    'saturated_fat_per_100_cal',
    'carbohydrates_per_100_cal',
]

for nutri_col in nutrition_cols:
    user_avg_col = "user_avg_{}_recipes_reviewed".format(nutri_col)
    interaction_level_df = interaction_level_df.withColumn(user_avg_col,
                                                           F.avg(nutri_col).over(partition))

In [0]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code. 
# Spot-checks the user-level window features for users 601529 and 233044.
# NOTE(review): these checks need the Python builtin `round`; the notebook's
# `from pyspark.sql.functions import *` rebinds that name to the Spark column
# function — confirm which one is in scope when this cell runs.

assert(round(interaction_level_df.filter('user_id == 601529').select('user_avg_rating').first()[0], 2) == 4.22)
assert(interaction_level_df.filter('user_id == 601529').select('user_n_ratings').first()[0] == 27)
assert(round(interaction_level_df.filter('user_id == 601529').select('user_avg_years_betwn_review_and_submission').first()[0], 2) == 3.51)
assert(interaction_level_df.filter('user_id == 233044').select('user_avg_prep_time_recipes_reviewed').first()[0] == 50.3)
assert(interaction_level_df.filter('user_id == 233044').select('user_avg_n_steps_recipes_reviewed').first()[0] == 8.8)
assert(interaction_level_df.filter('user_id == 233044').select('user_avg_n_ingredients_recipes_reviewed').first()[0] == 8.2)
assert(round(interaction_level_df.filter('user_id == 233044').select('user_avg_total_fat_per_100_cal_recipes_reviewed').first()[0]) == 6)

In [0]:
# More Features:
# high_ratings = 5 rating

# user_avg_years_betwn_review_and_submission_high_ratings

# user_avg_prep_time_recipes_reviewed_high_ratings

# user_avg_n_steps_recipes_reviewed_high_ratings

# user_avg_n_ingredients_recipes_reviewed_high_ratings

In [0]:
# Copies of the recipe attributes that are null whenever the rating is not 5,
# so that averaging them later only takes 5-star reviews into account.
# (new column, value kept on 5-star rows) pairs, added in this order
five_star_columns = [
    ("ind_5_rating", 1),
    ("years_since_submission_on_review_date_5_ratings", F.col("years_since_submission_on_review_date")),
    ("minutes_5_ratings", F.col("minutes")),
    ("n_steps_5_ratings", F.col("n_steps")),
    ("n_ingredients_5_ratings", F.col("n_ingredients")),
]

for masked_col, kept_value in five_star_columns:
    interaction_level_df = interaction_level_df.withColumn(
        masked_col,
        F.when(F.col("rating") != 5, None).otherwise(kept_value))

In [0]:
partition = Window.partitionBy("user_id")

# Per-user aggregates over the 5-star-only columns; nulls (non-5-star rows)
# are ignored by sum/avg. (new column, aggregate) pairs, added in this order
five_star_user_features = [
    ("user_n_5_ratings", F.sum("ind_5_rating")),
    ("user_avg_years_betwn_review_and_submission_5_ratings", F.avg("years_since_submission_on_review_date_5_ratings")),
    ("user_avg_prep_time_recipes_reviewed_5_ratings", F.avg("minutes_5_ratings")),
    ("user_avg_n_steps_recipes_reviewed_5_ratings", F.avg("n_steps_5_ratings")),
    ("user_avg_n_ingredients_recipes_reviewed_5_ratings", F.avg("n_ingredients_5_ratings")),
]

for feature_name, aggregate in five_star_user_features:
    interaction_level_df = interaction_level_df.withColumn(feature_name,
                                                           aggregate.over(partition))

In [0]:
# For every nutrition column: a copy that is null on non-5-star rows, then the
# per-user average of that copy.
for nutri_col in nutrition_cols:
    masked_col = "{}_5_ratings".format(nutri_col)
    user_avg_col = "user_avg_{}_recipes_reviewed_5_ratings".format(nutri_col)
    interaction_level_df = (interaction_level_df
                            .withColumn(masked_col,
                                        F.when(F.col("rating") != 5, None)
                                         .otherwise(F.col(nutri_col)))
                            .withColumn(user_avg_col,
                                        F.avg(masked_col).over(partition)))

In [0]:
# Check - every row with a rating of 5 must have non-null values in the
# corresponding user-level *_5_ratings columns (count, years, prep time, steps, ingredients)

assert(interaction_level_df
       .filter("rating == 5")
       .filter(interaction_level_df.user_n_5_ratings.isNull() |
               interaction_level_df.user_avg_years_betwn_review_and_submission_5_ratings.isNull() |
               interaction_level_df.user_avg_prep_time_recipes_reviewed_5_ratings.isNull() |
               interaction_level_df.user_avg_n_steps_recipes_reviewed_5_ratings.isNull() |
               interaction_level_df.user_avg_n_ingredients_recipes_reviewed_5_ratings.isNull())
       .count() == 0)

In [0]:
# Check values for a given user id
# Spot-checks the 5-star user-level features for user 233044.
# NOTE(review): relies on the Python builtin `round`; the notebook's wildcard
# import of pyspark.sql.functions rebinds that name — confirm which is in scope.

assert(interaction_level_df.filter('user_id == 233044').select('user_n_5_ratings').first()[0] == 7)
assert(round(interaction_level_df.filter('user_id == 233044').select('user_avg_years_betwn_review_and_submission_5_ratings').first()[0], 2) == 2.24)
assert(round(interaction_level_df.filter('user_id == 233044').select('user_avg_prep_time_recipes_reviewed_5_ratings').first()[0]) == 46)
assert(round(interaction_level_df.filter('user_id == 233044').select('user_avg_n_steps_recipes_reviewed_5_ratings').first()[0], 2) == 7.29)
assert(round(interaction_level_df.filter('user_id == 233044').select('user_avg_n_ingredients_recipes_reviewed_5_ratings').first()[0], 2) == 6.86)

In [0]:
# Inspect the schema now that the user-level feature columns have been added
interaction_level_df.printSchema()


### Task 8 Real

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.fpm import FPGrowth
from datetime import datetime

In [0]:
# User-level 5-star features to eyeball
columns_to_select = [
    "user_n_5_ratings",
    "user_avg_prep_time_recipes_reviewed_5_ratings",
    "user_avg_n_steps_recipes_reviewed_5_ratings",
    "user_avg_n_ingredients_recipes_reviewed_5_ratings",
]

# Display the selected columns
interaction_level_df.select(columns_to_select).show()


In [0]:
# One row per user: average rating, number of reviews and number of 5-star
# reviews (count() skips the nulls that ind_5_rating holds on non-5-star rows),
# ordered by review count, highest first.
user_rating_metrics = (interaction_level_df
                       .groupBy("user_id")
                       .agg(F.round(F.avg("user_avg_rating"), 2).alias("Average Rating"),
                            F.count("user_n_ratings").alias("Total Reviews"),
                            F.count("ind_5_rating").alias("Total of 5-star Reviews"))
                       .orderBy(F.desc("Total Reviews")))

In [0]:
# Exclude user 424680 from the per-user table.
# NOTE(review): the reason for excluding this id is not shown here — presumably an outlier account; confirm.
user_rating_metrics = user_rating_metrics.where(F.col("user_id") != 424680)

user_rating_metrics.show(10)

In [0]:
# Keep only rows whose user-level average prep time is non-zero
filtered_df = interaction_level_df.where(F.col("user_avg_prep_time_recipes_reviewed") != 0)

In [0]:
from pyspark.sql.functions import avg, round

# One row per user: average prep time, steps and ingredients of the recipes the
# user rated 5 stars, plus the number of 5-star reviews. Ordered by that count
# (highest first) and restricted to users whose average prep time is at most 300.
user_recipes_metrics = (filtered_df
                        .groupBy("user_id")
                        .agg(F.round(F.avg("user_avg_prep_time_recipes_reviewed_5_ratings"), 2)
                              .alias("Avg Prepared time of 5-stars"),
                             F.round(F.avg("user_avg_n_steps_recipes_reviewed_5_ratings"), 2)
                              .alias("Avg No. Steps of 5-stars"),
                             F.round(F.avg("user_avg_n_ingredients_recipes_reviewed_5_ratings"), 2)
                              .alias("Avg No. Ingredients of 5-stars"),
                             F.count("ind_5_rating").alias("Total of 5-star Reviews"))
                        .orderBy(F.desc("Total of 5-star Reviews"))
                        .where(F.col("Avg Prepared time of 5-stars") <= 300))

# Show the result to verify
user_recipes_metrics.show(10)


In [0]:
# Metric columns that must all be non-null for a user to be kept.
# The names are backtick-quoted — presumably because two of them contain a '.',
# which Spark would otherwise read as a nested-field accessor; verify before
# removing the quotes.
required_metric_columns = [
    "`Avg Prepared time of 5-stars`",
    "`Avg No. Steps of 5-stars`",
    "`Avg No. Ingredients of 5-stars`",
]
user_recipes_metrics = user_recipes_metrics.dropna(how="any", subset=required_metric_columns)

# Show the result to verify
user_recipes_metrics.show(10)



In [0]:
# (nutrition column, display label) pairs. Keeping them in one place ensures the
# aggregation and the null filter below always use the same column labels.
nutrition_labels = [
    ("calories", "Avg Calories of 5-stars"),
    ("total_fat_per_100_cal", "Avg Total Fat per 100 of 5-stars"),
    ("sugar_per_100_cal", "Avg Sugar per 100 of 5-stars"),
    ("sodium_per_100_cal", "Avg Sodium per 100 of 5-stars"),
    ("protein_per_100_cal", "Avg Protein per 100 of 5-stars"),
    ("saturated_fat_per_100_cal", "Avg Saturated Fat per 100 of 5-stars"),
    ("carbohydrates_per_100_cal", "Avg Carbohydrates per 100 of 5-stars"),
]

# One row per user: rounded averages of the user-level 5-star nutrition features
# plus the number of 5-star reviews, ordered by that count (highest first).
user_nutrition_metrics = (filtered_df
                          .groupBy("user_id")
                          .agg(*[F.round(F.avg("user_avg_{}_recipes_reviewed_5_ratings".format(nutri_col)), 2)
                                  .alias(label)
                                 for nutri_col, label in nutrition_labels],
                               F.count("ind_5_rating").alias("Total of 5-star Reviews"))
                          .orderBy(F.desc("Total of 5-star Reviews")))

# Drop users for whom any of the nutrition averages is missing
user_nutrition_metrics = user_nutrition_metrics.dropna(
    subset=[label for _, label in nutrition_labels])

# Exclude user 424680.
# NOTE(review): the reason for excluding this id is not shown here — confirm.
user_nutrition_metrics = user_nutrition_metrics.filter(F.col("user_id") != 424680)

# Show the result to verify
user_nutrition_metrics.show(10)

### Task 9: Create Tag-Level Features (Optional)
- Analyze the tags column to identify valuable tags. Consider:
> - Top 5 percentile of frequent tags.
> - Top 5 percentile of highest-rated tags.
> - Bottom 5 percentile of highest-rated tags.
- Create features to capture information from these valuable tags.
- Explore different encoding techniques (e.g., one-hot encoding, or more advanced methods) to represent tag information effectively.

# Tag-level EDA

In [0]:
# One row per (interaction, tag): explode the `tags` array into `individual_tag`
interaction_tag_level_df = interaction_level_df.withColumn(
    'individual_tag',
    F.explode(F.col('tags')))


In [0]:
# Per-tag rating statistics: average rating, number of ratings, number of distinct recipes
tag_rating_stats = [
    F.avg('rating').alias('avg_user_rating'),
    F.count('rating').alias('n_user_ratings'),
    F.countDistinct('id').alias('n_recipes'),
]
tags_ratings_summary = interaction_tag_level_df.groupBy('individual_tag').agg(*tag_rating_stats)

In [0]:
# Totals used as denominators: all interactions and all distinct recipes
interactions = interaction_level_df.count()
recipes = interaction_level_df.agg(F.countDistinct('id')).first()[0]

# Share (as a fraction) of recipes / interactions in which each tag appears
tags_ratings_summary = (tags_ratings_summary
                        .withColumn("in_percent_recipies", F.col("n_recipes") / F.lit(recipes))
                        .withColumn("in_percent_interactions", F.col("n_user_ratings") / F.lit(interactions)))

### 1. Top n most rated tags

In [0]:
# Ten tags with the most ratings
tags_ratings_summary.orderBy(F.desc("n_user_ratings")).show(10)


In [0]:
# Keep only tags that appear in fewer than 75% of all interactions
tags_ratings_summary = tags_ratings_summary.where(F.col("in_percent_interactions") < 0.75)


In [0]:
# Tags ordered by number of ratings, most rated first
top_most_frequent_tags = tags_ratings_summary.orderBy(F.desc("n_user_ratings"))


In [0]:
# Quantiles of the interaction share per tag, used to pick the frequency cut-off
get_quantiles(top_most_frequent_tags,
              'in_percent_interactions',
              quantiles_list = [0.01,0.25,0.5, 0.75,0.8,0.85,0.9,0.95, 0.99])

In [0]:
# keep tags appearing in the top 5 percentile
# NOTE(review): 0.16 is presumably the 95th-percentile value read off the quantile output — confirm after re-running
top_most_frequent_tags = top_most_frequent_tags.where("in_percent_interactions > 0.16")

top_most_frequent_tags.count()

In [0]:
# Collect the selected tag names to the driver as a plain Python list
top_frequent_tags_list = [
    row['individual_tag']
    for row in top_most_frequent_tags.select('individual_tag').collect()
]


In [0]:
# Add one has_tag_<tag> indicator column per frequent tag
interaction_level_df = add_OHE_columns(df=interaction_level_df,
                                       n_name_list=top_frequent_tags_list)


### 2. Bottom n least rated tags

In [0]:
# Ten tags with the fewest ratings
tags_ratings_summary.orderBy(F.asc("n_user_ratings")).show(10)


The tags above each appear in only 1 recipe out of more than two hundred thousand. Features built from these tags will not give the model any new information: if these tags were one-hot encoded, almost the entire column would contain 0s and only a few rows would contain 1s. One-hot encoding these tags is therefore not a good idea. Only if you come up with an encoding that captures the rarity of these tags should you add them to the analysis.

### 3. Top n rated tags

In [0]:
# Five tags with the highest average rating
tags_ratings_summary.orderBy(F.desc("avg_user_rating")).show(5)


In [0]:
# Quantiles of the number of ratings per tag, used to pick a minimum-support cut-off
get_quantiles(df = tags_ratings_summary,
              col_name = "n_user_ratings",
              quantiles_list = [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.5, 0.75, 0.99])


In [0]:
# Keep only tags with more than 100 ratings
tags_ratings_summary = tags_ratings_summary.where(F.col("n_user_ratings") > 100)


In [0]:
# Tags ordered by average rating, best rated first
top_rated_tags_df = tags_ratings_summary.orderBy(F.desc("avg_user_rating"))


In [0]:
# Quantiles of the average rating per tag, used to pick the top-rated cut-off
get_quantiles(top_rated_tags_df,
              'avg_user_rating',
              quantiles_list = [0.01,0.25,0.5, 0.75,0.8,0.85,0.9,0.95, 0.99])

In [0]:
# keep tags above 95 percentile
# NOTE(review): 4.53 is presumably the 95th-percentile value read off the quantile output — confirm after re-running
top_rated_tags_df = top_rated_tags_df.where("avg_user_rating > 4.53")

top_rated_tags_df.count()

In [0]:
# Collect the top-rated tag names to the driver as a plain Python list
top_rated_tags_list = [
    row['individual_tag']
    for row in top_rated_tags_df.select('individual_tag').collect()
]


In [0]:
# Tags that are both frequent and top rated
set(top_frequent_tags_list).intersection(top_rated_tags_list)


In [0]:
# All tags selected so far (frequent or top rated)
all_added_columns_set = set(top_frequent_tags_list) | set(top_rated_tags_list)


In [0]:
# Add one has_tag_<tag> indicator column per top-rated tag
interaction_level_df = add_OHE_columns(df=interaction_level_df,
                                       n_name_list=top_rated_tags_list)


In [0]:
# Five best-rated tags among the selected ones
top_rated_tags_df.orderBy(F.desc("avg_user_rating")).show(5)

### 4. Bottom n rated tags

In [0]:
# Tags ordered by average rating, worst rated first
bottom_rated_tags_df = tags_ratings_summary.orderBy(F.asc("avg_user_rating"))


In [0]:
# Quantiles of the average rating per tag, used to pick the bottom-rated cut-off
get_quantiles(df = bottom_rated_tags_df,
              col_name = "avg_user_rating",
              quantiles_list = [0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.15, 0.2, 0.25, 0.5, 0.75, 0.99])


In [0]:
# Keep tags whose average rating is below 4.00
# NOTE(review): the 4.00 cut-off is presumably read off the quantile output — confirm after re-running
bottom_rated_tags_df = bottom_rated_tags_df.where("avg_user_rating < 4.00")

bottom_rated_tags_df.count()

In [0]:
# Collect the bottom-rated tag names to the driver as a plain Python list
bottom_rated_tags_list = [
    row['individual_tag']
    for row in bottom_rated_tags_df.select('individual_tag').collect()
]


In [0]:
# Bottom-rated tags that were already added as frequent or top-rated tags
all_added_columns_set.intersection(bottom_rated_tags_list)


In [0]:
# Add one has_tag_<tag> indicator column per bottom-rated tag
interaction_level_df = add_OHE_columns(df=interaction_level_df,
                                       n_name_list=bottom_rated_tags_list)


In [0]:
# Show the five lowest-rated tags. orderBy only reads the `ascending` keyword
# (any other keyword is silently ignored), so the sort direction is stated
# explicitly: lowest average rating first.
bottom_rated_tags_df.orderBy("avg_user_rating", ascending=True).show(5)


In [0]:
# Display the names of the bottom-rated tags that were one-hot encoded
bottom_rated_tags_list

### **Final DataFrame**


In [0]:
# Number of columns in the final data frame, including all added feature columns
len(interaction_level_df.columns)


In [0]:
# Persist the final data frame as parquet, replacing any previous output.
# NOTE(review): this is a relative path, unlike the dbfs:/FileStore path used for reading — confirm the intended location.
interaction_level_df.write.mode("overwrite").parquet("interaction_level_df_BDA")
