In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Data preparation

In [0]:
recipes = spark.read.option("sep", ",").option("quote", "\"").option("escape", "\"").option("header", "true").option("multiline", "true").csv("recipes.csv")
reviews = spark.read.option("sep", ",").option("quote", "\"").option("escape", "\"").option("header", "true").option("multiline", "true").csv("reviews.csv")

In [0]:
recipes.createOrReplaceTempView("recipes")
reviews.createOrReplaceTempView("reviews")

In [1]:
# %sql
# select *
# from recipes
# limit 100

In [0]:
recipes_clean = spark.sql("""
select RecipeId, Description, RecipeInstructions, ReviewCount, CookTime, PrepTime, RecipeCategory, RecipeIngredientParts, AggregatedRating, Calories, FatContent, SaturatedFatContent, CholesterolContent, SodiumContent, CarbohydrateContent, FiberContent, SugarContent, ProteinContent, RecipeServings 
from recipes
where AggregatedRating != 'NA' and ReviewCount > 1
""")

In [0]:
# recipes_clean = spark.sql("""
# select RecipeId, Description, RecipeInstructions, ReviewCount, CookTime, PrepTime, RecipeCategory, RecipeIngredientParts, AggregatedRating, Calories, FatContent, SaturatedFatContent, CholesterolContent, SodiumContent, CarbohydrateContent, FiberContent, SugarContent, ProteinContent, RecipeServings 
# from recipes
# where AggregatedRating != 'NA'
# """)

In [0]:
recipes_clean.createOrReplaceTempView("recipes_clean")

# Feature processing

In [0]:
### Feature: CookDuration, PrepDuration
recipes_clean = recipes_clean.withColumn('CookHours', F.regexp_extract(F.col('CookTime'), '(\d+)(?=H)', 0).cast('int'))
recipes_clean = recipes_clean.withColumn('CookMinutes', F.regexp_extract(F.col('CookTime'), '(\d+)(?=M)', 0).cast('int'))
recipes_clean = recipes_clean.fillna({'CookHours': 0, 'CookMinutes': 0})
recipes_clean = recipes_clean.withColumn('CookDuration', recipes_clean['CookHours'] * 60 + recipes_clean['CookMinutes'])

recipes_clean = recipes_clean.withColumn('PrepHours', F.regexp_extract(F.col('PrepTime'), '(\d+)(?=H)', 0).cast('int'))
recipes_clean = recipes_clean.withColumn('PrepMinutes', F.regexp_extract(F.col('PrepTime'), '(\d+)(?=M)', 0).cast('int'))
recipes_clean = recipes_clean.fillna({'PrepHours': 0, 'PrepMinutes': 0})
recipes_clean = recipes_clean.withColumn('PrepDuration', recipes_clean['PrepHours'] * 60 + recipes_clean['PrepMinutes'])

recipes_clean = recipes_clean.drop(*['CookTime', 'CookHours', 'CookMinutes', 'PrepTime', 'PrepHours', 'PrepMinutes'])

### Feature: DescriptionLen
recipes_clean = recipes_clean.withColumn('DescriptionLen', F.length(recipes_clean['Description']))
recipes_clean = recipes_clean.drop('Description')

### Feature: RecipeIngredientPartsCount
recipes_clean = recipes_clean.withColumn('RecipeIngredientPartsCount', F.size(F.split(F.col('RecipeIngredientParts'), ',')))
recipes_clean = recipes_clean.drop('RecipeIngredientParts')

### Feature: RecipeInstructionSteps
recipes_clean = recipes_clean.withColumn('RecipeInstructionSteps', F.size(F.split(F.col('RecipeInstructions'), ',')))
recipes_clean = recipes_clean.drop('RecipeInstructions')

