
## Deep Learning NER

In the following example, we walk through training an LSTM NER model and using it for prediction. This annotator is implemented on top of TensorFlow.

The annotator takes a series of word embedding vectors and a CoNLL training dataset, and optionally a validation dataset. We include our own predefined TensorFlow graphs, and all layers are trained during the `fit()` stage.

DL NER computes several Bi-LSTM layers to extract entities automatically, and during prediction it makes batched, distributed calls to the native TensorFlow libraries.

### Spark `2.4` and Spark NLP `1.8.2`

#### 1. Call necessary imports and set the resource folder path.

In [None]:
import os
import sys
sys.path.append('../../')

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

import time
import zipfile
# Set the location of the resource directory
resource_path = "../../../src/test/resources/"

#### 2. Download CoNLL 2003 data if not present

In [None]:
# Download the CoNLL 2003 dataset files that are not already present
from pathlib import Path
import urllib.request

# Source: https://github.com/patverga/torch-ner-nlp-from-scratch/tree/master/data/conll2003
url = "https://github.com/patverga/torch-ner-nlp-from-scratch/raw/master/data/conll2003/"
file_train = "eng.train"
file_testa = "eng.testa"
file_testb = "eng.testb"

for file_name in (file_train, file_testa, file_testb):
    if not Path(file_name).is_file():
        print("Downloading " + file_name)
        urllib.request.urlretrieve(url + file_name, file_name)

#### 3. Create the spark session

In [None]:
spark = SparkSession.builder \
    .appName("DL-NER")\
    .master("local[*]")\
    .config("spark.driver.memory","12G")\
    .config("spark.kryoserializer.buffer.max", "500m")\
    .getOrCreate()

#### 4. Load dataset and cache into memory

In [None]:
from sparknlp.training import CoNLL
training_data = CoNLL().readDataset(spark, './eng.train')
training_data.show()
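
For readers unfamiliar with the file format behind `eng.train`, the following is a minimal pure-Python sketch of how CoNLL 2003 text is laid out: one token per line with its tags, blank lines between sentences, and `-DOCSTART-` lines between documents. It is not the Spark NLP reader; `parse_conll` is a hypothetical helper and the sample lines are hand-written for illustration.

```python
from typing import Iterable, List, Tuple

Sentence = List[Tuple[str, str]]  # (token, NER tag) pairs


def parse_conll(lines: Iterable[str]) -> List[Sentence]:
    """Group CoNLL-style lines into sentences of (token, NER tag) pairs.

    Hypothetical helper for illustration only; assumes the token is the
    first whitespace-separated field and the NER tag is the last one.
    """
    sentences: List[Sentence] = []
    current: Sentence = []
    for raw in lines:
        line = raw.strip()
        # Blank lines and -DOCSTART- markers close the current sentence.
        if not line or line.startswith("-DOCSTART-"):
            if current:
                sentences.append(current)
                current = []
            continue
        fields = line.split()
        current.append((fields[0], fields[-1]))
    if current:
        sentences.append(current)
    return sentences


# Hand-written sample in the token / POS / chunk / NER layout.
sample = """-DOCSTART- -X- O O

EU NNP I-NP I-ORG
rejects VBZ I-VP O
German JJ I-NP I-MISC
call NN I-NP O

Peter NNP I-NP I-PER
Blackburn NNP I-NP I-PER
""".splitlines()

sentences = parse_conll(sample)
for sentence in sentences:
    print(sentence)
```

In the notebook itself, `CoNLL().readDataset` does this work and returns a DataFrame, so this sketch is only meant to make the input format concrete.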

#### 5. Create the annotator components with the appropriate parameters and in the right order, then put everything in a Pipeline. The converter turns the NER tags into entity spans

In [None]:
bert = BertEmbeddings.pretrained() \
 .setInputCols(["sentence"])\
 .setOutputCol("bert")\
 .setCaseSensitive(False)

In [None]:
%%time

# WARNING: This step is slow and might crash your system. High-end hardware and/or a GPU is required.
# dataframe.cache() does not solve this. Results must be serialized to disk for maximum efficiency.
# You might need to restart your driver after this step finishes.

from pathlib import Path

with_bert_path = "./with_bert.parquet"

if not Path(with_bert_path).is_dir():
    bert.transform(training_data).write.parquet(with_bert_path)

training_with_bert = spark.read.parquet(with_bert_path).cache()

print(training_with_bert.count())
training_with_bert.select("token", "bert").show()

In [None]:
nerTagger = NerDLApproach()\
  .setInputCols(["sentence", "token", "bert"])\
  .setLabelColumn("label")\
  .setOutputCol("ner")\
  .setMaxEpochs(1)\
  .setRandomSeed(0)\
  .setVerbose(0)

converter = NerConverter()\
  .setInputCols(["document", "token", "ner"])\
  .setOutputCol("ner_span")

pipeline = Pipeline(
    stages = [
    nerTagger,
    converter
  ])
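
To make the role of the converter stage concrete, here is a conceptual pure-Python sketch of merging token-level tags into entity spans. It is an assumption-laden illustration, not the `NerConverter` implementation: `tags_to_spans` is a hypothetical name, and the rule used (a `B-` prefix or a change of entity type starts a new span) is one common convention for IOB-style tags.

```python
from typing import List, Optional, Tuple


def tags_to_spans(tokens: List[str], tags: List[str]) -> List[Tuple[str, str]]:
    """Merge consecutive tagged tokens into (entity text, entity type) spans.

    Hypothetical helper for illustration only. A span ends at an 'O' tag,
    at a 'B-' prefix, or when the entity type changes.
    """
    spans: List[Tuple[str, str]] = []
    current_tokens: List[str] = []
    current_type: Optional[str] = None

    for token, tag in zip(tokens, tags):
        if tag == "O":
            prefix, entity_type = "O", None
        else:
            prefix, _, entity_type = tag.partition("-")

        starts_new = prefix == "B" or entity_type != current_type
        if current_tokens and starts_new:
            # Close the span collected so far.
            spans.append((" ".join(current_tokens), current_type))
            current_tokens, current_type = [], None

        if entity_type is not None:
            current_tokens.append(token)
            current_type = entity_type

    if current_tokens:
        spans.append((" ".join(current_tokens), current_type))
    return spans


# Hand-written tokens and tags, not model output.
tokens = ["Peter", "Blackburn", "visited", "New", "York"]
tags = ["B-PER", "I-PER", "O", "I-LOC", "I-LOC"]
spans = tags_to_spans(tokens, tags)
print(spans)
```

In the pipeline above, the equivalent result lands in the `ner_span` column produced by the converter.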

#### 6. Train the pipeline. (This will take some time)

In [None]:
%%time

start = time.time()
print("Start fitting")
model = pipeline.fit(training_with_bert)
print("Fitting finished")
print(time.time() - start)

#### 7. Let's predict with the model

In [None]:
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentence')

token = Tokenizer()\
    .setInputCols(['sentence'])\
    .setOutputCol('token')

prediction_pipeline = Pipeline(
    stages = [
        document,
        sentence,
        token,
        bert,
        model
    ]
)

In [None]:
prediction_data = spark.createDataFrame([["Germany is a nice place"]]).toDF("text")
prediction_data.show()

In [None]:
prediction_model = prediction_pipeline.fit(prediction_data)

In [None]:
%%time

lp = LightPipeline(prediction_model)
result = lp.annotate("International Business Machines Corporation (IBM) is an American multinational information technology company headquartered in Armonk.")
for e in list(zip(result['token'], result['ner'])):
    print(e)
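
The loop above prints every token, including the many tagged `O`. A small sketch of keeping only the tagged tokens follows. It assumes `result` is a dictionary that maps output column names to lists of strings; the dictionary below is a hand-written stand-in, not real model output, and `entity_tokens` is a hypothetical helper.

```python
from collections import Counter

# Hand-written stand-in for the dictionary returned by lp.annotate(...);
# these tags are illustrative, not real model output.
result = {
    "token": ["IBM", "is", "headquartered", "in", "Armonk", "."],
    "ner": ["I-ORG", "O", "O", "O", "I-LOC", "O"],
}


def entity_tokens(result, token_col="token", ner_col="ner"):
    """Return (token, tag) pairs whose tag is not the outside tag 'O'."""
    return [
        (tok, tag)
        for tok, tag in zip(result[token_col], result[ner_col])
        if tag != "O"
    ]


tagged = entity_tokens(result)
# Count entity types by dropping the B-/I- prefix from each tag.
type_counts = Counter(tag.split("-")[-1] for _, tag in tagged)

print(tagged)
print(type_counts)
```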

In [None]:
%%time

# This might take around 8 minutes. Run time does not scale linearly.

prediction_model.transform(prediction_data).show()

#### 8. Save the trained prediction pipeline to disk

In [None]:
prediction_model.write().overwrite().save("./ner_dl_model")

#### 9. Load the saved pipeline again, deserializing it from disk

In [None]:
from pyspark.ml import PipelineModel, Pipeline

loaded_prediction_model = PipelineModel.read().load("./ner_dl_model")

In [None]:
%%time
lp = LightPipeline(loaded_prediction_model)
result = lp.annotate("Peter is a good person.")
for e in list(zip(result['token'], result['ner']))[:10]:
    print(e)

In [None]:
for stage in loaded_prediction_model.stages:
    print(stage)
print(loaded_prediction_model.stages[-1].stages)