In [None]:
####----Check correct working directory----####

import os
TARGET_DIR = "/home/centos/BigData/02_cleaning"

# os.getcwd() never ends with a path separator, so comparing it literally against
# ".../02_cleaning/" could never succeed; compare resolved paths instead.
if os.path.realpath(os.getcwd()) != os.path.realpath(TARGET_DIR):
    os.chdir(TARGET_DIR)
    # Report the directory we moved INTO (previously the old directory was printed).
    print("Directory changed to: " + os.getcwd())
else:
    print("Correct current directory: " + os.getcwd())
wd = os.getcwd()

In [None]:
####----Import dictionary----####
# One dictionary entry per line; the file is decoded as ISO-8859-1 (Latin-1).
with open('dict/Overall_Master.txt', encoding='ISO-8859-1') as dictionary_file:
    raw_dictionary = dictionary_file.read()
ingredient_set = raw_dictionary.splitlines()

print("List length: " + str(len(ingredient_set)))
# For a quick debug run, restrict the dictionary, e.g.:
#ingredient_set = ingredient_set[1:10]

In [None]:
####----Connect to Spark via PySpark----####
%pip install pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, explode, col, lit, monotonically_increasing_id, unix_timestamp
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Normalizer, CountVectorizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
import pandas as pd
import time
import datetime
import json
import re
import findspark
print("Connecting Spark")

# Stop a SparkContext left over from a previous run of this cell so a new one can be
# created. On a fresh kernel `sc` does not exist yet; the old unguarded `sc.stop()`
# raised NameError there, so the notebook could not be run top to bottom.
_previous_sc = globals().get("sc")
if _previous_sc is not None:
    _previous_sc.stop()

#Settings necessary for sandboxing otherwise Pyspark too slow on standalone
# NOTE(review): with master "local" the executor runs inside the driver JVM, so
# spark.executor.memory probably has no effect here (driver memory governs) — confirm.
SparkContext.setSystemProperty('spark.executor.memory', '2g')
sc = SparkContext("local", "App Name")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "50")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "12000")
#improve toPandas() via Arrow-based columnar conversion
sqlContext.setConf("spark.sql.execution.arrow.enabled", "true")
print("Spark Context Created")


In [None]:
# spark = SparkSession \
#     .builder \
#     .appName("Python Spark SQL basic example") \
#     .config("spark.some.config.option", "some-value") \
#     .getOrCreate()
# spark.conf.set("spark.executor.memory", "2g")
# print("Spark Session Created")
#Python Java issues: a drawback in using pyspark instead of Scala

In [None]:
#Creation of filtering words
# NOTE(review): this re-reads the same dictionary loaded in the "Import dictionary" cell.
# Use the same ISO-8859-1 encoding so both reads yield identical lists; relying on the
# locale default can raise UnicodeDecodeError on Latin-1 bytes.
with open("dict/Overall_Master.txt", encoding='ISO-8859-1') as file:
    ingredient_set = file.read().splitlines()
print("List length: " + str(len(ingredient_set)))

In [None]:
####----Load json into Spark for data cleaning----####
# Glob over the local BBC recipe dumps. The commented alternative reads the
# allrecipes dumps from HDFS instead.
read_path = "/home/centos/BigData/01_source/bbc*.json"
#read_path = "hdfs://localhost:9000/01_raw/allrecipes*.json"

recipe_j = sqlContext.read.format("json").load(read_path)
recipe_j.printSchema()


In [None]:
####----Create Spark DataFrame----####

##Register the raw JSON as a temp view so it can be queried with Spark SQL
recipe_j.createOrReplaceTempView("recipe")

##Select the recipe fields of interest.
##NOTE: "LIMIT 100" keeps only 100 rows — remove it for a full run.
recipe_query = "SELECT title, description, ingredients, instructions, rating_stars, review_count, photo_url, total_time_minutes, url FROM recipe LIMIT 100"
recipe_df = sqlContext.sql(recipe_query)
recipe_df.printSchema()

recipe_df.show(5)

##Optionally dump a small sample to CSV for inspection
# small_sample = recipe_df.toPandas()
# small_sample.to_csv("small_sample.csv")

# Note: cook_time_minutes contains many zeroes, so rating_stars and review_count are used instead.


In [None]:
####----Extract ingredient----####
import re