In [0]:
### Aggregate RecipeCategory
category_counts = recipes_clean.groupBy('RecipeCategory').agg(F.countDistinct('RecipeId').alias('CategoryCount'))
recipes_clean = recipes_clean.join(category_counts, on = 'RecipeCategory', how = 'left')
recipes_clean = recipes_clean.withColumn('RecipeCategory', F.when(F.col('CategoryCount') < 500, 'Other').otherwise(F.col('RecipeCategory')))
recipes_clean = recipes_clean.drop('CategoryCount')
recipes_clean.groupBy('RecipeCategory').agg(F.countDistinct('RecipeId').alias('count')).orderBy(F.desc('count')).show(50,truncate=False)

### Feature: dummy variables based on RecipeCategory
indexer = StringIndexer(inputCol='RecipeCategory', outputCol='RecipeCategoryIndex')
recipes_clean = indexer.fit(recipes_clean).transform(recipes_clean)
encoder = OneHotEncoder(inputCol='RecipeCategoryIndex', outputCol='RecipeCategoryVec')
recipes_clean = encoder.fit(recipes_clean).transform(recipes_clean)

recipe_category_index_mapping = recipes_clean.select(['RecipeCategory', 'RecipeCategoryIndex']).distinct()
# display(recipe_category_index_mapping)

+----------------+-----+
|RecipeCategory  |count|
+----------------+-----+
|Other           |17860|
|Dessert         |17278|
|Lunch/Snacks    |11701|
|One Dish Meal   |11147|
|Vegetable       |9546 |
|Breakfast       |7122 |
|Beverages       |5567 |
|Chicken         |5076 |
|Pork            |4559 |
|Breads          |4380 |
|Potato          |4297 |
|Chicken Breast  |4275 |
|Quick Breads    |4217 |
|Meat            |4096 |
|Sauces          |3875 |
|Cheese          |3102 |
|Bar Cookie      |2561 |
|Drop Cookies    |2404 |
|Pie             |2375 |
|Yeast Breads    |2263 |
|< 60 Mins       |2050 |
|Stew            |1964 |
|< 30 Mins       |1940 |
|Salad Dressings |1684 |
|Candy           |1684 |
|Beans           |1669 |
|Low Protein     |1622 |
|< 15 Mins       |1557 |
|Spreads         |1491 |
|Smoothies       |1321 |
|Poultry         |1158 |
|Steak           |1151 |
|Frozen Desserts |1101 |
|Onions          |1096 |
|Savory Pies     |1040 |
|Curries         |1001 |
|Rice            |998  |


In [0]:
### prepare response variable
### try multi class classification
# recipes_clean = recipes_clean.withColumn('AggregatedRatingBucket', F.when(recipes_clean['AggregatedRating'].isin([1, 1.5, 2, 2.5]), 'low').when(recipes_clean['AggregatedRating'].isin([3, 3.5, 4]), 'medium').otherwise('high'))
# recipes_clean = recipes_clean.withColumn('AggregatedRatingBucket', F.when(recipes_clean['AggregatedRatingBucket']=='low', '1').when(recipes_clean['AggregatedRatingBucket'] == 'medium', '2').otherwise('3'))
### change to binary class classification
recipes_clean.groupBy('AggregatedRating').count().orderBy('AggregatedRating').show()
recipes_clean = recipes_clean.withColumn('AggregatedRatingBucket', F.when(recipes_clean['AggregatedRating'].isin([1, 1.5, 2, 2.5, 3, 3.5]), 'low').otherwise('high'))
recipes_clean = recipes_clean.withColumn('AggregatedRatingBucket', F.when(recipes_clean['AggregatedRatingBucket']=='low', '1').otherwise('0'))
recipes_clean = recipes_clean.withColumn('AggregatedRatingBucket', recipes_clean['AggregatedRatingBucket'].cast(IntegerType()))
recipes_clean = recipes_clean.drop('AggregatedRating')

+----------------+------+
|AggregatedRating| count|
+----------------+------+
|               1|   225|
|             1.5|    76|
|               2|   328|
|             2.5|   673|
|               3|  2755|
|             3.5|  3978|
|               4| 16063|
|             4.5| 34330|
|               5|109201|
+----------------+------+



