In [1]:
# Java / Spark locations for this process.
# NOTE(review): these are absolute, machine-specific Windows paths and they
# unconditionally overwrite any JAVA_HOME / SPARK_HOME already set.
import os
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_311"
os.environ["SPARK_HOME"] = "C:/spark-3.2.1-bin-hadoop2.7"

# Initiating Spark Session
# findspark.init() runs after the environment variables above are set;
# presumably it relies on SPARK_HOME to locate pyspark -- keep this order.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Session named "word2vec" with spark.driver.memory configured to 2g.
spark = SparkSession.builder.appName("word2vec").config("spark.driver.memory", "2g").getOrCreate()

# Initiating spark context
# NOTE(review): SparkConf and SparkContext are imported but not referenced in
# the visible cells; `sc` is simply taken from the session created above.
from pyspark import SparkConf
from pyspark import SparkContext
sc = spark.sparkContext

In [2]:
# Data wrangling
import numpy as np
import pandas as pd

# NLP
from operator import add
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF
from pyspark.ml import Pipeline, PipelineModel

# SQL
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField
from pyspark.sql import Row
from pyspark.sql.functions import split, col
from pyspark.sql import functions as f


In [102]:
# Stop Spark session
# Guarded so that running the notebook top to bottom does not shut down the
# SparkContext before the cells below use `spark` / `sc`. Flip the flag to
# True (or call sc.stop() by hand) once you are finished with the session.
STOP_SPARK_SESSION = False
if STOP_SPARK_SESSION:
    sc.stop()

In [3]:
# Explicit schema for the combined recipes CSV: every column is read as a
# nullable string (ingredients is turned into an array in a later cell).
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("ingredients", StringType(), True),
    StructField("steps", StringType(), True),
    StructField("source", StringType(), True),
])

# The schema has to be handed to the reader with .schema(); supplying it as
# .option("schema", "schema") only sets a string option named "schema" and
# leaves the StructType above unused.
data = (
    spark.read.format('csv')
    .schema(schema)
    .option("sep", ",")
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .load("input/dfcombined.csv")
)

In [5]:
# Check data
data.show()

+------+--------------------+--------------------+--------------------+--------+
|    id|                name|         ingredients|               steps|  source|
+------+--------------------+--------------------+--------------------+--------+
|137739|arriba   baked wi...|'winter squash', ...|'make a choice an...|food.com|
| 31490|a bit different  ...|'prepared pizza c...|'preheat oven to ...|food.com|
|112140|all in the kitche...|'ground beef', 'y...|'brown ground bee...|food.com|
| 59389|  alouette  potatoes|'spreadable chees...|'place potatoes i...|food.com|
| 44061|amish  tomato ket...|'tomato juice', '...|'mix all ingredie...|food.com|
|  5289|apple a day  milk...|'milk', 'vanilla ...|'combine ingredie...|food.com|
| 25274|aww  marinated ol...|'fennel seeds', '...|'toast the fennel...|food.com|
| 67888|backyard style  b...|'pork spareribs',...|'in a medium sauc...|food.com|
| 70971|bananas 4 ice cre...|'chocolate sandwi...|'crumble cookies ...|food.com|
| 75452|beat this  banana...

In [4]:
# Check schema
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- steps: string (nullable = true)
 |-- source: string (nullable = true)



In [4]:
# Turn the quoted, comma-separated ingredients string into an array:
# strip the single quotes, split on ", ", then sort the elements.
data = data.withColumn(
    "ingredients",
    f.array_sort(
        split(f.regexp_replace(col("ingredients"), "'|'", ''), ", ")
    ),
)

In [5]:
# Check schema
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- steps: string (nullable = true)
 |-- source: string (nullable = true)



In [21]:
# Check shape
print((data.count(), len(data.columns)))

(281846, 5)


In [39]:
# Drop duplicate ingredients: keep one row per distinct ingredients array.
# The arrays were sorted in the parsing cell, so the same ingredients listed
# in a different order also count as duplicates.
# NOTE(review): no ordering is specified here, so which of the duplicate rows
# survives is not controlled by this cell.
data = data.dropDuplicates(["ingredients"])

In [23]:
# Check shape
print((data.count(), len(data.columns)))

(279153, 5)


In [40]:
# Drop duplicate steps: keep one row per distinct `steps` string.
# NOTE(review): as with the ingredients de-duplication, the surviving row of
# each duplicate group is not controlled by this cell.
data = data.dropDuplicates(['steps'])

In [25]:
# Check shape
print((data.count(), len(data.columns)))

(278606, 5)


