In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext,Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressionModel
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [None]:
from sentence_transformers import SentenceTransformer
import matplotlib.pyplot as plt

In [None]:
sc._conf.get('spark.executor.memory'),sc._conf.get('spark.driver.memory')

Out[3]: ('16g', '16g')

In [None]:
sc._conf.set('spark.executor.memory','16g'),sc._conf.set('spark.driver.memory','16g')

Out[4]: (<pyspark.conf.SparkConf at 0x7fe52990a040>,
 <pyspark.conf.SparkConf at 0x7fe52990a040>)

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

###Predicting Dish cost for recipes in AllRecipes using ingredients, ingredients size, recipe name by training model on yelp menu prices 

Step 1 : Pre-Process the data from all recipes and yelp \
Step 2 : Using SentenceTransformer, get embeddings for dish name and ingredients in yelp and all recipes data \
Step 3 : Match the dishes/recipes from all recipes with yelp using the embeddings \
Step 4 : Attach the yelp menu price for the matched dishes in all recipes \
Step 5 : Normalize the ingredients size in all recipes to single serving size/ double serving size etc (based on dish category) \
Step 6 : Train regression model on matched dishes using sentence embeddings for ingredient, ingredient size and name as features and matched yelp dish price as target to predict price for other dishes in all recipes \
%md

In [None]:
#DataFrame containing data from allrecipes website 
df_ar = spark.read.format("mongo").option('uri',f'mongodb+srv://{cluster_detail}/project_db.recipes').load()
df_ar.count()

Out[59]: 21885

In [None]:
# #get just the ingredient names from ingredients list containing size information, size label and ingredient name
# ingredients_check_udf = udf(ingredients_check, ArrayType(IntegerType()))

# df.select('name','ingredients').withColumn("ingredients_str", ingredients_check_udf("ingredients")).withColumn('check', F.aggregate("ingredients_str", F.lit(0), lambda acc, x: acc + x)).filter('check>0').select('ingredients').rdd.collect()


