## Workshop Goals

### - Get to know the Apache Spark engine.

### - Understand the capabilities of the spaCy NLP library.

### Apache Spark is a fast and general engine for large-scale data processing
![Spark Libs](img/spark-libs.png)

### It can access diverse data sources, including HDFS, Cassandra, Hive, HBase, S3, and JDBC/ODBC
![Spark Compatibility](img/spark-cmp.png)

![Hadoop data sharing](img/data-sharing-mapreduce.png)
![Spark data sharing](img/data-sharing-spark.png)

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun, types

import spacy

import pandas as pd
pd.set_option('display.max_colwidth', 80)

### Spark session init

In [None]:
spark = SparkSession.builder \
    .appName('NLP') \
    .getOrCreate()

### Load dataset

News Category Dataset:
https://www.kaggle.com/rmisra/news-category-dataset

Each JSON record contains the following attributes:

* category: Category the article belongs to

* headline: Headline of the article

* authors: Person(s) who authored the article

* link: Link to the post

* short_description: Short description of the article

* date: Date the article was published

In [None]:
news_df = spark.read.json("News_Category_Dataset_v2.json")
news_df.show()
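By default, `spark.read.json` expects JSON Lines input: one complete JSON object per line (documents spanning several lines need the `multiLine` option). The stdlib sketch below parses two made-up records in that layout to show the shape Spark infers its columns from; the record contents are invented for illustration.

```python
import json

# Two made-up records in JSON Lines format: one JSON object per line,
# using the attribute names listed above.
raw_lines = """\
{"category": "TECH", "headline": "Example headline one", "authors": "Jane Roe", "link": "https://example.com/1", "short_description": "A short description.", "date": "2018-05-26"}
{"category": "SPORTS", "headline": "Example headline two", "authors": "", "link": "https://example.com/2", "short_description": "Another description.", "date": "2018-05-25"}
"""

# Parse every non-empty line independently, as a line-oriented reader does.
parsed = [json.loads(line) for line in raw_lines.splitlines() if line.strip()]

# The union of keys across records plays the role of the inferred columns.
columns = sorted(set().union(*(record.keys() for record in parsed)))
print(columns)
```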

### Examples

In [None]:
news_df.createOrReplaceTempView("news")

In [None]:
spark.sql("SELECT COUNT(*) AS count FROM news").show()

In [None]:
news_df.count()

In [None]:
spark.sql("SELECT category, count(category) AS count FROM news GROUP BY category ORDER BY count DESC").show()

In [None]:
news_df.groupby('category') \
    .count() \
    .orderBy(fun.desc('count')) \
    .show()
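What `groupby('category').count().orderBy(fun.desc('count'))` computes can be mirrored in plain Python with `collections.Counter`, which is handy for sanity-checking small results collected on the driver. The category values below are made up.

```python
from collections import Counter

# Made-up values standing in for the `category` column.
categories = ["POLITICS", "WELLNESS", "POLITICS", "ENTERTAINMENT", "POLITICS", "WELLNESS"]

# most_common() returns (value, count) pairs sorted by count, descending,
# which mirrors GROUP BY category ... ORDER BY count DESC.
counts = Counter(categories).most_common()
for category, count in counts:
    print(category, count)
```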

### Task 1. Select the longest headline

Hint: Use the `length` function and a `LIMIT` clause in SQL

Available functions: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions
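The query shape the hint points at can be tried locally with Python's built-in `sqlite3`, which also provides `length` and `LIMIT`. This is a sketch on a tiny made-up `news` table, not Spark itself; against the real view you would pass a statement of the same shape to `spark.sql`.

```python
import sqlite3

# In-memory SQLite database with a tiny, made-up `news` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE news (category TEXT, headline TEXT)")
conn.executemany(
    "INSERT INTO news VALUES (?, ?)",
    [("TECH", "Short one"),
     ("POLITICS", "A noticeably longer example headline"),
     ("SPORTS", "Medium headline")],
)

# Sort by headline length, longest first, and keep only the first row.
row = conn.execute(
    "SELECT headline, length(headline) AS len FROM news ORDER BY len DESC LIMIT 1"
).fetchone()
print(row)
conn.close()
```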

## spaCy NLP library
![spaCy Features](img/spacy-features.png)

## spaCy pipeline
![spaCy Pipeline](img/spacy-pipeline.png)

### Examples

In [None]:
# import spacy 

nlp = spacy.load("en_core_web_sm")
doc = nlp("In 2018 the Debian Linux project received a donation of $300,000")

for token in doc:
    print(token.text)

In [None]:
# noun_chunks yields Span objects (multi-token phrases), not single tokens
for chunk in doc.noun_chunks:
    print(chunk.text)

In [None]:
for token in doc:
    if token.like_num:
        print(token.text)

### Task 2. Extract named entities from the string

Hint: Use the `ents` attribute of the `Doc`; each entity is a `Span` with a `label_` attribute

spaCy cheat sheet: http://datacamp-community-prod.s3.amazonaws.com/29aa28bf-570a-4965-8f54-d6a541ae4e06

## Let's combine the power of these two tools

### Task 3. Extract ORG, PERSON, GPE named entities in Spark

```
# Write a function that takes a news headline and generates output like this:

[
  {
    'label': 'ORG',
    'text': 'ACME Inc.'
  },
  {
    'label': 'PERSON',
    'text': 'John Doe'
  },
  {
    'label': 'GPE',
    'text': 'London'
  },
  ...
]
```
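A minimal sketch of the filtering and output shape, written against plain `(text, label)` pairs so it runs without spaCy. `to_entity_records` is a hypothetical helper; inside a spaCy-based function the pairs would come from `doc.ents` as `(ent.text, ent.label_)`.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical helper: `entities` is any iterable of (text, label) pairs.
def to_entity_records(entities: Iterable[Tuple[str, str]],
                      keep: Tuple[str, ...] = ("ORG", "PERSON", "GPE")) -> List[Dict[str, str]]:
    # Keep only the requested labels and emit one {'label', 'text'} dict per entity.
    return [{"label": label, "text": text}
            for text, label in entities if label in keep]

pairs = [("ACME Inc.", "ORG"), ("2018", "DATE"), ("John Doe", "PERSON"), ("London", "GPE")]
print(to_entity_records(pairs))
```

The `DATE` entity is dropped because it is not one of the three requested labels.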

In [None]:
news_df_sample = news_df.sample(withReplacement=False, fraction=0.002, seed=777)
news_df_sample.createOrReplaceTempView("news_sample")
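`sample(withReplacement=False, fraction=0.002, ...)` does not return an exact number of rows: each row is kept independently with probability `fraction` (Bernoulli sampling), so the sample size only approximates `fraction` times the row count. A stdlib sketch of that idea, not Spark's actual sampler; the 200,000-row input is made up.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    # Keep each row independently with probability `fraction`;
    # a fixed seed makes the selection reproducible.
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = range(200_000)
sampled = bernoulli_sample(rows, fraction=0.002, seed=777)
print(len(sampled))  # close to 0.002 * 200_000 = 400; the exact count depends on the seed
```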

In [None]:
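class SpacyWrapper(object):
    """Load spaCy models lazily and cache them once per worker process."""
    _spacys = {}  # model name -> loaded pipeline, shared by all calls in this process
    # Only NER is needed here, so skip the dependency parser and POS tagger to save time
    disabled_pipeline_steps = ['parser', 'tagger']
    default_model = 'en_core_web_sm'

    @classmethod
    def get(cls, model=default_model, disable=disabled_pipeline_steps):
        if model not in cls._spacys:
            # Import lazily so spaCy is loaded in the process that runs the UDF
            import spacy
            cls._spacys[model] = spacy.load(model, disable=disable)
        return cls._spacys[model]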

### Named entity extraction function

Hint: Reuse the code from `Task 2`.

In [None]:
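def ner(text):
    labels = ['ORG', 'PERSON', 'GPE']
    entities = []

    # Spark passes SQL NULL values to Python UDFs as None
    if not text:
        return entities

    # Load spaCy (cached per worker process by SpacyWrapper)
    nlp = SpacyWrapper.get()
    doc = nlp(text)

    # ======== WRITE YOUR SOLUTION BELOW ========
    # Append {'label': ..., 'text': ...} dicts for entities whose label is in `labels`

    return entities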

In [None]:
# Schema definition
schema = types.ArrayType(
    types.StructType([
        types.StructField('label', types.StringType(), nullable=False),
        types.StructField('text', types.StringType(), nullable=False)
    ])
)

# Register user defined function (UDF) to use in SQL
spark.udf.register('ner', ner, schema)

### Apply the UDF to extract entities from short descriptions

In [None]:
ent_sample = spark.sql("SELECT short_description, ner(short_description) AS entities FROM news_sample")

In [None]:
ent_sample.toPandas()

## Save output to JSON

In [None]:
# Reuse the query defined above; overwrite mode lets the cell be re-run
ent_sample \
    .repartition(1) \
    .write \
    .mode("overwrite") \
    .json("output")
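Spark writes `output` as a directory, not a single file: with `repartition(1)` it should hold one `part-*` file of JSON Lines next to marker files such as `_SUCCESS`. A stdlib sketch for reading such a directory back without Spark; the file name and record below are made up to imitate that layout.

```python
import glob
import json
import os
import tempfile

def read_spark_json_dir(path):
    # Read every part-* file in the directory; each holds one JSON object per line.
    records = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part, encoding="utf-8") as f:
            records.extend(json.loads(line) for line in f if line.strip())
    return records

# Made-up directory imitating a single-partition write.
out_dir = tempfile.mkdtemp()
with open(os.path.join(out_dir, "part-00000-example.json"), "w", encoding="utf-8") as f:
    f.write('{"short_description":"ACME opens in London","entities":'
            '[{"label":"ORG","text":"ACME"},{"label":"GPE","text":"London"}]}\n')
open(os.path.join(out_dir, "_SUCCESS"), "w").close()

records = read_spark_json_dir(out_dir)
print(records[0]["entities"])
```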

In [None]:
# Create the text classifier (spaCy v2 API)

model = spacy.blank("en")
textcat = model.create_pipe(
    "textcat",
    config={
        "exclusive_classes": True,  # each article has exactly one category
        "architecture": "simple_cnn",
    }
)
model.add_pipe(textcat, last=True)

# Collect the distinct categories and register each one as a textcat label
labels = [row.category for row in news_df.select(news_df.category).distinct().collect()]
for lbl in labels:
    textcat.add_label(lbl)

In [None]:
# Prepare the training data

from pyspark.sql.types import MapType, StringType, BooleanType

SEED = 15

# Split data: 20% train / 80% test (the training split is collected onto the driver below)
train_df, test_df = news_df.randomSplit([0.2, 0.8], seed=SEED)

# Broadcast labels to all workers
wlabels = spark.sparkContext.broadcast(labels)

# UDF that turns a category into spaCy's `cats` dict: {label: True/False for every label}
fschema = MapType(StringType(), BooleanType(), False)
def tocats(label):
    return {lbl: (lbl == label) for lbl in wlabels.value}
spark.udf.register('tocats', tocats, fschema)

train_df.createOrReplaceTempView("train_df")
test_df.createOrReplaceTempView("test_df")

prepared_df = spark.sql("SELECT concat_ws('. ', headline, short_description) as text, tocats(category) as cats from train_df")

# Collect the training set on the driver as (text, {"cats": ...}) tuples, the format spaCy's update() expects
train_data = [(row.text, {"cats": row.cats}) for row in prepared_df.collect()]
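To make the shape of `train_data` concrete, here is a plain-Python sketch of the same preparation on made-up rows. `example_concat_ws` imitates Spark SQL's `concat_ws`, which skips NULL inputs; all names are hypothetical and chosen so they do not overwrite the notebook's own variables.

```python
# Made-up rows standing in for (headline, short_description, category).
example_rows = [
    ("Headline A", "Description A.", "TECH"),
    ("Headline B", None, "SPORTS"),
]
example_labels = ["SPORTS", "TECH"]

def example_concat_ws(sep, *parts):
    # Like Spark SQL's concat_ws: join the non-null parts with `sep`.
    return sep.join(p for p in parts if p is not None)

def tocats_sketch(label):
    # One boolean per known label; exactly one is True for exclusive classes.
    return {lbl: lbl == label for lbl in example_labels}

example_train_data = [
    (example_concat_ws(". ", headline, desc), {"cats": tocats_sketch(cat)})
    for headline, desc, cat in example_rows
]
print(example_train_data[1])
```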

In [None]:
# Train the model

import random
import os
from shutil import rmtree
from spacy.util import minibatch, compounding

ITERATIONS = 10

print("{:^5}".format("LOSS"))

optimizer = model.begin_training()
batch_sizes = compounding(4.0, 32.0, 1.001)

for i in range(ITERATIONS):
    losses = {}
    # shuffle, then batch up the examples using spaCy's minibatch
    random.shuffle(train_data)
    batches = minibatch(train_data, size=batch_sizes)
    for batch in batches:
        texts, annotations = zip(*batch)
        model.update(texts, annotations, sgd=optimizer, drop=0.2, losses=losses)
    print("{0:.3f}".format(losses["textcat"]))

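# Save the model to disk so the workers can load it
# (the path must be readable from every worker, e.g. in local mode or on a shared filesystem)
MODEL_DIR = os.path.join(os.getcwd(), "classifier")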

# Clean existing and save
rmtree(MODEL_DIR, ignore_errors=True)
with model.use_params(optimizer.averages):
    model.to_disk(MODEL_DIR)
print("Saved model to", MODEL_DIR)
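The batch sizes in the training loop come from `compounding(4.0, 32.0, 1.001)`: an infinite generator that starts at 4, multiplies by 1.001 after each batch and is capped at 32, so batches grow very slowly and reach 32 only after roughly 2,080 batches. A stdlib sketch of how that generator and `minibatch` interact, written to mirror their behavior (only the increasing case is handled); these are not spaCy's own functions.

```python
import itertools

def compounding_sketch(start, stop, compound):
    # Infinite generator: start, start*compound, start*compound**2, ... capped at stop.
    curr = float(start)
    while True:
        yield min(curr, stop)
        curr *= compound

def minibatch_sketch(items, sizes):
    # Yield consecutive batches; each size is the next value from `sizes`, truncated to int.
    items = iter(items)
    while True:
        batch = list(itertools.islice(items, int(next(sizes))))
        if not batch:
            return
        yield batch

example_batches = list(minibatch_sketch(range(10), compounding_sketch(4.0, 32.0, 1.001)))
print([len(b) for b in example_batches])  # [4, 4, 2]
```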

In [None]:
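# Evaluate the classifier on the test split
from operator import itemgetter

# Redefines SpacyWrapper: this version caches the trained model loaded from a directory
class SpacyWrapper(object):
    """Load the trained spaCy model lazily, once per worker process."""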
    _spacys = {}

    @classmethod
    def get(cls, model_dir):
        if model_dir not in cls._spacys:
            import spacy
            cls._spacys[model_dir] = spacy.load(model_dir)
        return cls._spacys[model_dir]

def predict_category(text):
    model = SpacyWrapper.get(MODEL_DIR)
    res = model(text)
    # return the category with the highest predicted score
    return max(res.cats.items(), key=itemgetter(1))[0]
spark.udf.register("predict_category", predict_category, StringType())

predicted_results = spark.sql("select category, predict_category(concat_ws('. ', headline, short_description)) as prediction from test_df")

total_tested = test_df.count()
correct_prediction = predicted_results.filter(predicted_results.category == predicted_results.prediction).count()
accuracy = correct_prediction / total_tested
uniform_baseline = 1 / len(labels)

print("Results:")
print("%s total in test, %s predicted correctly, accuracy %.3f" % (total_tested, correct_prediction, accuracy))
print("Expected accuracy of guessing categories uniformly at random: %.3f" % uniform_baseline)
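The evaluation above reduces to an argmax over each `cats` dict, followed by a comparison with the true label. A plain-Python sketch on made-up scores; guessing uniformly among k labels has expected accuracy 1/k whatever the class distribution, which is why `1 / len(labels)` serves as the baseline.

```python
from operator import itemgetter

def top_category(cats):
    # Label with the highest score in a spaCy-style `cats` dict.
    return max(cats.items(), key=itemgetter(1))[0]

# Made-up (true category, predicted scores) pairs.
examples = [
    ("TECH", {"TECH": 0.7, "SPORTS": 0.2, "POLITICS": 0.1}),
    ("SPORTS", {"TECH": 0.5, "SPORTS": 0.4, "POLITICS": 0.1}),
    ("POLITICS", {"TECH": 0.1, "SPORTS": 0.1, "POLITICS": 0.8}),
    ("SPORTS", {"TECH": 0.2, "SPORTS": 0.6, "POLITICS": 0.2}),
]

correct = sum(1 for truth, cats in examples if top_category(cats) == truth)
example_accuracy = correct / len(examples)
# Expected accuracy of picking one of k labels uniformly at random is 1/k.
baseline = 1 / len(examples[0][1])
print("accuracy %.3f vs uniform baseline %.3f" % (example_accuracy, baseline))
```

When classes are imbalanced, always predicting the most frequent category is usually a stronger baseline than uniform guessing.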