In [1]:
# Initialize Spark Environment and Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

sc = SparkContext()
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Snorkel Crowdsourcing Demo") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()



In [2]:
# Load Crowdsourcing Data
raw_crowd_answers = spark.read.format("csv").option("header", "true").csv("data/weather-non-agg-DFE.csv")
gold_crowd_answers = spark.read.format("csv").option("header", "true").csv("data/weather-evaluated-agg-DFE.csv")

In [3]:
raw_crowd_answers.show()

+---------+---------+------+---------+-------+------+-----+--------------------+--------+--------------------+
|_unit_id_|  channel| trust|worker_id|country|region| city|             emotion|tweet_id|          tweet_body|
+---------+---------+------+---------+-------+------+-----+--------------------+--------+--------------------+
|314960382|clixsense|0.4541| 18034918|    IND|     7|Delhi|Neutral / author ...|82846118|Fire Weather Watc...|
|314960385|clixsense|0.4541| 18034918|    IND|     7|Delhi|            Positive|82510997|Passing out now. ...|
|314960391|clixsense|0.4541| 18034918|    IND|     7|Delhi|            Negative|83271279|"RT @mention: ""T...|
|314960396|clixsense|0.4541| 18034918|    IND|     7|Delhi|            Positive|80058872|It is hot out her...|
|314960400|clixsense|0.4541| 18034918|    IND|     7|Delhi|Neutral / author ...|80058809|I can't find a wa...|
|314960401|clixsense|0.4541| 18034918|    IND|     7|Delhi|Neutral / author ...|79188429|Record LOW humidi...|
|

In [4]:
# Create an empty dataframe to store worker features
field = [StructField("worker_id",StringType(), True),StructField("feature", StringType(), True)]
schema = StructType(field)
worker_features = spark.createDataFrame(sc.emptyRDD(), schema)

In [5]:
# Extract country feature
countryView = raw_crowd_answers.select("worker_id","country").distinct()
name = 'country'
countryUDF = UserDefinedFunction(lambda x: 'country='+x if x else "NULL", StringType())
countryFeatures = countryView.select(*[countryUDF(column).alias('feature') if column == name else column for column in countryView.columns])
worker_features = worker_features.unionAll(countryFeatures)

In [6]:
# Extract channel feature
channelView = raw_crowd_answers.select("worker_id","channel").distinct()
name = 'channel'
channelUDF = UserDefinedFunction(lambda x: 'channel='+x if x else "NULL", StringType())
channelFeatures = channelView.select(*[channelUDF(column).alias('feature') if column == name else column for column in channelView.columns])
worker_features = worker_features.unionAll(channelFeatures)

In [7]:
# Extract city feature
cityView = raw_crowd_answers.select("worker_id","city").distinct()
name = 'city'
cityUDF = UserDefinedFunction(lambda x: 'city='+x if x else "NULL", StringType())
cityFeatures = cityView.select(*[cityUDF(column).alias('feature') if column == name else column for column in cityView.columns])
worker_features = worker_features.unionAll(cityFeatures)

In [8]:
# Extract worker_id canonical feature
workerIDView = raw_crowd_answers.select("worker_id","worker_id").distinct()
name = 'worker_id'
workerIDUDF = UserDefinedFunction(lambda x: 'worker_id='+x if x else "NULL", StringType())
workerIDFeatures = workerIDView.select(*[workerIDUDF(column).alias('feature') if column == name else column for column in workerIDView.columns])
worker_features = worker_features.unionAll(workerIDFeatures)

In [9]:
# Extract ground truth per tweet_id
gold_crowd_answers.createOrReplaceTempView("gold_crowd_answers")
gold_answers = spark.sql("SELECT tweet_id, sentiment FROM gold_crowd_answers WHERE correct_category ='Yes'")

In [10]:
# Extract raw tweets
tweet_body = gold_crowd_answers.select("tweet_id", "tweet_body")

