# Spark NLP: Part of Speech (POS) and Named Entity Recognition (NER) demo

## Read the given dataset using Spark

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session that pulls in the Spark NLP package.
spark = (
    SparkSession.builder
    .appName("spark_nlp_demo")
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.0.9")
    .getOrCreate()
)

# Load the sample corpus; per the schema printed below it has a single
# string column named `text`.
data = spark.read.parquet('sample_text.parquet')

# Quick sanity check: schema first, then the first 20 rows.
data.printSchema()
data.show()

root
 |-- text: string (nullable = true)

+--------------------+
|                text|
+--------------------+
|Reuters historica...|
|Following are som...|
|1714 - The Treaty...|
|1812 - Russian ar...|
|Napoleon entered ...|
|1822 - Brazil pro...|
|As Liberal prime ...|
|1860 - Giuseppe G...|
|1901 - In China ,...|
|1909 - Elia Kazan...|
|1913 - Sir Anthon...|
|He succeeded to t...|
|1969 - Scottish m...|
|Four years later ...|
|1986 - Bishop Des...|
|He was the first ...|
|1990 - The United...|
|He won acclaim fo...|
|1993 - Six former...|
|1994 - The Stars ...|
+--------------------+
only showing top 20 rows




## Create a Spark ML Pipeline using pretrained English models

In [2]:
from pyspark.ml import Pipeline
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.embeddings import *

# Stage 1: turn the raw `text` column into a document annotation.
# NOTE(review): no output column is set here; the next stage reads "document",
# so this presumably relies on the annotator's default output name - confirm.
my_DocumentAssembler = DocumentAssembler().setInputCol("text")

# Stage 2: split each document into sentences.
my_SentenceDetector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
    .setUseAbbreviations(True)
)