In [41]:
# Drop NAs: how='any' removes a row as soon as one of its columns is null.
data = data.dropna(how='any')

In [27]:
# Check shape
print((data.count(), len(data.columns)))

(278605, 5)


In [15]:
# Check pre-processed data
data.show()

+------+--------------------+--------------------+--------------------+--------+
|    id|                name|         ingredients|               steps|  source|
+------+--------------------+--------------------+--------------------+--------+
|345571|zucchini  meatbal...|[cayenne pepper, ...|"'meatballs'", 'w...|food.com|
|  1074|bread machine sag...|[active dry yeast...|"add all of the i...|food.com|
|183757|blueberry muffin ...|[active dry yeast...|"add all the ingr...|food.com|
|476699|swedish rye bread...|[bread flour, fas...|"add all the ingr...|food.com|
| 37278|pesto sourdough loaf|[black pepper, br...|"add ingredients ...|food.com|
|277217|rye bread for the...|[bread flour, bre...|"add ingredients ...|food.com|
| 31194|orange hazelnut b...|[bread flour, bre...|"add the ingredie...|food.com|
|430482|whole wheat potat...|[egg, honey, mash...|"add the ingredie...|food.com|
|112214|baked zucchini eg...|[chicken stock, c...|"arrange vegetabl...|food.com|
| 37421|ultimate bread pu...

In [42]:
# Build `indexed`: the cleaned recipes plus a "rec_id" column holding each
# row's position as reported by zipWithIndex().
row_with_index = Row("id","name","ingredients","steps","source","rec_id")
new_schema = StructType(
    list(data.schema.fields) + [StructField("rec_id", IntegerType(), False)]
)
zipped_rdd = data.rdd.zipWithIndex()
# Each element is a (row, position) pair; flatten it into one Row.
indexed = zipped_rdd.map(
    lambda pair: row_with_index(*pair[0], pair[1])
).toDF(new_schema)

In [45]:
# Save parquet file of indexed recipe IDs
# mode("ignore") leaves an already-written dataset untouched instead of
# raising because the path exists, so the notebook can be re-run and the
# rec_id assignment loaded by the later cells stays the same between runs.
# Delete input/indexed.parquet by hand to force a rebuild.
indexed.write.mode("ignore").parquet("input/indexed.parquet")

In [4]:
# Load the indexed recipes from the path the save cell writes to.
# From here on `data` is the persisted table (including rec_id) and no longer
# the in-memory frame built from the CSV.
data = spark.read.load("input/indexed.parquet")

In [87]:
# Keep only the columns used downstream, ordered by ascending recipe index
data = (
    data
    .select("rec_id", "name", "ingredients", "steps")
    .orderBy("rec_id")
)

In [88]:
# Check data
data.show()

+------+--------------------+--------------------+--------------------+
|rec_id|                name|         ingredients|               steps|
+------+--------------------+--------------------+--------------------+
|     0|zucchini  meatbal...|[cayenne pepper, ...|"'meatballs'", 'w...|
|     1|bread machine sag...|[active dry yeast...|"add all of the i...|
|     2|blueberry muffin ...|[active dry yeast...|"add all the ingr...|
|     3|swedish rye bread...|[bread flour, fas...|"add all the ingr...|
|     4|pesto sourdough loaf|[black pepper, br...|"add ingredients ...|
|     5|rye bread for the...|[bread flour, bre...|"add ingredients ...|
|     6|orange hazelnut b...|[bread flour, bre...|"add the ingredie...|
|     7|whole wheat potat...|[egg, honey, mash...|"add the ingredie...|
|     8|baked zucchini eg...|[chicken stock, c...|"arrange vegetabl...|
|     9|ultimate bread pu...|[bread, egg yolks...|"avoid soft super...|
|    10|sarasota s spicy ...|[bacon, butter, c...|"bacon -- in a

In [84]:
# Check schema
data.printSchema()

root
 |-- rec_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- steps: string (nullable = true)



In [32]:
# Check shape
print((indexed.count(), len(indexed.columns)))

(278605, 6)


In [None]:
# Create an average word vector for each recipe document
word2vec = Word2Vec(vectorSize = 100, minCount = 1, inputCol = 'ingredients', outputCol = 'word_vec', seed=123)
pipeline = Pipeline(stages=[word2vec])

# Location of the persisted pipeline model
w2v_model_path = 'models/w2vmodel2' + 'pipe_txt'

# NOTE(review): os.path.isdir looks at the driver's local filesystem; this
# assumes Spark resolves the relative path there as well (local mode) --
# confirm if a remote default filesystem is configured.
if os.path.isdir(w2v_model_path):
    # Reuse the stored model instead of running a fit whose result would be
    # replaced by the loaded model anyway.
    pipeline_mdl = PipelineModel.load(w2v_model_path)
else:
    # No stored model yet: train on the recipes and persist the result so
    # that later runs take the branch above.
    pipeline_mdl = pipeline.fit(data)
    pipeline_mdl.write().overwrite().save(w2v_model_path)

# Transform the recipes data
recipes_pipeline_df = pipeline_mdl.transform(data)

In [33]:
# Preview the recipes together with their word vectors, lowest rec_id first
(
    recipes_pipeline_df
    .select('rec_id', 'name', 'ingredients', 'word_vec')
    .orderBy('rec_id')
    .show()
)

+------+--------------------+--------------------+--------------------+
|rec_id|                name|         ingredients|            word_vec|
+------+--------------------+--------------------+--------------------+
|     0|zucchini  meatbal...|[cayenne pepper, ...|[0.06053330030408...|
|     1|bread machine sag...|[active dry yeast...|[0.03495738882985...|
|     2|blueberry muffin ...|[active dry yeast...|[-0.0334744214545...|
|     3|swedish rye bread...|[bread flour, fas...|[0.08946106266230...|
|     4|pesto sourdough loaf|[black pepper, br...|[0.10544267215300...|
|     5|rye bread for the...|[bread flour, bre...|[0.14594541365901...|
|     6|orange hazelnut b...|[bread flour, bre...|[-0.0541136398445...|
|     7|whole wheat potat...|[egg, honey, mash...|[0.21481899668773...|
|     8|baked zucchini eg...|[chicken stock, c...|[-0.0131356265734...|
|     9|ultimate bread pu...|[bread, egg yolks...|[-0.0043353001321...|
|    10|sarasota s spicy ...|[bacon, butter, c...|[0.04410257816

In [46]:
# Bring every (rec_id, word_vec) pair to the driver as a list of tuples,
# ordered by ascending rec_id.
recipe_vecs = [
    (row[0], row[1])
    for row in (
        recipes_pipeline_df
        .select('rec_id', 'word_vec')
        .orderBy('rec_id')
        .collect()
    )
]

In [107]:
# Take a look at the first recipe vector
recipe_vecs[:1]

[(0,
  DenseVector([0.0605, -0.0882, -0.1325, -0.1291, -0.1006, -0.0792, 0.0653, -0.0249, -0.015, -0.2289, -0.0938, -0.0815, -0.0122, 0.0836, 0.1927, -0.0984, -0.0098, -0.156, 0.0833, 0.0914, -0.1048, -0.0912, -0.1469, -0.0431, -0.1604, -0.1987, -0.0776, -0.0752, 0.1594, 0.0261, -0.0103, -0.1287, -0.0439, -0.0019, -0.0814, -0.077, -0.1393, -0.082, -0.0881, -0.0945, -0.0495, -0.2394, 0.0107, 0.0942, -0.1456, -0.2012, 0.099, 0.0364, -0.1075, -0.106, 0.2433, -0.2131, -0.0061, -0.0205, 0.0602, 0.069, 0.0095, -0.0476, -0.1219, 0.0015, 0.0118, -0.0374, -0.1183, -0.087, -0.0356, -0.0472, 0.0635, -0.0162, -0.106, 0.0523, -0.0803, -0.1872, 0.0188, 0.0998, 0.0832, 0.0674, 0.0181, -0.1212, -0.0299, -0.0258, -0.0826, 0.1441, -0.2444, 0.0504, -0.0753, 0.0601, -0.0812, -0.1406, 0.088, 0.2873, -0.0966, -0.0087, -0.1911, -0.0846, -0.075, -0.0028, -0.0366, 0.0266, -0.05, -0.1139]))]

In [11]:
# Function to calculate the cosine similarity of two vectors
def CosineSim(vec1, vec2):
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        vec1: First vector (anything np.dot accepts).
        vec2: Second vector, same length as vec1.

    Returns:
        dot(vec1, vec2) / (|vec1| * |vec2|) as a float. When either vector
        has zero length the similarity is undefined; 0.0 is returned in that
        case instead of the NaN a 0/0 division would produce.
    """
    norm1 = np.sqrt(np.dot(vec1, vec1))
    norm2 = np.sqrt(np.dot(vec2, vec2))

    # A zero vector has no direction, so treat it as "not similar".
    if norm1 == 0 or norm2 == 0:
        return 0.0

    return float(np.dot(vec1, vec2) / norm1 / norm2)

In [14]:
# Function to get recipe details
def GetRecipeDetails(input_rec):
    """Attach name, ingredients and steps to each row of `input_rec`.

    `input_rec` is inner-joined on rec_id with the notebook-level `indexed`
    DataFrame. The result keeps every column of `input_rec` followed by
    `name`, `ingredients` and `steps` from `indexed`.

    NOTE(review): this reads `indexed`, not the `data` frame reloaded from
    parquet -- confirm both carry the same rec_id assignment.
    """
    scored = input_rec.alias("a")
    recipes = indexed.alias("b")

    same_recipe = col("a.rec_id") == col("b.rec_id")

    wanted = [col('a.' + column_name) for column_name in scored.columns]
    wanted += [col('b.' + column_name) for column_name in ('name', 'ingredients', 'steps')]

    return scored.join(recipes, same_recipe, 'inner').select(wanted)

In [52]:
# Create function to recommend top 5 recipes based on ingredient keywords

def KeywordRecommender(key_words, sim_rec_limit=5, min_score=0.6):
    """Recommend the recipes whose ingredients are most similar to `key_words`.

    The keywords are embedded with the fitted pipeline (`pipeline_mdl`),
    compared by cosine similarity against every vector in `recipe_vecs`, and
    the matching recipes are joined to their details via GetRecipeDetails.

    Args:
        key_words: Ingredient names in a single string, separated by ", ".
        sim_rec_limit: Maximum number of recipes to return (default 5).
        min_score: Lowest cosine similarity a recipe may have and still be
            returned (default 0.6).

    Returns:
        A Spark DataFrame (the columns produced by GetRecipeDetails) holding
        at most `sim_rec_limit` rows, ordered by descending score.
    """
    print('\nRecipes containing your ingredients: "' + key_words + '"')

    input_words_df = sc.parallelize([(0, key_words)]).toDF(['rec_id', 'ingredients'])

    # Pre-process the keywords into an array of strings: strip single quotes
    # and split on ", ". No sorting is applied to the keywords.
    input_words_df = input_words_df \
        .withColumn("ingredients", f.regexp_replace('ingredients', "'|'", '')) \
        .withColumn("ingredients", split(col("ingredients"), ", "))

    # Transform the keywords into vectors and take the single resulting vector
    input_words_df = pipeline_mdl.transform(input_words_df)
    input_key_words_vec = input_words_df.select('word_vec').collect()[0][0]

    # Cosine similarity of the keyword vector against every recipe vector
    scores = [
        (rec_id, float(CosineSim(input_key_words_vec, vec)))
        for rec_id, vec in recipe_vecs
    ]
    sim_rec_byword_df = spark.createDataFrame(sc.parallelize(scores), ['rec_id', 'score'])

    # Keep only sufficiently similar recipes. NaN scores are excluded
    # explicitly: Spark orders NaN above every other number, so a NaN would
    # otherwise be expected to pass the threshold and rank first.
    # Filtering with the DataFrame API also avoids registering a temp view
    # in the session catalog for a one-line predicate.
    matches = sim_rec_byword_df.filter(
        (col('score') >= min_score) & ~f.isnan(col('score'))
    )

    # Attach recipe details, then rank and truncate. Sorting happens only
    # here because row order is not carried through the join.
    rec_det = GetRecipeDetails(matches)
    return rec_det.orderBy("score", ascending=False).limit(sim_rec_limit)

In [16]:
# Test keywords similarity to recipe ingredients

keywords = 'veal roast, bacon, carrots, parsley, thyme, onion, butter, pepper'

KeywordRecommender(keywords).show(5)


Recipes containing your ingredients: "veal roast, bacon, carrots, parsley, thyme, onion, butter, pepper"
+------+------------------+--------------------+--------------------+--------------------+
|rec_id|             score|                name|         ingredients|               steps|
+------+------------------+--------------------+--------------------+--------------------+
|194706|0.9126532135298442|mr  grant you too...|[bacon, bay leaf,...|'preheat oven to ...|
| 75266|0.8930075774494053|german bean soup ...|[bacon, bay leave...|'drain the soaked...|
| 53481|0.8907279378984337|boeuf bourguignon...|[bacon, bay leaf,...|'fry 6 strips bac...|
| 86763|0.8880711277807383|onions and noodle...|[beef bouillon, b...|'saute onions in ...|
|227040| 0.884325469812452| delicious lamb stew|[bacon, boneless ...|'cut the meat int...|
+------+------------------+--------------------+--------------------+--------------------+