In [0]:
recipes_clean.groupBy('AggregatedRatingBucket').count().orderBy('AggregatedRatingBucket').show()

+----------------------+------+
|AggregatedRatingBucket| count|
+----------------------+------+
|                     0|159594|
|                     1|  8035|
+----------------------+------+



In [0]:
### convert string columns to numerical columns
cols_to_convert = ['DescriptionLen', 'RecipeInstructionSteps', 'Calories', 'FatContent', 'SaturatedFatContent', 'CholesterolContent', 'SodiumContent', 'CarbohydrateContent', 'FiberContent', 'SugarContent', 'ProteinContent', 'RecipeServings']
for col_name in cols_to_convert:
    recipes_clean = recipes_clean.withColumn(col_name, recipes_clean[col_name].cast("float"))

In [0]:
recipes_clean.select(cols_to_convert).describe().show()

+-------+------------------+----------------------+------------------+----------------+-------------------+------------------+------------------+-------------------+-----------------+-----------------+------------------+------------------+
|summary|    DescriptionLen|RecipeInstructionSteps|          Calories|      FatContent|SaturatedFatContent|CholesterolContent|     SodiumContent|CarbohydrateContent|     FiberContent|     SugarContent|    ProteinContent|    RecipeServings|
+-------+------------------+----------------------+------------------+----------------+-------------------+------------------+------------------+-------------------+-----------------+-----------------+------------------+------------------+
|  count|            167629|                167629|            167629|          167629|             167629|            167629|            167629|             167629|           167629|           167629|            167629|            106357|
|   mean|202.45951476176558|    12.37081

### check missing rate

In [0]:
# missing_rates = recipes_clean.select([(F.count(F.when(F.col(c).isNull(), c))/recipes_clean.count()).alias(c) for c in cols_to_convert]).show()

In [0]:
recipes_clean = recipes_clean.fillna(0, subset=cols_to_convert)

In [0]:
recipes_clean = recipes_clean.drop(*['RecipeCategory', 'ReviewCount', 'RecipeId'])

### check association direction between features and response

In [0]:
# feature_cols = ['DescriptionLen', 'RecipeInstructionSteps', 'Calories', 'FatContent', 'SaturatedFatContent', 'CholesterolContent', 'SodiumContent', 
#     'CarbohydrateContent', 'FiberContent', 'SugarContent', 'ProteinContent', 'RecipeServings', 
#     'CookDuration', 'PrepDuration', 'RecipeIngredientPartsCount'] 
# response_col = 'AggregatedRatingBucket'

# for feature_col in feature_cols:
#     correlation = recipes_clean.stat.corr(feature_col, response_col)
#     print(f"Correlation between {feature_col} and {response_col}: {round(correlation, 4)}")

# Model building

### upsampling minority class

In [0]:
# # Calculating the ratio of weights to oversample
# score1_df = recipes_clean.filter(F.col("AggregatedRatingBucket") == 1)
# score2_df = recipes_clean.filter(F.col("AggregatedRatingBucket") == 2)
# score3_df = recipes_clean.filter(F.col("AggregatedRatingBucket") == 3)

# ratio_3_1 = int(score3_df.count()/score1_df.count())
# ratio_3_2 = int(score3_df.count()/score2_df.count())

# print("ratio_3_1: {}".format(ratio_3_1))
# print("ratio_3_2: {}".format(ratio_3_2))

# # duplicate the minority rows in Successful state
# up1_df = score1_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in range(int(ratio_3_1+1))]))).drop('dummy')
# # combine both oversampled successful rows and previous majority rows 
# score3_df = score3_df.unionAll(up1_df)

# # duplicate the minority rows in Successful state
# up2_df = score2_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in range(int(ratio_3_2+1))]))).drop('dummy')
# # combine both oversampled successful rows and previous majority rows 
# score3_df = score3_df.unionAll(up2_df)

