# Introducing Snorkel

In this notebook we use Snorkel to enrich our data. Tags that have between 500 and 2,000 examples are labeled using weak supervision, which produces enough labeled examples to train an accurate full model that includes these new labels.

More information about Snorkel can be found at [Snorkel.org](https://www.snorkel.org/) :) For a basic introduction to Snorkel, see the [Spam Tutorial](http://syndrome:8888/notebooks/snorkel-tutorials/spam/01_spam_tutorial.ipynb). For an introduction to Multi-Task Learning (MTL), see [Multi-Task Tutorial](http://syndrome:8888/notebooks/snorkel-tutorials/multitask/multitask_tutorial.ipynb).

In [None]:
# Snorkel Introduction

from collections import OrderedDict
from glob import glob
import os
import random
import sys

# Turn off TensorFlow logging messages; set before TensorFlow is imported so it takes effect
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import cupy
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow
import snorkel
import spacy
import tensorflow as tf

# Add parent directory to path
parent_dir = os.path.dirname(os.getcwd())
sys.path.append(parent_dir)

# Make reproducible
random.seed(1337)
np.random.seed(1337)

# Only affects subprocesses started from this notebook; the hash seed of the
# running interpreter is fixed when it starts
os.environ["PYTHONHASHSEED"] = "1337"

# Show wide columns
pd.set_option('display.max_colwidth', 300)

In [4]:
TAG_LIMIT = 50

In [5]:
PATHS = {
    'questions': {
        'local': '../data/stackoverflow/Questions.Tags.{}.parquet',
        's3': 's3://stackoverflow-events/08-05-2019/Questions.Tags.{}.parquet',
    },
}

# Define a set of paths for each step for local and S3
PATH_SET = 'local' # 's3'

path = PATHS['questions'][PATH_SET].format(TAG_LIMIT)

## Loading our Examples for Augmentation in Pandas

In [4]:
df = pd.read_parquet(
    path,
    engine='pyarrow',
)
df.head()

## Loading our Examples for Augmentation in PySpark

In [6]:
from pyspark import SparkContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Programming Language Extraction Example") \
    .config('spark.dynamicAllocation.enabled', True) \
    .config('spark.shuffle.service.enabled', True) \
    .getOrCreate()
sc = spark.sparkContext

path = PATHS['questions'][PATH_SET].format(TAG_LIMIT)

question_df = spark.read.parquet(path)
question_df.limit(3).toPandas()

Unnamed: 0,_PostId,_AcceptedAnswerId,_Body,_Code,_Tags,_Label,_AnswerCount,_CommentCount,_FavoriteCount,_OwnerUserId,...,_AccountId,_UserId,_UserDisplayName,_UserDownVotes,_UserLocation,_ProfileImageUrl,_UserReputation,_UserUpVotes,_UserViews,_UserWebsiteUrl
0,415580,415635,Regex Named Groups in Java It is my understanding that the package does not have support for named groups ( so can anyone point me towards a third-party library that does?\nI've looked at jregex but its last release was in 2002 and it didn't work for me (admittedly I only tried briefly) under ...,java.regex,"[java, regex]",0,6,2,46,444,...,355,444,Dan,48,,,2767,298,255,
1,2373579,2373869,Parsing StarTeam command line client output I'm trying to write a Perl script that will parse the output of the stcmd.exe (the StarTeam command line client) hist command. I'm getting the history for every file in a view and the output looks something like this:\nFolder: The View Name (working d...,# $hist contains the stcmd output in the format above\nwhile($hist =~ /History for: (?<filename>.)/s)\n{\n # Record filename somewhere with $+{filename}\n\n while($hist =~ /^Revision: (?<file_rev>\S+) View: (?<view_name>.+) Branch Revision: (?<branch_rev>\S+).\nAuthor: (?<author>.*) Date: ...,"[regex, perl, starteam]",0,4,0,0,510,...,402,510,Evan Shaw,11,"Auckland, New Zealand",,17252,174,1078,
2,5346913,5347047,"Calling VirtualProtect on a mapped file I'm using the CreateFileMapping and MapViewOfFile functions to map a file into memory. After a certain point, I call VirtualProtect to change its protection from read-only to read and write. This call fails and GetLastError gives ERROR_INVALID_PARAMETER.\n...","#include <stdio.h>\n#include <stdlib.h>\n#include <windows.h>\n\nint main() {\n HANDLE fd, md;\n char *addr;\n DWORD old;\n BOOL ok;\n\n fd = CreateFile(""filename"", GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);\n md = CreateFileMapping(fd, NU...","[c, windows, winapi]",0,3,2,2,510,...,402,510,Evan Shaw,11,"Auckland, New Zealand",,17252,174,1078,


## Enable spaCy GPU Support

This only applies if you have a GPU and are using Pandas; PySpark can't use a GPU yet.

In [7]:
# spacy.prefer_gpu()

## Create a spaCy [`Language`](https://spacy.io/api/language) Model

In [9]:
from spacy.pipeline import merge_entities

# Download the spaCy English model
spacy.cli.download('en_core_web_lg')
nlp = spacy.load("en_core_web_lg", disable=["vectors"])

# Merge multi-token entities together
# (spaCy 2.x API; spaCy 3.x expects nlp.add_pipe("merge_entities"))
nlp.add_pipe(merge_entities)

✔ Download and installation successful
You can now load the model via spacy.load('en_core_web_lg')


## Exploring spaCy

Below we use print statements and the visualization tool `spacy.displacy` to render parsed objects for an example document. First we iterate over the spaCy [`Token`s](https://spacy.io/api/token) that make up the [`Doc`](https://spacy.io/api/doc) and print each token's text and part-of-speech tag.

We’ll use these parts of speech to write Labeling Functions (LFs) with spaCy pattern matching. It is very useful to know the part of speech we’re looking for in our entities: almost exclusively proper nouns (`PROPN`), quite often in the pattern `VERB-ADP-PROPN`, which we’ll see below.

Next we use `spacy.displacy` to visualize the parse tree of dependencies between words as well as the entities detected in the sentence. This gives a rough idea of the structure that a spaCy `Doc` offers us. spaCy also creates dense vectors based on embeddings for the words and the entire document, which we can use to create LFs as well.

In [10]:
from spacy import displacy

s = 'The program to do payroll was written in C++ and Perl.'
d = nlp(s)
# Collect (text, part-of-speech) pairs for each token
tups = [(t.text, t.pos_) for t in d]

# Print words/parts-of-speech
print(tups)

# Render image diagrams
displacy.render(d, style='dep', options={'compact': True, 'collapse_punct': True, 'distance': 90})
displacy.render(d, style='ent')

[('The', 'DET'), ('program', 'NOUN'), ('to', 'PART'), ('do', 'AUX'), ('payroll', 'NOUN'), ('was', 'AUX'), ('written', 'VERB'), ('in', 'ADP'), ('C++', 'PROPN'), ('and', 'CCONJ'), ('Perl', 'PROPN'), ('.', 'PUNCT')]
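
The `VERB-ADP-PROPN` pattern can be spotted directly in this printed output. As a minimal sketch that uses no spaCy at all, the snippet below slides a three-token window over the `(text, pos)` pairs copied from the output; `find_pos_pattern` is a hypothetical helper introduced only for this illustration.

```python
# (text, part-of-speech) pairs copied from the printed output above
tups = [('The', 'DET'), ('program', 'NOUN'), ('to', 'PART'), ('do', 'AUX'),
        ('payroll', 'NOUN'), ('was', 'AUX'), ('written', 'VERB'), ('in', 'ADP'),
        ('C++', 'PROPN'), ('and', 'CCONJ'), ('Perl', 'PROPN'), ('.', 'PUNCT')]


def find_pos_pattern(pairs, pattern):
    """Hypothetical helper: (start, texts) for every run of tokens whose tags equal pattern."""
    hits = []
    n = len(pattern)
    for start in range(len(pairs) - n + 1):
        window = pairs[start:start + n]
        if [pos for _, pos in window] == list(pattern):
            hits.append((start, [text for text, _ in window]))
    return hits


pattern_hits = find_pos_pattern(tups, ('VERB', 'ADP', 'PROPN'))
print(pattern_hits)
```

The only match starts at token 6 and covers "written in C++". "Perl" is also a `PROPN`, but it follows a `CCONJ`, so this pattern alone misses it.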


## In Pandas, produce records with their left/right tokens for all entities in all documents.

In [None]:
#
# Pandas: build one candidate record per entity, with its left/right context
#
window = 5
candidates = []
for index, row in df.iterrows():
    doc = nlp(row['_Body'])

    for ent in doc.ents:
        rec = {}
        rec['body'] = doc.text
        rec['entity'] = ent
        rec['entity_text'] = ent.text
        rec['entity_start'] = ent.start
        rec['entity_end'] = ent.end
        rec['ent_type'] = ent.label_

        # Up to window + 1 tokens to the left of the entity
        left_token_start = max(0, ent.start - 1 - window)
        left_token_end = ent.start
        left_span = doc[left_token_start : left_token_end]
        rec['left_tokens_text'] = [x.text for x in left_span]
        # Span.text replaces the deprecated Span.merge(), which also shifted token indices
        rec['left_text'] = left_span.text if len(left_span) else ''

        # Up to window tokens to the right of the entity
        right_token_start = ent.end
        right_token_end = min(ent.end + window, len(doc))
        right_span = doc[right_token_start : right_token_end]
        rec['right_tokens_text'] = [x.text for x in right_span]
        rec['right_text'] = right_span.text if len(right_span) else ''

        rec['wikidata_id'] = ent.kb_id

        rec['original_index'] = index
        rec['label'] = 0

        candidates.append(rec)

df_out = pd.DataFrame(candidates)
df_out = df_out.sort_index()

df_out.head()
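
To make the window arithmetic concrete, here is a small sketch on a plain Python token list standing in for a spaCy `Doc`. It assumes the entity `C++` occupies tokens 8 to 9 of the example sentence, uses the same left bound as the loop above (`ent.start - 1 - window`), and clamps the right bound to the end of the token list. `context_windows` is a hypothetical helper, not part of the notebook.

```python
def context_windows(tokens, ent_start, ent_end, window=5):
    """Hypothetical helper: tokens to the left and right of tokens[ent_start:ent_end]."""
    left_start = max(0, ent_start - 1 - window)
    right_end = min(ent_end + window, len(tokens))
    return tokens[left_start:ent_start], tokens[ent_end:right_end]


tokens = ['The', 'program', 'to', 'do', 'payroll', 'was',
          'written', 'in', 'C++', 'and', 'Perl', '.']

# 'C++' is token 8, so the entity span is [8, 9)
left, right = context_windows(tokens, ent_start=8, ent_end=9)
print(left)
print(right)
```

Because of the extra `- 1`, the left side holds up to `window + 1` tokens, while the right side holds at most `window` tokens and is cut short at the end of the document.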

## In PySpark, produce records with their left/right tokens for all entities in all documents.

In [None]:
from spacy.pipeline import merge_entities

def prepare_docs(rows, window: int=5):
    
    nlp = spacy.load("en_core_web_lg", disable=["vectors"])
    nlp.add_pipe(merge_entities)
    
    recs = []
    
    for row in rows:

        doc = nlp(row._Body)

        for ent in doc.ents:
            rec = {}
            rec['body'] = doc.text

            rec['entity_text'] = ent.text
            rec['entity_start'] = ent.start
            rec['entity_end'] = ent.end
            rec['ent_type'] = ent.label_

            # Up to window + 1 tokens to the left of the entity
            left_span = doc[max(0, ent.start - 1 - window) : ent.start]
            rec['left_tokens_text'] = [x.text for x in left_span]
            # Span.text replaces the deprecated Span.merge(), which also shifted token indices
            rec['left_text'] = left_span.text if len(left_span) else ''

            # Up to window tokens to the right of the entity
            right_span = doc[ent.end : min(ent.end + window, len(doc))]
            rec['right_tokens_text'] = [x.text for x in right_span]
            rec['right_text'] = right_span.text if len(right_span) else ''

            rec['wikidata_id'] = ent.kb_id
            rec['label'] = 0

            recs.append(rec)

        # Free the parsed document before processing the next row
        del doc
    
    return recs


entity_df = question_df.repartition(48).rdd.mapPartitions(prepare_docs)

entity_df = entity_df.sortBy(lambda x: random.random())
entity_df.take(5)

## Load the Gold Labeled data

In [None]:
import ast

df_gold = pd.read_csv('../../data/text_extractions.one_file.df_out.gold.labeled.final.csv')

# Drop the index column, we have an index set
df_gold = df_gold.drop(['Unnamed: 0'], axis=1)

# The token lists are stored as strings in the CSV; parse them back into Python lists
df_gold['left_tokens_text'] = df_gold['left_tokens_text'].apply(ast.literal_eval)
df_gold['right_tokens_text'] = df_gold['right_tokens_text'].apply(ast.literal_eval)

df_gold.tail()

# Start the rest of the data after the point where the labeled data starts
df_in = df_out.iloc[df_gold.index[-1] + 1:, :]
df_in.head()
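
The `ast.literal_eval` calls are needed because a CSV file stores each token list as its string representation. A minimal sketch of that round trip, using a made-up list rather than a row from the gold file:

```python
import ast

# A token list as it would look after being written to CSV and read back
serialized = "['was', 'written', 'in']"

# literal_eval only parses Python literals, so it is safer than eval() here
tokens = ast.literal_eval(serialized)

print(type(tokens).__name__, tokens)
```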

## Pandas Split Data into Train / Test Datasets

In [None]:
from sklearn.model_selection import train_test_split

df_train, df_test, y_train, y_test = train_test_split(
    df_in,
    df_in['label'].values,
    test_size=0.3,
    random_state=1337,
)

len(df_train.index), len(df_test.index), y_train.shape, y_test.shape

## PySpark Split Data into Train / Test Datasets

In [None]:
# Now for PySpark
df_train, df_test = entity_df.randomSplit([0.7, 0.3], seed=1337)

## Define the Labels for Language Extraction

In [None]:
POSITIVE = 1
NEGATIVE = 0
ABSTAIN = -1
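
Each labeling function votes with one of these three values, and the votes on a data point are later combined into a single label. As a rough illustration only, the sketch below combines votes by simple majority while ignoring abstentions. `majority_vote` is a hypothetical helper; Snorkel's `LabelModel` goes further and estimates how accurate each labeling function is.

```python
from collections import Counter

POSITIVE = 1
NEGATIVE = 0
ABSTAIN = -1


def majority_vote(votes):
    """Hypothetical helper: most common non-abstain vote, or ABSTAIN on no votes or a tie."""
    counts = Counter(v for v in votes if v != ABSTAIN)
    if not counts:
        return ABSTAIN
    ranked = counts.most_common(2)
    if len(ranked) == 2 and ranked[0][1] == ranked[1][1]:
        return ABSTAIN
    return ranked[0][0]


print(majority_vote([POSITIVE, ABSTAIN, POSITIVE, NEGATIVE]))
print(majority_vote([ABSTAIN, ABSTAIN]))
print(majority_vote([POSITIVE, NEGATIVE]))
```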

## Set Up our spaCy Preprocessors

In [None]:
import re
import jsonlines, sys
from snorkel.labeling import labeling_function, LabelingFunction
from snorkel.preprocess import preprocessor
from snorkel.preprocess.nlp import SpacyPreprocessor

# Note: this name shadows the imported spacy module for the rest of the notebook
spacy = SpacyPreprocessor(
    text_field='body',
    doc_field='spacy',
    memoize=True,
    language='en_core_web_lg',
    disable=['vectors'],
)

@preprocessor(memoize=True, pre=[spacy])
def restore_entity(x):
    """Find the spaCy entity whose token offsets match this record"""
    entity = None
    for ent in x['spacy'].ents:
        if  ent.start == x['entity_start'] \
        and ent.end   == x['entity_end']:
            entity = ent

    if entity is None:
        raise Exception('Missing entity!')

    x['entity'] = entity
    return x

In [None]:
starts_rx = re.compile(r'^\W')

@labeling_function()
def lf_starts_with_char(x):
    """NEGATIVE if starts with a non-alpha-numeric value"""
    return NEGATIVE if starts_rx.match(x['entity_text']) else ABSTAIN


number_end_rx = re.compile(r'^[a-zA-Z]+[0-9\W]+$')

@labeling_function()
def lf_ends_with_symbol_or_number(x):
    """POSITIVE if starts with letters and ends in numbers or symbols"""
    return POSITIVE if number_end_rx.match(x['entity_text']) else ABSTAIN

@labeling_function()
def lf_wrong_entity_type(x):
    """NEGATIVE if the spaCy entity type cannot be a programming language"""
    return NEGATIVE if x['ent_type'] in ['PERSON', 'NORP', 'FAC', 'GPE', 'LOC', 
                                         'LAW', 'DATE', 'TIME', 'PERCENT',
                                         'MONEY', 'QUANTITY', 'ORDINAL', 'CARDINAL',] else ABSTAIN

@labeling_function()
def lf_token_count_2(x):
    """NEGATIVE if entity has more than 2 words in it"""
    return NEGATIVE if len(x['entity_text'].split(' ')) > 2 else ABSTAIN

@labeling_function()
def lf_token_count_1(x):
    """NEGATIVE if entity has more than 1 word in it"""
    return NEGATIVE if len(x['entity_text'].split(' ')) > 1 else ABSTAIN
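
To see what the two regular expressions catch, the sketch below applies them to a few made-up entity strings. It uses plain functions on strings instead of Snorkel's `@labeling_function` decorator so that it runs without Snorkel installed; `starts_with_char` and `ends_with_symbol_or_number` are stand-ins for the labeling functions above.

```python
import re

POSITIVE, NEGATIVE, ABSTAIN = 1, 0, -1

starts_rx = re.compile(r'^\W')
number_end_rx = re.compile(r'^[a-zA-Z]+[0-9\W]+$')


def starts_with_char(entity_text):
    """Stand-in for lf_starts_with_char, taking the entity string directly."""
    return NEGATIVE if starts_rx.match(entity_text) else ABSTAIN


def ends_with_symbol_or_number(entity_text):
    """Stand-in for lf_ends_with_symbol_or_number, taking the entity string directly."""
    return POSITIVE if number_end_rx.match(entity_text) else ABSTAIN


# Made-up entity strings, not taken from the dataset
examples = ['C++', 'Python3', 'Perl', '#define']
votes = {e: (starts_with_char(e), ends_with_symbol_or_number(e)) for e in examples}

for entity_text, lf_votes in votes.items():
    print(entity_text, lf_votes)
```

`Perl` receives no vote from either function, which is why further labeling functions, such as one based on the part-of-speech pattern, are needed.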

In [None]:
from spacy.matcher import Matcher
matcher = Matcher(nlp.vocab)
pattern = [{'POS': 'VERB'}, {'POS': 'ADP'}, {'POS': 'PROPN'}]
# spaCy 2.x signature; spaCy 3.x expects matcher.add("VERB_ADP_PROPN", [pattern])
matcher.add("VERB_ADP_PROPN", None, pattern)

@labeling_function(pre=[spacy, restore_entity])
def lf_verb_in_noun(x):
    """POSITIVE if the entity ends a VERB-ADP-PROPN match such as 'written in C++'"""
    sp = x['spacy']
    matches = matcher(sp)

    for match_id, start, end in matches:
        # The entity must be the PROPN, two tokens after the VERB
        if start == x['entity_start'] - 2:
            if sp[start].text in ['work', 'written', 'wrote']:
                if sp[start + 1].text in ['in']:
                    return POSITIVE

    return ABSTAIN