# CS 416 Phase II - PySpark Cleaning of Dish.name

### Configure access to the files stored in our Azure Blob Container

In [0]:
# Storage account and container that hold the menu CSV files.
storage_account_name = "stuscdsprdbia"
blob_container = "test-data"

# Account key used to authenticate against the storage account; blank here.
# NOTE(review): consider reading this from a secret store instead of pasting it
# into the notebook source.
storage_account_access_key = ""

# Register the key with Spark so wasbs:// paths on this account can be accessed.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_access_key,
)

# Base URI of the "menus" folder; file names are appended to it elsewhere in the notebook.
file_path = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/menus/"

### Load the .csv files into Spark DataFrames

In [0]:
def _load_menu_csv(file_name):
    """Read one CSV from the menus folder, using its header row and inferring column types."""
    return (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(file_path + file_name)
    )


# The Menu.csv and MenuPage.csv loads are disabled; only these two tables are read.
# menu_df = _load_menu_csv("Menu.csv")
# menu_page_df = _load_menu_csv("MenuPage.csv")
menu_item_df = _load_menu_csv("MenuItem.csv")
dish_df = _load_menu_csv("Dish.csv")

### Create temp views so that we can query the data in SQL

In [0]:
# Register each loaded DataFrame under the table name referenced by the SQL queries.
# The "menu" and "menu_page" views stay disabled because those frames are not loaded.
for view_name, frame in (("menu_item", menu_item_df), ("dish", dish_df)):
    frame.createOrReplaceTempView(view_name)


### Remove any Dish records not found in MenuItem. Also exclude any Dish records with a blank or NULL name.

In [0]:
# Keep a dish only if at least one menu item references it and its name is
# neither NULL nor blank. The adjacent literals concatenate into a single query.
dish_df_existing = spark.sql(
    "SELECT * FROM dish "
    "WHERE EXISTS (SELECT 1 FROM menu_item WHERE dish.id = menu_item.dish_id) "
    "AND dish.name IS NOT NULL "
    "AND TRIM(dish.name) <> ''"
)

### Remove non-alphabetic characters and collapse repeated spaces

In [0]:
from pyspark.sql.functions import regexp_replace, trim

# Keep only letters and whitespace. \p{L} matches any Unicode letter, so accented
# characters in non-English dish names (e.g. "é", "ü", "ñ") survive instead of
# being deleted together with digits and punctuation, which an ASCII-only
# [a-zA-Z] class would do.
dish_df_alpha = dish_df_existing.withColumn(
    "name_alpha", regexp_replace("name", r"[^\p{L}\s]", "")
)

# Collapse every run of whitespace (spaces, tabs, newlines) into a single space
# and trim the ends, so gaps left behind by removed characters do not remain as
# leading, trailing, or repeated whitespace in name_alpha.
dish_df_alpha = dish_df_alpha.withColumn(
    "name_alpha", trim(regexp_replace("name_alpha", r"\s+", " "))
)

### Tokenize the alphabetic name

In [0]:
from pyspark.ml.feature import StopWordsRemover, Tokenizer

# Split the cleaned name into an array column called "tokens"; the sample output
# in this notebook shows the tokens lower-cased.
tokenizer = Tokenizer(
    inputCol="name_alpha",
    outputCol="tokens",
)
tokenized_df = tokenizer.transform(dish_df_alpha)

### Remove Stop Words in English, Spanish, French, and German, along with our custom list of stop words

In [0]:
from pyspark.sql.functions import col, concat_ws, trim

# Default stop-word lists provided by StopWordsRemover for each language.
english = StopWordsRemover().getStopWords()
spanish, french, german = (
    StopWordsRemover.loadDefaultStopWords(language)
    for language in ("spanish", "french", "german")
)

# Custom stop words first, then the per-language defaults; the set drops duplicates.
stopwordList = list({"order", "cup", "restaurant", *english, *spanish, *french, *german})

remover = StopWordsRemover(inputCol="tokens", outputCol="words", stopWords=stopwordList)

# words_string joins the surviving tokens with no separator; it is used to
# drop dishes whose tokens were all removed.
words_df = (
    remover.transform(tokenized_df)
    .withColumn("words_string", trim(concat_ws("", col("words"))))
    .where("words_string <> ''")
)

# Preview the cleaned name, raw tokens, filtered tokens and joined string.
words_df.select("name_alpha", "tokens", "words", "words_string").limit(1000).display()

name_alpha,tokens,words,words_string
Oyster cocktail,"List(oyster, cocktail)","List(oyster, cocktail)",oystercocktail
Stewed Tripe with Cream sauce,"List(stewed, tripe, with, cream, sauce)","List(stewed, tripe, cream, sauce)",stewedtripecreamsauce
Cream Cheese,"List(cream, cheese)","List(cream, cheese)",creamcheese
Lamb Chops Broiled Plain,"List(lamb, chops, broiled, plain)","List(lamb, chops, broiled, plain)",lambchopsbroiledplain
New asparagus,"List(new, asparagus)","List(new, asparagus)",newasparagus
Fresh milk,"List(fresh, milk)","List(fresh, milk)",freshmilk
Cup consomme hot or cold,"List(cup, consomme, hot, or, cold)","List(consomme, hot, cold)",consommehotcold
Succotash,List(succotash),List(succotash),succotash
Baltimore broil,"List(baltimore, broil)","List(baltimore, broil)",baltimorebroil
Russia Caviar,"List(russia, caviar)","List(russia, caviar)",russiacaviar


### Create a new dataframe with each word as its own row

In [0]:
from pyspark.sql.functions import explode

# One (id, name) row per remaining token of each dish, de-duplicated, with
# blank tokens filtered out.
exploded_df = (
    words_df.select("id", explode("words").alias("name"))
    .dropDuplicates()
    .where("TRIM(name) <> ''")
)

# Write the full id/token mapping as a CSV under /dbfs via pandas on the driver.
exploded_df.toPandas().to_csv("/dbfs/user/shaun_n/data/Dish_id_name.csv", index=False)

# Preview the first 1000 rows ordered by token.
exploded_df.orderBy("name").limit(1000).display()

id,name
54377,aa
128806,aa
278107,aa
108630,aa
30901,aa
435352,aa
196120,aa
477550,aa
266370,aa
471212,aa


### Create a distinct list of dish tokens that will be sent to OpenRefine for additional cleaning

In [0]:
# Distinct tokens in alphabetical order. The DataFrame is lazy, so the export
# and the display below each trigger their own evaluation.
distinct_names_df = exploded_df.select("name").dropDuplicates().orderBy("name")

# Export the token list as a single-column CSV, then show it in the notebook.
distinct_names_df.toPandas().to_csv("/dbfs/user/shaun_n/data/Dish_name.csv", index=False)
distinct_names_df.display()

name
aa
aaaa
aaaaa
aad
aagosta
aal
aalbessen
aalbessenyelei
aalborg
aalbourg


### Copy the CSV files from DBFS to the Azure storage account (ADLS Gen 2)

In [0]:
# Copy the distinct-token CSV from DBFS into the menus folder addressed by file_path.
dbutils.fs.cp("dbfs:/user/shaun_n/data/Dish_name.csv", f"{file_path}Dish_name.csv")

Out[20]: True

In [0]:
# Copy the id/token mapping CSV from DBFS into the menus folder addressed by file_path.
dbutils.fs.cp("dbfs:/user/shaun_n/data/Dish_id_name.csv", f"{file_path}Dish_id_name.csv")

Out[21]: True