# recipes_clean_up = score3_df
# # recipes_clean_up.groupBy('AggregatedRatingBucket').count().orderBy('AggregatedRatingBucket').show()

In [0]:
# # Calculating the ratio of weights to oversample
# score0_df = recipes_clean.filter(F.col("AggregatedRatingBucket") == 0)
# score1_df = recipes_clean.filter(F.col("AggregatedRatingBucket") == 1)

# ratio_1_0 = int(score1_df.count()/score0_df.count())

# print("ratio_1_0: {}".format(ratio_1_0))

# # duplicate the minority rows in Successful state
# up0_df = score0_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in range(int(ratio_1_0+1))]))).drop('dummy')
# # combine both oversampled successful rows and previous majority rows 
# score1_df = score1_df.unionAll(up0_df)

# recipes_clean_up = score1_df
# # recipes_clean_up.groupBy('AggregatedRatingBucket').count().orderBy('AggregatedRatingBucket').show()

### split train and test data

In [0]:
### use upsampling
# train, test = recipes_clean_up.randomSplit([0.7, 0.3], seed = 123)

### not use upsampling
train, test = recipes_clean.randomSplit([0.7, 0.3], seed = 123)

### downsample majority class

In [0]:
# train.groupBy('AggregatedRatingBucket').count().show()

In [0]:
score0_df = train.filter(F.col('AggregatedRatingBucket') == 0)
score1_df = train.filter(F.col('AggregatedRatingBucket') == 1)
ratio = int(score0_df.count()/score1_df.count())
print(ratio)

sampled_score0_df = score0_df.sample(False, 1/ratio)
train_down = sampled_score0_df.unionAll(score1_df)

19


## random forest

In [0]:
assembler = VectorAssembler(
    inputCols=['DescriptionLen', 'RecipeInstructionSteps', 'Calories', 'FatContent', 'SaturatedFatContent', 'CholesterolContent', 'SodiumContent', 
    'CarbohydrateContent', 'FiberContent', 'SugarContent', 'ProteinContent', 'RecipeServings', 
    'CookDuration', 'PrepDuration', 'RecipeIngredientPartsCount', 'RecipeCategoryVec'],
    outputCol='features')

rf = RandomForestClassifier(labelCol="AggregatedRatingBucket", featuresCol="features", numTrees=500)
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train)
predictions = model.transform(test)

In [0]:
# AUC
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='AggregatedRatingBucket')
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.9294897348025476


In [0]:
# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="AggregatedRatingBucket", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy = %g" % accuracy)

In [0]:
# Confustion matrix
preds_and_labels = predictions.select(['prediction','AggregatedRatingBucket']).withColumn('AggregatedRatingBucket', F.col('AggregatedRatingBucket').cast(FloatType())).orderBy('prediction')