##User-defined Functions
def removeParenthesis(li):
    """Strip parenthesised notes from each ingredient line.

    Parameters
    ----------
    li : list of str or None
        Raw ingredient lines. ``None`` (a null Spark array) is returned as ``None``.
        ``None`` elements are passed through unchanged.

    Returns
    -------
    list of str or None
        The lines with every "(...)" group removed and whitespace normalised
        (collapsed to single spaces, trimmed at both ends).
    """
    if li is None:
        return None
    output = []
    for text in li:
        if text is None:
            output.append(None)
            continue
        text = re.sub(r"\([^)]*\)", "", text)
        # Collapse the gap left behind ("brown (light) sugar" -> "brown sugar") so
        # multi-word dictionary entries still match as substrings.
        output.append(" ".join(text.split()))
    return output

def extractIngredient(li, ingredient_set):
    """Map each ingredient line to the longest dictionary entry it contains.

    Parameters
    ----------
    li : list of str or None
        Ingredient lines (e.g. "1 cup chopped onion"). ``None`` (a null Spark
        array) yields an empty list; ``None`` elements are skipped.
    ingredient_set : sequence of str
        Dictionary of ingredient names, matched as plain substrings.

    Returns
    -------
    list of str
        One entry per line that contains at least one dictionary name. Lines with
        no match are dropped. Ties in length go to the entry listed first.
    """
    if li is None:
        return []
    output = []
    for item in li:
        if item is None:
            continue
        # Plain substring test instead of re.search: dictionary entries are literal
        # names, so regex metacharacters ("(", ".", "+") must not be interpreted
        # (an entry like "(light" used to raise re.error). It also avoids compiling
        # one pattern per entry, which re's pattern cache cannot absorb for a large
        # dictionary.
        matches = [entry for entry in ingredient_set if entry in item]
        if matches:
            # max() returns the first maximal element, preserving the old tie-break.
            output.append(max(matches, key=len))
    return output

sample = ['1/2 cup unsalted butter, chilled and cubed', '1 cup chopped onion', '1 3/4 cups cornmeal', '1 1/4 cups all-purpose flour', '1/4 cup white sugar', '1 tablespoon baking powder', '1 1/2 teaspoons salt', '1/2 teaspoon baking soda', '1 1/2 cups buttermilk', '3 eggs', '1 1/2 cups shredded pepperjack cheese', '1 1/3 cups frozen corn kernels, thawed and drained', '2 ounces roasted marinated red bell peppers, drained and chopped', '1/2 cup chopped fresh basil']

## Smoke-test the extractor locally before wrapping it as a Spark UDF
test = extractIngredient(sample, ingredient_set)
print(test)
print(len(ingredient_set))

##Create user defined functions
udf_removeParen = udf(removeParenthesis, ArrayType(StringType()))
udf_extractIngredient = udf(lambda x: extractIngredient(x, ingredient_set), ArrayType(StringType()))
# Null-safe: a null array yields a null count instead of failing the Spark task.
udf_countList = udf(lambda x: len(x) if x is not None else None, IntegerType())

print("UDF Successful.")


In [None]:
#User Defined Function - Feature Engineering 
#Labels: Vegetarian, Lactose, Nut, Seafood

#Safer to start off as non-vegetarian
def detectVege(li, vegDetect_list):
    """Label a recipe as vegetarian (1) or not (0).

    The recipe is vegetarian only when none of its ingredients exactly matches an
    entry of ``vegDetect_list``. A single exact match gives 0.
    """
    for ingredient in li:
        if ingredient in vegDetect_list:
            return 0
    return 1

#Safer to start off as positive allergy
def detectNut(li, nut_terms=("nut", "almond", "cashew", "pecan", "pistachio",
                             "macadamia", "marzipan", "praline")):
    """Return 1 if any ingredient name mentions a nut term, else 0.

    Matching is by substring, so "walnut", "peanut" and "hazelnut" are caught by
    "nut". Tree nuts whose names do not contain "nut" (almond, cashew, pecan, ...)
    are listed explicitly; previously only "nut" was checked and they were missed.
    Substring matching errs toward flagging (e.g. "nutmeg", "butternut squash"),
    in line with treating the allergy label as positive when in doubt.

    Parameters
    ----------
    li : iterable of str
        Extracted ingredient names.
    nut_terms : iterable of str, optional
        Substrings that mark an ingredient as a nut.
    """
    return int(any(term in text for text in li for term in nut_terms))