In [11]:
# Extract worker votes
worker_votes = raw_crowd_answers.selectExpr("worker_id", "tweet_id", "emotion as sentiment")

In [12]:
# Save data as parquet files
worker_features.write.parquet("data/worker_features.parquet",mode="overwrite")
gold_answers.write.parquet("data/gold_answers.parquet",mode="overwrite")
tweet_body.write.parquet("data/tweet_body.parquet",mode="overwrite")
worker_votes.write.parquet("data/worker_votes.parquet",mode="overwrite")

In [13]:
# Generate labeling matrix
taskLabels = [False]
taskLabels.extend([str(i.sentiment) for i in worker_votes.select("sentiment").distinct().collect()])
taskLabelsMap = {}
for i in range(len(taskLabels)):
    taskLabelsMap[taskLabels[i]] = i
    
    
# Task map
task2CandMap = {}
cid = 0
cand2TaskMap = []
for task in worker_votes.select("tweet_id").distinct().orderBy("tweet_id").collect():
    task2CandMap[task.tweet_id] = cid
    cand2TaskMap.append(task.tweet_id)
    cid += 1
    
# Workers map
worker2LFMap = {}
lfid = 0
lf2WorkerMap = []
for worker in worker_votes.select("worker_id").distinct().orderBy("worker_id").collect():
    worker2LFMap[worker.worker_id] = lfid
    lf2WorkerMap.append(worker.worker_id)
    lfid += 1

In [14]:
tasks = worker_votes.select("tweet_id").distinct().count()
workers = worker_votes.select("worker_id").distinct().count()
print taskLabelsMap

{False: 0, 'Positive': 3, 'Negative': 4, "I can't tell": 2, 'Neutral / author is just sharing information': 5, 'Tweet not related to weather condition': 1}


In [15]:
# Create labeling matrix
import numpy as np
from scipy import sparse

L = sparse.lil_matrix((tasks, workers), dtype=np.int64)
for observation in worker_votes.select("worker_id", "tweet_id", "sentiment").collect():
    task = task2CandMap[observation.tweet_id]
    worker = worker2LFMap[observation.worker_id]
    label = taskLabelsMap[observation.sentiment]
    L[task, worker] = label


In [27]:
# Train generative model
from snorkel.learning.gen_learning import GenerativeModel, DEP_EXCLUSIVE, DEP_REINFORCING, DEP_FIXING, DEP_SIMILAR

# Init priors
LF_acc_priors = [0.7]*workers

print("Testing init:")
gen_model = GenerativeModel(lf_propensity=False)
gen_model.train(
    L,
    reg_type=2,
    reg_param=0.1,
    epochs=10
)
stats = gen_model.learned_lf_stats()
accs = stats["Accuracy"]

Testing init:
Inferred cardinality: 5


In [17]:
# Get MAP assignment for each task
task_marginals = gen_model.marginals(L)
task_map_assignment = np.argmax(task_marginals, axis=1)
inferredLabels = {}
for i in range(len(task_map_assignment)):
    inferredLabels[cand2TaskMap[i]] =  taskLabels[task_map_assignment[i]+1]

In [18]:
errors = 0
total = float(gold_answers.count())
for trueLabel in gold_answers.select("tweet_id","sentiment").collect():
    if trueLabel.sentiment != inferredLabels[trueLabel.tweet_id]:
        errors += 1
print 'Accuracy = ', (total-errors)/total

Accuracy =  0.980846774194


In [19]:
featurized_worker_votes = worker_votes.join(worker_features, worker_features.worker_id == worker_votes.worker_id).select(worker_votes.worker_id, worker_votes.tweet_id, worker_votes.sentiment, worker_features.feature)

In [20]:
featurized_worker_votes.show()

