# Set up environment

In [4]:
!pip install pyspark



In [5]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark

number_cores = int(os.environ['NUM_CPUS'])
memory_gb = int(os.environ['AVAILABLE_MEMORY_MB']) // 1024
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
sc = pyspark.SparkContext(conf=conf)

In [6]:
print(sc)

<SparkContext master=local[4] appName=pyspark-shell>


In [7]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Load dataset

In [8]:
# Load the dataset
indian = sqlContext.read.csv("/project/Project/DataEngineeringGroupAO/Recipe_dataset/data_indian.csv",header=True)
italian = sqlContext.read.csv("/project/Project/DataEngineeringGroupAO/Recipe_dataset/data_italian.csv",header=True)
mexican = sqlContext.read.csv("/project/Project/DataEngineeringGroupAO/Recipe_dataset/data_mexican.csv",header=True)
new_recipe = sqlContext.read.csv("/project/Project/DataEngineeringGroupAO/Recipe_dataset/final_scrape_not_cleaned.csv",header=True)

In [9]:
# Label the data
from pyspark.sql.functions import lit

indian = indian.withColumn("label",lit("indian"))
italian = italian.withColumn("label",lit("italian"))
mexican = mexican.withColumn("label",lit("mexican"))

In [10]:
# Combine all datasets into one

from functools import reduce
from pyspark.sql import DataFrame

def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

dfs = [indian, italian, mexican, new_recipe]
recipe = unionAll(dfs)
recipe.show()