def detectDairy(li, dairy_terms=("cheese", "milk", "yoghurt", "yogurt", "cream", "butter")):
    """Return 1 if any ingredient name mentions a dairy term, else 0.

    Uses substring matching, consistent with detectNut. The previous exact-match
    check missed extracted names such as "cheddar cheese", "double cream",
    "buttermilk", "butter" and the "yogurt" spelling. Substring matching errs toward
    flagging (e.g. "coconut milk", "peanut butter"), in line with treating the
    allergy label as positive when in doubt.

    Parameters
    ----------
    li : iterable of str
        Extracted ingredient names.
    dairy_terms : iterable of str, optional
        Substrings that mark an ingredient as dairy.
    """
    return int(any(term in text for text in li for term in dairy_terms))

def detectSeafood(li, seaDetect_list):
    """Return 1 if any ingredient in ``li`` exactly matches an entry of ``seaDetect_list``, else 0."""
    return int(any(ingredient in seaDetect_list for ingredient in li))


def _read_lines(path):
    """Return the lines of the text file at ``path`` with line endings stripped."""
    with open(path) as handle:
        return handle.read().splitlines()


# Exact-match lookup lists (one name per line) for the vegetarian and seafood labels
vegDetect_list = _read_lines("dict/vegDetect.txt")
seaDetect_list = _read_lines("dict/seafoodDetect.txt")

# Wrap the detectors as Spark UDFs; the lambdas look up the module-level lists above
udf_detectVege = udf(lambda x: detectVege(x, vegDetect_list), IntegerType())
udf_detectNut = udf(detectNut, IntegerType())
udf_detectDairy = udf(detectDairy, IntegerType())
udf_detectSeafood = udf(lambda x: detectSeafood(x, seaDetect_list), IntegerType())

#to stabilise schema with empty column
# udf_empty = udf(lambda x: None, StringType())

# Run timestamp (local time) that is stamped onto every labelled row
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


In [None]:
####----Create ingredient table----####
## NOTE(review): `allbbc_table` is not defined anywhere in this notebook, so on a fresh
## kernel this cell raised NameError. Use it when it exists in the session; otherwise
## fall back to recipe_df (the LIMIT-100 selection above), which has the `title` and
## `ingredients` columns this pipeline reads.
source_table = globals().get("allbbc_table", recipe_df)

##Create ingredient table: strip parenthesised notes, then map each line to a dictionary ingredient
extracted_df = (
    source_table
    .withColumn('rm_paren', udf_removeParen('ingredients'))
    .withColumn('ingredient_extract', udf_extractIngredient('rm_paren'))
)

extracted_df.cache()
extracted_df.show(5)
extracted_df.storageLevel
# extracted_df.unpersist()


In [None]:
# Number of rows in extracted_df; count() is a Spark action, so this evaluates the whole table.
extracted_df.count()

In [None]:
#Create recipe -> ingredient edge list (its CSV export is currently commented out in the export cell)
ingredient_extract = extracted_df.select("title", "ingredient_extract")

# One row per (recipe, ingredient); empty-string matches are dropped.
ingredient_graph = (
    ingredient_extract
    .withColumn('exploded', explode('ingredient_extract'))
    .select(col('title').alias('Recipe'), col('exploded').alias('Ingredient'))
    .filter(col('Ingredient') != "")
)
# Optional weight column:  .withColumn("Frequency", lit(1))

ingredient_graph.show(5)