+---------+--------+--------------------+-----------+
|worker_id|tweet_id|           sentiment|    feature|
+---------+--------+--------------------+-----------+
| 20095709|82675151|Neutral / author ...|country=USA|
| 20095709|79188846|Neutral / author ...|country=USA|
| 20095709|82682363|Neutral / author ...|country=USA|
| 20095709|79194027|Neutral / author ...|country=USA|
| 20095709|82844564|            Positive|country=USA|
| 20095709|82850000|            Negative|country=USA|
| 20095709|80058539|            Positive|country=USA|
| 20095709|80053247|Tweet not related...|country=USA|
| 20095709|82516074|        I can't tell|country=USA|
| 20095709|82841589|Tweet not related...|country=USA|
| 20095709|84308263|            Negative|country=USA|
| 20095709|82677176|Tweet not related...|country=USA|
| 20095709|84048359|            Positive|country=USA|
| 20095709|82514466|Neutral / author ...|country=USA|
| 20095709|82674290|Neutral / author ...|country=USA|
| 20095709|83255088|        

In [21]:
# Generate labeling matrix
taskLabels = [False]
taskLabels.extend([str(i.sentiment) for i in featurized_worker_votes.select("sentiment").distinct().collect()])
taskLabelsMap = {}
for i in range(len(taskLabels)):
    taskLabelsMap[taskLabels[i]] = i
    
    
# Task map
task2CandMap = {}
cid = 0
cand2TaskMap = []
for task in featurized_worker_votes.select("tweet_id").distinct().orderBy("tweet_id").collect():
    task2CandMap[task.tweet_id] = cid
    cand2TaskMap.append(task.tweet_id)
    cid += 1
    
# Workers map
worker2LFMap = {}
lfid = 0
lf2WorkerMap = []
lfFeatures = []
for worker in featurized_worker_votes.select("worker_id","feature").distinct().orderBy("worker_id","feature").collect():
    worker2LFMap[worker.worker_id+"_"+worker.feature] = lfid
    lf2WorkerMap.append(worker.worker_id+"_"+worker.feature)
    lfFeatures.append(worker.feature)
    lfid += 1

In [22]:
tasks = featurized_worker_votes.select("tweet_id").distinct().count()
LFs = featurized_worker_votes.select("worker_id","feature").distinct().count()

In [23]:
# Create labeling matrix
import numpy as np
from scipy import sparse

L = sparse.lil_matrix((tasks, LFs), dtype=np.int64)
for observation in featurized_worker_votes.select("worker_id", "feature", "tweet_id", "sentiment").collect():
    task = task2CandMap[observation.tweet_id]
    lfid = worker2LFMap[observation.worker_id+"_"+observation.feature]
    label = taskLabelsMap[observation.sentiment]
    L[task, lfid] = label


In [24]:
# Train generative model
%load_ext autoreload
%autoreload 2
%matplotlib inline
from snorkel.learning.gen_learning import FeaturizedGenerativeModel, DEP_EXCLUSIVE, DEP_REINFORCING, DEP_FIXING, DEP_SIMILAR

# Init priors
LF_acc_priors = [0.7]*LFs

print("Testing init:")
gen_model = FeaturizedGenerativeModel(lf_propensity=True)
gen_model.train(
    L,
    LF_acc_features=lfFeatures,
    reg_type=2,
    reg_param=0.01,
    epochs=10
)
stats = gen_model.learned_lf_stats()
accs = stats["Accuracy"]

Testing init:
Inferred cardinality: 5


In [25]:
# Get MAP assignment for each task
task_marginals = gen_model.marginals(L)
task_map_assignment = np.argmax(task_marginals, axis=1)
inferredLabels = {}
for i in range(len(task_map_assignment)):
    inferredLabels[cand2TaskMap[i]] =  taskLabels[task_map_assignment[i]+1]

In [26]:
errors = 0
total = float(gold_answers.count())
for trueLabel in gold_answers.select("tweet_id","sentiment").collect():
    if trueLabel.sentiment != inferredLabels[trueLabel.tweet_id]:
        errors += 1
print 'Accuracy = ', (total-errors)/total

Accuracy =  0.975806451613