preds_and_labels = preds_and_labels.select(['prediction','AggregatedRatingBucket'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

In [0]:
# Precision and recall
# Class-0
print('\n--------------Class-0----------------')
print('Precision          :', round(metrics.precision(0), 3))
print('recall             :', round(metrics.recall(0), 3))

# Class-1
print('\n--------------Class-1----------------')
print('Precision          :', round(metrics.precision(1), 3))
print('recall             :', round(metrics.recall(1), 3))

# # Class-3
# print('\n--------------Class-3----------------')
# print('Precision          :', round(metrics.precision(3), 3))
# print('recall             :', round(metrics.recall(3), 3))

# Overall Accuracy
print('\n Overall Accuracy:', round(metrics.accuracy, 3))

In [0]:
### Plot feature importance for top 20 variables
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Get feature importances from the model
feature_importances = model.stages[-1].featureImportances
print(len(feature_importances))

# Convert indices to feature names
feature_names = ['DescriptionLen', 'RecipeInstructionSteps', 'Calories', 'FatContent', 'SaturatedFatContent', 'CholesterolContent', 'SodiumContent', 
    'CarbohydrateContent', 'FiberContent', 'SugarContent', 'ProteinContent', 'RecipeServings', 
    'CookDuration', 'PrepDuration', 'RecipeIngredientPartsCount', 'RecipeCategoryVec']  # replace with your feature names

num_binary_features = len(feature_importances) - 15
binary_feature_names = ['RecipeCategoryVec_' + str(i) for i in range(num_binary_features)]
feature_names = feature_names + binary_feature_names

importances = {feature_names[i]: importance for i, importance in enumerate(feature_importances)}

# Convert to pandas DataFrame for easier plotting
importances_df = pd.DataFrame(list(importances.items()), columns=['Feature', 'Importance'])
importances_df = importances_df.sort_values(by='Importance', ascending=False)
importances_df = importances_df.iloc[0:20, :]
# Plot feature importances
plt.figure(figsize=(10, 6))
plt.barh(importances_df['Feature'], importances_df['Importance'], color='skyblue')
plt.xlabel("Importance")
plt.title("Feature Importance")
plt.gca().invert_yaxis()
plt.show()

### multinomial logistic regression

In [0]:
assembler = VectorAssembler(
    inputCols=['DescriptionLen', 'RecipeInstructionSteps', 'Calories', 'FatContent', 'SaturatedFatContent', 'CholesterolContent', 'SodiumContent', 
    'CarbohydrateContent', 'FiberContent', 'SugarContent', 'ProteinContent', 'RecipeServings', 
    'CookDuration', 'PrepDuration', 'RecipeIngredientPartsCount', 'RecipeCategoryVec'],
    outputCol='features')

lr = LogisticRegression(featuresCol = 'features', labelCol = 'AggregatedRatingBucket', maxIter=10)

pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train)
lrModel = model.stages[-1]

predictions = model.transform(test)

In [0]:
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

In [0]:
# Getting the training summary
trainingSummary = lrModel.summary

In [0]:
# AUC
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='AggregatedRatingBucket')
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [0]:
# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="AggregatedRatingBucket", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy = %g" % accuracy)

In [0]:
# Confusion matrix
preds_and_labels = predictions.select(['prediction','AggregatedRatingBucket']).withColumn('AggregatedRatingBucket', F.col('AggregatedRatingBucket').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','AggregatedRatingBucket'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

In [0]:
# Precision and recall
# Class-0
print('\n--------------Class-0----------------')
print('Precision          :', round(metrics.precision(0), 3))
print('recall             :', round(metrics.recall(0), 3))

# Class-1
print('\n--------------Class-1----------------')
print('Precision          :', round(metrics.precision(1), 3))
print('recall             :', round(metrics.recall(1), 3))

# Overall Accuracy
print('\n Overall Accuracy:', round(metrics.accuracy, 3))

In [0]:
# # for multiclass, we can inspect metrics on a per-label basis
# print("False positive rate by label:")
# for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
#     print("label %d: %s" % (i, round(rate, 4)))

# print("True positive rate by label:")
# for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
#     print("label %d: %s" % (i, round(rate, 4)))

# print("Precision by label:")
# for i, prec in enumerate(trainingSummary.precisionByLabel):
#     print("label %d: %s" % (i, round(prec, 4)))

# print("Recall by label:")
# for i, rec in enumerate(trainingSummary.recallByLabel):
#     print("label %d: %s" % (i, round(rec, 4)))

# print("F-measure by label:")
# for i, f in enumerate(trainingSummary.fMeasureByLabel()):
#     print("label %d: %s" % (i, round(f, 4)))

# accuracy = trainingSummary.accuracy
# falsePositiveRate = trainingSummary.weightedFalsePositiveRate
# truePositiveRate = trainingSummary.weightedTruePositiveRate
# fMeasure = trainingSummary.weightedFMeasure()
# precision = trainingSummary.weightedPrecision
# recall = trainingSummary.weightedRecall
# print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
#           % (round(accuracy, 4), round(falsePositiveRate, 4), round(truePositiveRate, 4), round(fMeasure, 4), round(precision, 4), round(recall, 4)))