In [None]:
####----User Defined Function - Feature Engineering----####
##Labels: Vegetarian, Lactose, Nut, Seafood (each detector returns 0/1), plus the
##number of extracted ingredients and the run timestamp.
labelled_df = (
    extracted_df
    .withColumn("vegetarian_label", udf_detectVege("ingredient_extract"))
    .withColumn("nut_label", udf_detectNut("ingredient_extract"))
    .withColumn("lactose", udf_detectDairy("ingredient_extract"))
    .withColumn("seafood", udf_detectSeafood("ingredient_extract"))
    .withColumn('extract_count', udf_countList('ingredient_extract'))
    .withColumn('timestamp', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
)

# labelled_df = labelled_df.withColumn("rating_stars", udf_empty("ingredient_extract")) \
#                          .withColumn("review_count", udf_empty("ingredient_extract"))

labelled_df.show(5)

In [None]:
# Drop the intermediate columns (parenthesis-stripped lines and the extracted
# ingredient list) before the labelled table is exported to CSV.
columns_to_drop = ['rm_paren', 'ingredient_extract']
labelled_df = labelled_df.drop(*columns_to_drop)
labelled_df.printSchema()

In [None]:
####----Export Spark Data Frame----####

import re
import time
run_stamp = time.strftime('%Y%m%d-%H%M%S')
export_filename = 'recipe_bbc_' + run_stamp + '.csv'

# Collect the labelled table to the driver as pandas (Arrow conversion was enabled
# on the SQLContext during setup to speed this up) and write it locally as CSV.
labelled_pandas = labelled_df.toPandas()
labelled_pandas.to_csv(export_filename)
# graph_pandas = ingredient_graph.toPandas()
# graph_pandas.to_csv("ingredient_graph.csv")

# Alternative JSON export (disabled). NOTE(review): toJSON() already yields JSON
# strings, so json.dump would double-encode them — write them directly if revived.
# labelled_json = labelled_df.toJSON()
# json_output = labelled_json.collect()

# with open("label_table.json", "w") as file:
#     for j in json_output:
#         json.dump(j, file)

####----Upload file into HDFS----####
import os 
from subprocess import PIPE, Popen

##Create path to HDFS
## The old os.path.join(os.sep, 'user', 'centos', '/02_store/' + name) discarded every
## component before the absolute '/02_store/...' one, so files have always landed in
## /02_store at the HDFS root. Spell that destination out explicitly (HDFS paths are
## POSIX-style, independent of the local os.sep).
hdfs_path = '/02_store/' + export_filename

##Put files into HDFS ("-put" and "-f" must be separate argv entries; as one token
##the hdfs shell rejected it as an unknown command)
put_file = Popen(["hdfs", "dfs", "-put", "-f", export_filename, hdfs_path], stdin = PIPE, bufsize=-1)
put_file.communicate()
if put_file.returncode != 0:
    raise RuntimeError("hdfs dfs -put failed (exit code {}) for {}".format(put_file.returncode, export_filename))

print('\nUpload completed and file stored in hdfs://localhost:9000' + hdfs_path)

In [None]:
#Create recipe-recipe graph: one row per unordered pair of distinct recipes with the
#number of extracted ingredients they share.
#
# Fixes vs. the previous version:
# - the inner-loop start (`counter`) was reset to 1 on every outer iteration, so every
#   recipe after the first was paired with itself and pairs not involving the first
#   recipe appeared in both orders; itertools.combinations yields each pair once.
# - the toJSON()/json.loads round-trip omits null fields by default, so a null
#   ingredient list raised KeyError; rows are now collected directly.
# - empty-string matches are ignored, consistent with the ingredient_graph filter.
from itertools import combinations

recipe_records = [
    (row["title"], set(row["ingredient_extract"] or []) - {""})
    for row in ingredient_extract.collect()
]

recipeA = []
recipeB = []
commonIng = []
for (recipe_a, ing_a), (recipe_b, ing_b) in combinations(recipe_records, 2):
    recipeA.append(recipe_a)
    recipeB.append(recipe_b)
    commonIng.append(len(ing_a & ing_b))

recipeGraph_dict = {"recipeA": recipeA, "recipeB": recipeB, "common_ing": commonIng}
sharedRecipe_pandas = pd.DataFrame(recipeGraph_dict)
export_filename = 'recipe_bbc_Ing' + (time.strftime('%Y%m%d-%H%M%S')) + '.csv'

sharedRecipe_pandas.to_csv(export_filename)

####----Upload file into HDFS----####
import os 
from subprocess import PIPE, Popen

##Create path to HDFS
hdfs_path = os.path.join(os.sep, 'user', 'centos', '/02_store/' + export_filename)

##Put files into HDFS
put_file = Popen(["hdfs", "dfs", "-put -f", export_filename, hdfs_path], stdin = PIPE, bufsize=-1)
put_file.communicate()

print('\nUpload completed and file stored in hdfs://localhost:9000' + hdfs_path)