Out[49]: [Row(ingredients=[['2', 'cups', 'Greek yogurt'], ['2', 'hot house cucumbers - peeled, seeded, and thinly sliced'], ['3', 'tablespoons', 'lemon juice'], ['2', 'tablespoons', 'chopped fresh mint'], ['½', 'teaspoon', 'white sugar'], ['¼', 'teaspoon', 'kosher salt']]),
 Row(ingredients=[['4', '4-ounce', 'walleye fillets'], ['2', '12-inch squares of aluminum foil'], ['5', 'tablespoons', 'butter, melted'], ['1', 'pinch', 'seasoned salt, or to taste'], ['1', 'pinch', 'garlic and herb seasoning blend (such as Mrs. Dash®), or to taste']]),
 Row(ingredients=[['1 ½', 'cups', 'barbecue sauce'], ['¼', 'cup', 'honey'], ['2', 'teaspoons', 'Creole mustard'], ['¼', 'teaspoon', 'Worcestershire sauce'], ['¼', 'teaspoon', 'hot pepper sauce'], ['9', 'Tyson Fresh Boneless, Skinless Chicken Thighs'], ['½', 'teaspoon', 'salt'], ['¼', 'teaspoon', 'pepper']]),
 Row(ingredients=[['butter, melted, divided '], ['2', 'cups', 'all-purpose flour, plus more as needed'], ['2', 'tablespoons', 'white sugar'], ['

In [None]:
#get just the ingredient names from ingredients list containing size information, size label and ingredient name
def join_ingredients(ingredients):
    return ",".join([i[-1] for i in ingredients])

join_ingredients_udf = udf(join_ingredients, StringType())
df_all_recip=df_ar.select('name','ingredients').withColumn("ingredients_str", join_ingredients_udf("ingredients"))

In [None]:
#get embeddings for recipe_name and ingredients
emb_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embeddings(str):
    return emb_model.encode(str, show_progress_bar=True).tolist()
get_embeddings_udf = udf(get_embeddings, ArrayType(FloatType()))
df_all_recip =df_all_recip.withColumn("ing_embeddings",get_embeddings_udf("ingredients_str"))\
              .withColumn("rec_embeddings",get_embeddings_udf("name")).cache()

In [None]:
df_all_recip.count()

Out[154]: 21885

In [None]:
#DataFrame containing data from yelp website 
df_yp = spark.read.format("mongo").option('uri','mongodb+srv://{cluster_detail}/project_db.yelp').load()
df_yp.count()

Out[87]: 12035

In [None]:
df_yp.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- ingredients_transl: string (nullable = true)
 |-- menu_type: string (nullable = true)
 |-- menu_type_transl: string (nullable = true)
 |-- price: string (nullable = true)
 |-- recipe_name: string (nullable = true)
 |-- recipe_name_proc: string (nullable = true)
 |-- recipe_name_transl: string (nullable = true)
 |-- rest_name: string (nullable = true)



In [None]:
#change price to int 
def convert_to_float(value):
    try:
        return float(value.strip('$ '))
    except ValueError:
        return None

udf_convert_to_float = udf(convert_to_float, FloatType())
df_yp=df_yp.withColumn("price_val", udf_convert_to_float("price")).cache()


In [None]:
df_yp.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- ingredients_transl: string (nullable = true)
 |-- menu_type: string (nullable = true)
 |-- menu_type_transl: string (nullable = true)
 |-- price: string (nullable = true)
 |-- recipe_name: string (nullable = true)
 |-- recipe_name_proc: string (nullable = true)
 |-- recipe_name_transl: string (nullable = true)
 |-- rest_name: string (nullable = true)
 |-- price_val: float (nullable = true)



In [None]:
df_yp=df_yp.filter('price_val is not NULL').select('recipe_name_transl','ingredients_transl','menu_type_transl','price_val').cache()

In [None]:
#map each menu_type to one of mains, appetizers, desserts or beverages  and assign serving size
predef_menu_types=['Mains','Sides','Appetizers','Desserts','Beverages']
serving_size=[1,3,4,2,1]
predef_menu_types_embeddings=emb_model.encode(predef_menu_types, show_progress_bar=True)

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
df_yp.filter(isnan(col('recipe_name_transl'))).withColumn("rec_embeddings",when(~isnan(col('recipe_name_transl')),get_yelp_embeddings_udf("recipe_name_transl")).otherwise(None)).show()

# .withColumn('val_transformed', 
#                    when(~isnan(col('rec_embeddings')), my_udf(col('val')))
#                    .otherwise(None))

+------------------+--------------------+-----------------+---------+--------------+
|recipe_name_transl|  ingredients_transl| menu_type_transl|price_val|rec_embeddings|
+------------------+--------------------+-----------------+---------+--------------+
|               NaN|Hedrick's Gin, Ro...|        Cocktails|     13.0|          null|
|               NaN|Bombay Sapphire, ...|        Cocktails|     13.0|          null|
|               NaN|Tequila, Fresh Sq...|        Cocktails|     13.0|          null|
|               NaN|Single Malt Scotc...|        Cocktails|     13.0|          null|
|               NaN|Tequila Blanco, C...|        Cocktails|     13.0|          null|
|               NaN|Tito's Vodka, Fre...|        Cocktails|     13.0|          null|
|               NaN|Gin, Sweet Vermou...|        Cocktails|     13.0|          null|
|               NaN|Maker's Mark, Mud...|        Cocktails|     13.0|          null|
|               NaN|Amaretto, Fresh S...|        Cocktails|     1

In [None]:
#generate sentence embeddings for yelp data. For NaN's replace with NULLs 
#get embeddings for recipe_name and ingredients

def get_yelp_embeddings(string_val):
    return emb_model.encode(string_val, show_progress_bar=True).tolist()

get_yelp_embeddings_udf = udf(get_yelp_embeddings, ArrayType(FloatType()))
df_yelp =df_yp.withColumn("ing_embeddings",when(~isnan(col('ingredients_transl')),get_yelp_embeddings_udf("ingredients_transl")).otherwise(None))\
               .withColumn("rec_embeddings",when(~isnan(col('recipe_name_transl')),get_yelp_embeddings_udf("recipe_name_transl")).otherwise(None))\
.cache()

In [None]:
#get serving size based on closest predefined menu type (mains and beverages : 1, appetizers and desserts : 2)
def get_serving_size(menu_type):
    menu_emb=emb_model.encode(menu_type, show_progress_bar=True).tolist()
#     return len(predef_menu_types_embeddings)
    val=0
    for i,embeding in enumerate(predef_menu_types_embeddings):
        emb_sim=0
        for j in range(len(embeding)):
            emb_sim+=embeding[j]*menu_emb[j]
        if emb_sim>val:
            val=emb_sim
            size=serving_size[i]
    return size

get_serving_size_udf = udf(get_serving_size, IntegerType())

In [None]:
df_yelp=df_yelp.withColumn("size",get_serving_size_udf('menu_type_transl')).cache()

In [None]:
df_all_recip.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                name|         ingredients|     ingredients_str|      ing_embeddings|      rec_embeddings|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Irish Black Russi...|[[1, (1.5 fluid o...|coffee flavored l...|[-0.05815727, -0....|[-0.07928579, -0....|
|Sweet Butternut S...|[[1, , unbaked 9-...|unbaked 9-inch pi...|[-0.0059607686, 0...|[0.037480827, 0.0...|
|Cheesy Tortilla S...|[[3, tablespoons,...|butter,chopped on...|[-0.035500154, -0...|[-0.086157195, 0....|
|Bahama Mama I Recipe|[[¼, fluid ounce,...|coffee-flavored l...|[0.030623345, -0....|[-0.009275567, 0....|
|The Cheesecake Fa...|[[4, , skinless, ...|skinless, boneles...|[-0.042857114, -0...|[-0.016865496, -0...|
|Cold Brewed Coffe...|[[1 ¾, cups, grou...|ground coffee,wat...|[-0.037976474, -0...|[-0.08331829, 0.0...|
|Whole30® Cinnamon...|[[2, tablespoon

In [None]:
sim_threshold=0.9
def sim_score_all_recipes_yelp(ar_rec,ar_ing,yp_rec,yp_ing):
    rec_val=None
    ing_val=None
    if yp_rec is not None and ar_rec is not None:
        rec_val=0
        for i in range(len(yp_rec)):
            rec_val+=yp_rec[i]*ar_rec[i]
    if yp_ing is not None and ar_ing is not None:
        ing_val=0
        for i in range(len(yp_ing)):
            ing_val+=yp_ing[i]*ar_ing[i]
    if rec_val is not None and ing_val is not None:
        val=(rec_val+ing_val)/2
    elif rec_val is not None:
        val=rec_val
    elif ing_val is not None:
        val=ing_val
    else:
        val=0
    return val


def join_all_recipes_yelp(ar_rec,ar_ing,yp_rec,yp_ing):
    return sim_score_all_recipes_yelp(ar_rec,ar_ing,yp_rec,yp_ing)>sim_threshold

join_all_recipes_yelp_udf=udf(join_all_recipes_yelp,BooleanType())


sim_score_all_recipes_yelp_udf=udf(sim_score_all_recipes_yelp,FloatType())


In [None]:
def recipe_matching(all_recip_input):
    df_input=df_all_recip.filter(df_all_recip.name==all_recip_input)
    return df_input.join(df_yelp,joinspec,"inner").select(df_input.name.alias('input_recipe'),df_yelp.recipe_name_transl.alias('matched_recipe'),df_yelp.price_val).cache()
    

In [None]:
df=recipe_matching('Wild Mushroom Sauce Recipe')
df.show()

+--------------------+-------------------+---------+
|        input_recipe|     matched_recipe|price_val|
+--------------------+-------------------+---------+
|Wild Mushroom Sau...|Wild Mushroom Sauce|      4.0|
+--------------------+-------------------+---------+



In [None]:
joinspec=join_all_recipes_yelp_udf(df_all_recip.rec_embeddings,df_all_recip.ing_embeddings,df_yelp.rec_embeddings,df_yelp.ing_embeddings)
windowSpec = Window.partitionBy(df_all_recip.name,df_all_recip.ingredients)
df_combined=df_all_recip.join(df_yelp,joinspec,"inner").cache()
#withColumn('sim_score',sim_score_all_recipes_yelp_udf(df_all_recip.rec_embeddings,df_all_recip.ing_embeddings,df_yelp.rec_embeddings,df_yelp.ing_embeddin#gs)).withColumn('max_sim_score',max('sim_score').over(windowSpec)).filter('sim_score-max_sim_score between -0.001 and 0.001')

In [None]:
#matched recipes (name is from all recipes and recipe_name_transl is from yelp)
df_combined.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+---------+--------------+--------------------+----+
|                name|         ingredients|     ingredients_str|      ing_embeddings|      rec_embeddings|  recipe_name_transl|ingredients_transl|    menu_type_transl|price_val|ing_embeddings|      rec_embeddings|size|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+---------+--------------+--------------------+----+
|Wild Mushroom Sau...|[[4, tablespoons,...|butter,finely cho...|[0.03173336, -0.0...|[0.037331596, 0.0...| Wild Mushroom Sauce|               NaN|Sauces and Accomp...|      4.0|          null|[0.013331903, 0.0...|   2|
|Hollandaise Sauce...|[[3, large, egg y...|egg yolks,fresh l...|[-0.04680537, -0....|[-0.050772402, -0...|   Hollandaise Sau

In [None]:
df_combined.count()

In [None]:
# feature_cols = ["ing_embeddings", "rec_embeddings"]
# target_col = "price_val"

In [None]:
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# data = assembler.transform(df_combined)
# train_data, test_data = df_combined.randomSplit([0.7, 0.3])

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4339210645485434>:2[0m
[1;32m      1[0m assembler [38;5;241m=[39m VectorAssembler(inputCols[38;5;241m=[39mfeature_cols, outputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m)
[0;32m----> 2[0m data [38;5;241m=[39m assembler[38;5;241m.[39mtransform(df_combined)
[1;32m      3[0m train_data, test_data [38;5;241m=[39m df_combined[38;5;241m.[39mrandomSplit([[38;5;241m0.7[39m, [38;5;241m0.3[39m])

File [0;32m/databricks/spark/python/pyspark/ml/base.py:262[0m, in [0;36mTransformer.transform[0;34m(self, dataset, params)[0m
[1;32m    260[0m         [38;5;28;01mreturn[39;00m [38;5;28mself[39m[38;5;241m.[39mcopy(params)[38;5;241m.[39m_transform(dataset)
[1;32m    261[0m     [38;5;28;01melse[39;00m:
[0;32m--> 262[0m         [38;5;28

In [None]:

# rf = RandomForestRegressor(labelCol="label", featuresCol="features")


In [None]:
spark.stop()