+--------------------+--------------------+------+
|               Title|         Description| label|
+--------------------+--------------------+------+
|  Indian Peanut Stew|This is an easy, ...|indian|
|        Roomali Roti|There is no leave...|indian|
|Spicy Sweet Potat...|It's important to...|indian|
|        Chicken Saag|The classic India...|indian|
|Paleo Slow Cooker...|Boneless pork loi...|indian|
|Bombay Chicken an...|Chicken parts are...|indian|
|Indian Carrots, P...|Potatoes, peas an...|indian|
|Wendy's Indian Bu...|This recipe resem...|indian|
|    Indian Chickpeas|Garbanzo beans, o...|indian|
|Dal Makhani (Indi...|These richly spic...|indian|
|               Raita|Chopped tomatoes ...|indian|
|Yogurt-Marinated ...|A yogurt-based ma...|indian|
|Indian-Spiced Roa...|Spicy roasted chi...|indian|
|Cauliflower and T...|Pressed tofu cube...|indian|
|Channa Masala (Ch...|This fantastic In...|indian|
|Bengali Chicken C...|Thy this deliciou...|indian|
|  Indian Sweet Bread|A crisp a

In [11]:
# Size before removing duplicates
print(recipe.count())

3840


In [12]:
# Remove duplicates
recipe = recipe.dropDuplicates(["Title"])

In [13]:
# Size after dropping duplicates
print(recipe.count())

3793


# Data Cleaning

In [14]:
# import all packages needed for data cleaning

from pyspark.sql.functions import udf, regexp_replace, lower, col
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import IntegerType, ArrayType, StringType

In [15]:
# Lowercase

recipe = recipe.select(*[lower(col(col_name)).name(col_name) for col_name in recipe.columns])
recipe.show()

+--------------------+--------------------+-------+
|               Title|         Description|  label|
+--------------------+--------------------+-------+
|amanda & graham's...|restaurant in our...|italian|
|avocado and mango...|a wonderfully fru...|mexican|
|batman's best cae...|fast, easy, cream...|italian|
|beef chilli with ...|this is a hearty ...|mexican|
|chicken enchilada...|chicken, sour cre...|mexican|
|chorizo breakfast...|a yummy breakfast...|mexican|
|coconut-coriander...|simple preparatio...| indian|
|coriander chicken...|this chicken dish...|mexican|
|           curd rice|curd rice, or yog...| indian|
|daz's tomato chee...|this is my favour...|italian|
|easy chicken tikk...|a quick and easy ...| indian|
|four seasons ench...|nice and spicy us...|mexican|
|homemade four che...|fresh pasta fille...|italian|
|homemade paneer c...|paneer is a white...| indian|
|      indian khichri|khichri is a very...| indian|
|               kheer|this is a popular...| indian|
| madras pot

In [16]:
# Remove punctuation and digits

recipe_clean = recipe.select(regexp_replace('Title', "[^a-zA-Z\\s]", "").alias('title'), 
    (regexp_replace('Description', "[^a-zA-Z\\s]", "").alias('des')),'label')
recipe_clean.show()

+--------------------+--------------------+-------+
|               title|                 des|  label|
+--------------------+--------------------+-------+
|amanda  grahams m...|restaurant in our...|italian|
|avocado and mango...|a wonderfully fru...|mexican|
|batmans best caes...|fast easy creamy ...|italian|
|beef chilli with ...|this is a hearty ...|mexican|
|chicken enchilada...|chicken sour crea...|mexican|
|chorizo breakfast...|a yummy breakfast...|mexican|
|coconutcoriander ...|simple preparatio...| indian|
|coriander chicken...|this chicken dish...|mexican|
|           curd rice|curd rice or yogh...| indian|
|dazs tomato chees...|this is my favour...|italian|
|easy chicken tikk...|a quick and easy ...| indian|
|four seasons ench...|nice and spicy us...|mexican|
|homemade four che...|fresh pasta fille...|italian|
|homemade paneer c...|paneer is a white...| indian|
|      indian khichri|khichri is a very...| indian|
|               kheer|this is a popular...| indian|
| madras pot

In [17]:
# Remove Stopwords

# Tokenize text
tokenizer = Tokenizer(inputCol="des", outputCol="des_token")
recipe = tokenizer.transform(recipe_clean).select('title','des','des_token','label')
# tokenized.select("Description", "Des_words")\
    #.withColumn("tokens", countTokens(col("Des_words"))).show(truncate=False)

# Remove stopwords
remover = StopWordsRemover(inputCol='des_token', outputCol='des_clean')
recipe_no_stopw = remover.transform(recipe).select('title','des_clean', 'label')
recipe_no_stopw.show()

+--------------------+--------------------+-------+
|               title|           des_clean|  label|
+--------------------+--------------------+-------+
|amanda  grahams m...|[restaurant, livi...|italian|
|avocado and mango...|[wonderfully, fru...|mexican|
|batmans best caes...|[fast, easy, crea...|italian|
|beef chilli with ...|[hearty, deliciou...|mexican|
|chicken enchilada...|[chicken, sour, c...|mexican|
|chorizo breakfast...|[yummy, breakfast...|mexican|
|coconutcoriander ...|[simple, preparat...| indian|
|coriander chicken...|[chicken, dish, d...|mexican|
|           curd rice|[curd, rice, yogh...| indian|
|dazs tomato chees...|[favourite, start...|italian|
|easy chicken tikk...|[quick, easy, cur...| indian|
|four seasons ench...|[nice, spicy, use...|mexican|
|homemade four che...|[fresh, pasta, fi...|italian|
|homemade paneer c...|[paneer, white, c...| indian|
|      indian khichri|[khichri, popular...| indian|
|               kheer|[popular, pakista...| indian|
| madras pot

In [18]:
# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
recipe = recipe_no_stopw.withColumn("des_stemmed", stemmer_udf("des_clean")).select('title', 'des_stemmed','label')
recipe.show()

+--------------------+--------------------+-------+
|               title|         des_stemmed|  label|
+--------------------+--------------------+-------+
|amanda  grahams m...|[restaur, live, r...|italian|
|avocado and mango...|[wonder, fruiti, ...|mexican|
|batmans best caes...|[fast, easi, crea...|italian|
|beef chilli with ...|[hearti, delici, ...|mexican|
|chicken enchilada...|[chicken, sour, c...|mexican|
|chorizo breakfast...|[yummi, breakfast...|mexican|
|coconutcoriander ...|[simpl, prepar, i...| indian|
|coriander chicken...|[chicken, dish, d...|mexican|
|           curd rice|[curd, rice, yogh...| indian|
|dazs tomato chees...|[favourit, starte...|italian|
|easy chicken tikk...|[quick, easi, cur...| indian|
|four seasons ench...|[nice, spici, use...|mexican|
|homemade four che...|[fresh, pasta, fi...|italian|
|homemade paneer c...|[paneer, white, c...| indian|
|      indian khichri|[khichri, popular...| indian|
|               kheer|[popular, pakista...| indian|
| madras pot

# Pattern Exploration

In [19]:
# Filter out different recipes
# Create temp table
recipe.createOrReplaceTempView('recipes')

recipe_ind = sqlContext.sql("SELECT * FROM recipes WHERE label == 'indian'")
recipe_ita = sqlContext.sql("SELECT * FROM recipes WHERE label == 'italian'")
recipe_mex = sqlContext.sql("SELECT * FROM recipes WHERE label == 'mexican'")

In [20]:
# We asssume that we do not know labels for the majority of data points, 
# hence further explore only test split
recipe_ind_tr, recipe_ind_ts, recipe_ind_dv = recipe_ind.randomSplit([0.8,0.2,0.1],seed = 11)
recipe_ita_tr, recipe_ita_ts, recipe_ita_dv = recipe_ita.randomSplit([0.7,0.2,0.1],seed = 11)
recipe_mex_tr, recipe_mex_ts, recipe_mex_dv = recipe_mex.randomSplit([0.7,0.2,0.1],seed = 11)

In [21]:
# Create frequency list of description
import pyspark.sql.functions as f

top_n = 20

ind_counts = recipe_ind_dv.select(f.explode('des_stemmed').alias('col')).groupBy('col').count()
ind_des_freq = ind_counts.orderBy(ind_counts["count"].desc()).limit(top_n)

ita_counts = recipe_ita_dv.select(f.explode('des_stemmed').alias('col')).groupBy('col').count()
ita_des_freq = ita_counts.orderBy(ita_counts["count"].desc()).limit(top_n)

mex_counts = recipe_mex_dv.select(f.explode('des_stemmed').alias('col')).groupBy('col').count()
mex_des_freq = mex_counts.orderBy(mex_counts["count"].desc()).limit(top_n)

In [22]:
# View then in one dataframe
from pyspark.sql.functions import monotonically_increasing_id 

df1 = ind_des_freq.withColumn("row_id", monotonically_increasing_id())
df2 = ita_des_freq.withColumn("row_id", monotonically_increasing_id())
df3 = mex_des_freq.withColumn("row_id", monotonically_increasing_id())

des_freq = df1.join(df2,("row_id")).join(df3,("row_id")).drop("row_id")
des_freq.show()

+----------+-----+-------+-----+--------+-----+
|       col|count|    col|count|     col|count|
+----------+-----+-------+-----+--------+-----+
|     curri|   57|   sauc|   30| mexican|   27|
|    indian|   36| tomato|   27|   chees|   26|
|      serv|   34|italian|   26|    dish|   25|
|      rice|   29|   dish|   25|  chilli|   20|
|   chicken|   29|  pasta|   25|    make|   20|
|      dish|   28| delici|   23|  delici|   20|
|     spice|   26|   serv|   21| chicken|   20|
|     recip|   26|   make|   20|    easi|   19|
|      make|   23|  recip|   18|   recip|   18|
|      made|   23|       |   18|tortilla|   18|
|      cook|   17|   easi|   16|  tomato|   17|
|    delici|   16|  fresh|   15|    serv|   17|
|       use|   15| garlic|   15|    bean|   16|
|      easi|   15|   made|   15|    sauc|   15|
|     fresh|   14|   bake|   15|     top|   15|
|      tast|   13|  bread|   14|   fresh|   13|
|      sauc|   12|flavour|   13|   spici|   13|
|     great|   11|  chees|   13|    rice

# Data Splitting

In [23]:
from pyspark.sql.functions import concat_ws

# train split full / remove lists
df_tr = recipe_ind_tr.union(recipe_ita_tr)
df_tr = df_tr.union(recipe_mex_tr).withColumn("des_stemmed", concat_ws(" ", "des_stemmed"))

# test split 
df_ts = recipe_ind_ts.union(recipe_ita_ts)
df_ts = df_ts.union(recipe_mex_ts).withColumn("des_stemmed", concat_ws(" ", "des_stemmed"))

# dev/val split
df_dv = recipe_ind_dv.union(recipe_ita_dv)
df_dv = df_dv.union(recipe_mex_dv).withColumn("des_stemmed", concat_ws(" ", "des_stemmed"))

In [24]:
# drop train split for labelling
df_tr.drop('label').first()

Row(title='coconutcoriander chutney', des_stemmed='simpl prepar incred flavour take minut prepar serv fresh chutney curri')

In [25]:
mapping = {'indian':0, 'italian':1, 'mexican':2}

In [26]:
from pyspark.sql.functions import create_map
from itertools import chain

mapping_func = create_map([lit(x) for x in chain(*mapping.items())])

df_dv = df_dv.withColumn("num_label", mapping_func.getItem(col("label")))
# df_dv = df_dv.select('title','des_clean', 'num_label')
df_dv = df_dv.select('title','des_stemmed', 'num_label')
df_dv.show()

+--------------------+--------------------+---------+
|               title|         des_stemmed|num_label|
+--------------------+--------------------+---------+
|    easy prawn curry|homestyl south in...|        0|
|tomato and pea curry|first saut onion ...|        0|
|homestyle chicken...|chicken curri mad...|        0|
|spinach dal palak...|yummi authent dal...|        0|
|   easy carrot halwa|carrot halwa swee...|        0|
|  spicy veg fritters|healthi snack enj...|        0|
|indianstyle chick...|chicken curri sea...|        0|
|       spicy brinjal|simpli yummi  ind...|        0|
|spiced turnips sh...|turnip simmer spi...|        0|
|keertis karwari p...|prawn prepar made...|        0|
|mixed vegetable b...|vegetarian onepot...|        0|
|     yoghurt pudding|simpl healthi del...|        0|
|      butter chicken|fantast version n...|        0|
|        carrot kheer|kheer south asian...|        0|
|saar indian tomat...|tomato soup readi...|        0|
|sweet and spicy a...|wonder

In [27]:
print((df_dv.count(), len(df_dv.columns)))

(341, 3)


In [28]:
# create column with actual values 

import numpy as np

Y_dv = df_dv.select('num_label').rdd.flatMap(lambda x: x).collect()
Y_dv = np.array(Y_dv)

np.shape(Y_dv)

(341,)

# Keywords LFs

In [38]:
# For clarity, we define constants to represent the class labels and abstaining.
ABSTAIN = -1
INDIAN = 0
ITALIAN = 1
MEXICAN = 2

In [32]:
! pip install snorkel
from snorkel.labeling.apply.spark import SparkLFApplier
from snorkel.labeling import LFAnalysis
from snorkel.labeling import labeling_function
import re

Collecting scikit-learn<0.22.0,>=0.20.2
  Using cached scikit_learn-0.21.3-cp36-cp36m-manylinux1_x86_64.whl (6.7 MB)
Installing collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 0.22.1
    Uninstalling scikit-learn-0.22.1:
      Successfully uninstalled scikit-learn-0.22.1
Successfully installed scikit-learn-0.21.3


## Indian LFs

In [42]:
ind_keywords = ['curry','indian','masala','paneer','chutney','curried',
                'simmered','cumin','yogurt','coconut']

@labeling_function()
def indian_keywords(x):
        if any(word in x.title for word in ind_keywords):
            return INDIAN
        else:
            return ABSTAIN

In [43]:
# Word Combo curry + meat
@labeling_function()
def currymeat(x):
    return INDIAN if re.search(r"(?=.*curry)(?=.*(chicken|lamb|beef))", x.des_clean, flags=re.I) else ABSTAIN

In [44]:
# cooking process + food name
@labeling_function()
def cook_food(x):
    return INDIAN if re.search(r"(?=.*(quick|easy))(?=.*(rice|sauce|potatoes))", x.des_clean, flags=re.I) else ABSTAIN

In [45]:
# Word Combo Sweet + Spicy

@labeling_function()
def sweet_spicy(x):
    return INDIAN if re.search(r"(?=.*sweet)(?=.*(spicy))", x.des_clean, flags=re.I) else ABSTAIN

In [46]:
# Word Combo Slow + Cook

@labeling_function()
def slow_cook(x):
    return INDIAN if re.search(r"(?=.*slow)(?=.*(cook))", x.des_clean, flags=re.I) else ABSTAIN

## Italian LFs

In [None]:
ita_keywords = ['pasta','mozzarella', 'lasagna','pesto','dente', 'pizza']

@labeling_function()
def italian_keywords(x):
        if any(word in x.title for word in ita_keywords):
            return ITALIAN
        else:
            return ABSTAIN

In [None]:
@labeling_function()
def pasta_with(x):
    return ITALIAN if re.search(r"(?=.*pasta)(?=.*(chicken|lamb|beef|pesto|creamy|shrimps|cheese))", x.des_clean, flags=re.I) else ABSTAIN

In [None]:
@labeling_function()
def sundried_tomatoes(x):
    return ITALIAN if re.search(r"(?=.*tomatoes)(?=.*(sun-dried|sundried))", x.des_clean, flags=re.I) else ABSTAIN

In [None]:
ita_regions = ['tuscan','sicilian', 'romano', 'romaine', 'mediterranean','meditterranean' ]

@labeling_function()
def ita_regions(x):
        if any(word in x.title for word in ita_keywords):
            return ITALIAN
        else:
            return ABSTAIN

## Mexican LFs

In [33]:
mex_keywords = ['chees','tortilla','corn','beans','salsa','chilli','spici','onion','chip','simmer']

@labeling_function()
def mexican_keywords(x):
        if any(word in x.title for word in mex_keywords):
            return MEXICAN
        else:
            return ABSTAIN

In [48]:
# Word Combo tortilla + corn
@labeling_function()
def tortilla_corn(x):
    return MEXICAN if re.search(r"(?=.*tortilla)(?=.*(corn))", x.des_clean, flags=re.I) else ABSTAIN

In [49]:
# salsa + food
@labeling_function()
def salsa_food(x):
    return MEXICAN if re.search(r"(?=.*(bean))(?=.*(chilli|sauc|soup))", x.des_clean, flags=re.I) else ABSTAIN

In [50]:
# chip
@labeling_function()
def chip(x):
    return MEXICAN if re.search(r"(?=.*(chip))(?=.*(chicken|shrimp|chillies|peppers|sauce))", x.des_clean, flags=re.I) else ABSTAIN

In [55]:
!pip install Py4j



In [57]:
df_tr_rdd = df_tr.rdd
df_dv_rdd = df_dv.rdd

lfs = [mexican_keywords, tortilla_corn, salsa_food,chip]

applier = SparkLFApplier(lfs=lfs)
L_train = applier.apply(df_tr_rdd)
L_dev = applier.apply(df_dv_rdd)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 74.0 failed 1 times, most recent failure: Lost task 2.0 in stage 74.0 (TID 7558, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'des_clean' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/spark.py", line 39, in map_fn
    return apply_lfs_to_data_point(*args, lfs=self._lfs, f_caller=f_caller)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 107, in apply_lfs_to_data_point
    y = f_caller(lf, x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 28, in __call__
    return f(x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/lf/core.py", line 77, in __call__
    return self._f(x, **self._resources)
  File "<ipython-input-48-ca314507f512>", line 4, in tortilla_corn
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: des_clean

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'des_clean' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/spark.py", line 39, in map_fn
    return apply_lfs_to_data_point(*args, lfs=self._lfs, f_caller=f_caller)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 107, in apply_lfs_to_data_point
    y = f_caller(lf, x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 28, in __call__
    return f(x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/lf/core.py", line 77, in __call__
    return self._f(x, **self._resources)
  File "<ipython-input-48-ca314507f512>", line 4, in tortilla_corn
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: des_clean

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
from snorkel.labeling import LFAnalysis

LFAnalysis(L=L_train, lfs=lfs).lf_summary()

In [47]:
df_tr_rdd = df_tr.rdd
df_dv_rdd = df_dv.rdd

lfs = [indian_keywords, currymeat, cook_food]

applier = SparkLFApplier(lfs=lfs)
L_train = applier.apply(df_tr_rdd)
L_dev = applier.apply(df_dv_rdd)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 62.0 failed 1 times, most recent failure: Lost task 2.0 in stage 62.0 (TID 5741, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'des_clean' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/spark.py", line 39, in map_fn
    return apply_lfs_to_data_point(*args, lfs=self._lfs, f_caller=f_caller)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 107, in apply_lfs_to_data_point
    y = f_caller(lf, x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 28, in __call__
    return f(x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/lf/core.py", line 77, in __call__
    return self._f(x, **self._resources)
  File "<ipython-input-43-6cc09f8abb6f>", line 4, in currymeat
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: des_clean

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'des_clean' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/spark.py", line 39, in map_fn
    return apply_lfs_to_data_point(*args, lfs=self._lfs, f_caller=f_caller)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 107, in apply_lfs_to_data_point
    y = f_caller(lf, x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/apply/core.py", line 28, in __call__
    return f(x)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/snorkel/labeling/lf/core.py", line 77, in __call__
    return self._f(x, **self._resources)
  File "<ipython-input-43-6cc09f8abb6f>", line 4, in currymeat
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: des_clean

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [84]:
np.shape(L_dev)

(341, 3)

In [85]:
from snorkel.labeling import LFAnalysis

LFAnalysis(L=L_train, lfs=lfs).lf_summary()

Unnamed: 0,j,Polarity,Coverage,Overlaps,Conflicts
indian_keywords,0,[0],0.191667,0.037121,0.0
currymeat,1,[0],0.041288,0.029167,0.0
cook_food,2,[0],0.04053,0.012879,0.0


In [86]:
LFAnalysis(L_dev, lfs=lfs).lf_summary(Y_dv)

Unnamed: 0,j,Polarity,Coverage,Overlaps,Conflicts,Correct,Incorrect,Emp. Acc.
indian_keywords,0,[0],0.158358,0.032258,0.0,53,1,0.981481
currymeat,1,[0],0.038123,0.026393,0.0,13,0,1.0
cook_food,2,[0],0.049853,0.01173,0.0,9,8,0.529412