# Stage 3: split each sentence into tokens.
my_Tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Stage 4: pretrained word embeddings (input/output columns left at the
# pretrained model's own settings).
my_WordEmbModel = WordEmbeddingsModel.pretrained('glove_100d')

# Stage 5: pretrained part-of-speech tagger working on sentences and tokens.
my_PosTagger = (
    PerceptronModel.pretrained('pos_anc')
    .setInputCols(["sentence", "token"])
)

# Stage 6: pretrained CRF-based named entity recognizer (columns left at the
# pretrained model's own settings).
my_NerCrfModel = NerCrfModel.pretrained('ner_crf')

# Assemble the stages in dependency order.
pipeline = Pipeline(stages=[
    my_DocumentAssembler,
    my_SentenceDetector,
    my_Tokenizer,
    my_WordEmbModel,
    my_PosTagger,
    my_NerCrfModel,
])

# Fit the pipeline and immediately apply it to the same data.
# NOTE: despite its name, `model` is the transformed DataFrame (the result of
# `.transform`), not the fitted PipelineModel.
model = pipeline.fit(data).transform(data)

## Print the transformed DataFrame showing only the POS column and the NER column. 


In [3]:
from pyspark.sql.functions import col
# Keep only the tag strings (the `result` field) of the POS and NER
# annotations, renamed to short column names.
pos_and_ner = model.select(
    col("pos.result").alias("pos"),
    col("ner.result").alias("ner"),
)
pos_and_ner.show()


+--------------------+--------------------+
|                 pos|                 ner|
+--------------------+--------------------+
|[NNP, JJ, NN, -, ...|[I-ORG, O, O, O, ...|
|[VBG, VBP, DT, IN...|[O, O, O, O, O, O...|
|[CD, :, DT, NNP, ...|[O, O, O, I-ORG, ...|
|[CD, :, JJ, NN, I...|[O, O, I-MISC, O,...|
|[NNP, VBD, NNP, D...|[I-PER, O, I-LOC,...|
|[CD, :, NNP, VBD,...|[O, O, I-LOC, O, ...|
|[IN, NNP, JJ, NN,...|[O, I-MISC, O, O,...|
|[CD, :, NNP, NNP,...|[O, O, I-PER, I-P...|
|[CD, :, IN, NNP, ...|[O, O, O, I-LOC, ...|
|[CD, :, NNP, NNP,...|[O, O, I-PER, I-P...|
|[CD, :, NNP, NNP,...|[O, O, O, I-PER, ...|
|[PRP, VBD, TO, DT...|[O, O, O, O, O, O...|
|[CD, :, NNP, NN, ...|[O, O, I-MISC, O,...|
|[CD, NNS, RB, ,, ...|[O, O, O, O, O, O...|
|[CD, :, NNP, NNP,...|[O, O, O, I-PER, ...|
|[PRP, VBD, DT, JJ...|[O, O, O, O, O, O...|
|[CD, :, DT, NNP, ...|[O, O, O, I-LOC, ...|
|[PRP, VBD, NN, IN...|[O, O, O, O, O, O...|
|[CD, :, CD, JJ, N...|[O, O, O, O, I-MI...|
|[CD, :, DT, NNP, ...|[O, O, O, 

##  As a toy example, we will show that tokens classified as nouns (POS tag starts with "N") are much more likely to have an NER tag other than O than tokens with other parts of speech, e.g. verbs or adjectives

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def ner_count_in_pos_group(pos_is_noun, ner_is_O, pos_list, ner_list):
    """Count the tokens of one row that fall into a given (POS group, NER group) cell.

    Tokens are matched by position: ``pos_list[i]`` and ``ner_list[i]`` are
    treated as the tags of the same token. If the lists differ in length,
    only the positions present in both are considered.

    Args:
        pos_is_noun: True to select tokens whose POS tag starts with "N";
            False to select every other token.
        ner_is_O: True to count tokens whose NER tag is exactly "O";
            False to count tokens with any other NER tag.
        pos_list: POS tags for the row, one string per token (may be None).
        ner_list: NER tags for the row, one string per token (may be None).

    Returns:
        int: number of tokens satisfying both conditions; 0 when either
        list is None or empty.
    """
    # A null array column arrives in a UDF as None; such a row has no tokens.
    if not pos_list or not ner_list:
        return 0

    # Walk both tag lists in lockstep (single pass) instead of building an
    # index list and testing membership for every token. `startswith` is
    # used rather than `tag[0]` so an empty POS tag counts as "not a noun"
    # instead of raising IndexError.
    return sum(
        1
        for pos_tag, ner_tag in zip(pos_list, ner_list)
        if (pos_tag.startswith("N") == pos_is_noun) and ((ner_tag == "O") == ner_is_O)
    )

# One Spark UDF per cell of the 2x2 table
# (noun vs. other POS) x (NER tag "O" vs. any other NER tag).
pos_noun_ner_O = udf(
    lambda pos_tags, ner_tags: ner_count_in_pos_group(True, True, pos_tags, ner_tags),
    IntegerType(),
)
pos_noun_ner_not_O = udf(
    lambda pos_tags, ner_tags: ner_count_in_pos_group(True, False, pos_tags, ner_tags),
    IntegerType(),
)
pos_rest_ner_O = udf(
    lambda pos_tags, ner_tags: ner_count_in_pos_group(False, True, pos_tags, ner_tags),
    IntegerType(),
)
pos_rest_ner_not_O = udf(
    lambda pos_tags, ner_tags: ner_count_in_pos_group(False, False, pos_tags, ner_tags),
    IntegerType(),
)

# Per-row counts for each of the four cells, next to the original tag columns.
summary = pos_and_ner.select(
    "pos",
    "ner",
    pos_noun_ner_O('pos', 'ner').alias('pos_noun_ner_O'),
    pos_noun_ner_not_O('pos', 'ner').alias('pos_noun_ner_not_O'),
    pos_rest_ner_O('pos', 'ner').alias('pos_rest_ner_O'),
    pos_rest_ner_not_O('pos', 'ner').alias('pos_rest_ner_not_O'),
)

# Collapse to a single row holding the total of every numeric column
# (`summary` is rebound from the per-row frame to the totals frame here).
summary = summary.groupBy().sum()
summary.show()

# Totals as a plain tuple, in column order:
# ('pos_noun_ner_O', 'pos_noun_ner_not_O', 'pos_rest_ner_O', 'pos_rest_ner_not_O')
sums = tuple(summary.collect()[0])

# Share of tokens with a non-O NER tag, within nouns and within everything else.
print(
    'Fraction of nouns labeled as non-O by NER = ',
    sums[1] / (sums[0] + sums[1]),
)
print(
    'Fraction of all other POS labeled as non-O by NER = ',
    sums[3] / (sums[2] + sums[3]),
)

+-------------------+-----------------------+-------------------+-----------------------+
|sum(pos_noun_ner_O)|sum(pos_noun_ner_not_O)|sum(pos_rest_ner_O)|sum(pos_rest_ner_not_O)|
+-------------------+-----------------------+-------------------+-----------------------+
|               5069|                   3896|              16901|                    241|
+-------------------+-----------------------+-------------------+-----------------------+

Fraction of nouns labeled as non-O by NER =  0.4345789180145008
Fraction of all other POS labeled as non-O by NER =  0.01405903628514759
