# Named Entity Recognition Pipeline

The pipeline takes the URL of an RSS feed, fetches the title and description of each article in the feed, detects named entities with a pretrained model, and displays them sorted by frequency of occurrence.

### Versions
Tested with:
* Almond 0.6.0
* Ammonite 1.6.7
* Scala library version **2.11.12** -- Copyright 2002-2017, LAMP/EPFL
* Java 1.8.0_282

For more information, go to Help -> About Scala Kernel.

## 1. Fetch the text

### 1.1 Import libraries

In [3]:
// Equivalent of adding dependencies to maven or sbt files
// For example, to add "org.scalaj" %% "scalaj-http" % "2.4.2" 
import $ivy.`org.scalaj::scalaj-http:2.4.2`
// "org.scala-lang.modules" %% "scala-xml" % "1.3.0"
import $ivy.`org.scala-lang.modules::scala-xml:1.3.0`

In [4]:
import scalaj.http.{Http, HttpResponse}
import scala.xml.XML

### 1.2 Fetch the text from the RSS feed

We issue an HTTP request, which returns an `HttpResponse` instance. Its `body` attribute holds the text of the feed in XML format. The XML is then parsed to extract the `title` and `description` fields of each item.
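
Before running the live request, the parsing step can be tried in isolation. The minimal sketch below parses a small, made-up RSS string instead of a real response. The feed content and the names `sampleFeed`, `feedXml`, and `texts` are illustrative assumptions; the `import $ivy` line assumes an Ammonite or Almond session, as in the rest of this notebook.

```scala
import $ivy.`org.scala-lang.modules::scala-xml:1.3.0`
import scala.xml.XML

// Hypothetical two-item feed, standing in for the body of a real response
val sampleFeed =
  """<rss version="2.0"><channel>
    |  <title>Example Feed</title>
    |  <item><title>First headline</title><description>First summary.</description></item>
    |  <item><title>Second headline</title><description>Second summary.</description></item>
    |</channel></rss>""".stripMargin

val feedXml = XML.loadString(sampleFeed)

// `\\` searches all descendants, `\` only direct children,
// so the channel-level <title> is not picked up
val texts = (feedXml \\ "item").map { item =>
  s"${(item \ "title").text} ${(item \ "description").text}"
}
```

Each element of `texts` is one article's title and description joined by a space, which is the shape of text the later stages consume.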

In [3]:
// Tutorial https://alvinalexander.com/source-code/scala-how-to-http-download-xml-rss-feed-timeout/
// get the xml content using scalaj-http
val url = "https://www.chicagotribune.com/arcio/rss/category/sports/?query=display_date:[now-2d+TO+now]&sort=display_date:desc"
val response: HttpResponse[String] = Http(url)
  .timeout(connTimeoutMs = 2000, readTimeoutMs = 5000)
  .asString
val xmlString = response.body
// convert the `String` to a `scala.xml.Elem`
val xml = XML.loadString(xmlString)
// Extract text from title and description
val rssText = (xml \\ "item").map { item =>
    ((item \ "title").text ++ " " ++ (item \ "description").text)
}

url: String = "https://www.chicagotribune.com/arcio/rss/category/sports/?query=display_date:[now-2d+TO+now]&sort=display_date:desc"
response: HttpResponse[String] = HttpResponse(
  """<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:media="http://search.yahoo.com/mrss/" xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"><channel><title>Chicago Tribune</title><link>https://www.chicagotribune.com</link><language>en-US</language><copyright>© 2021 Chicago Tribune</copyright><atom:link href="https://www.chicagotribune.com/arcio/rss/category/sports/?query=display_date:%5Bnow-2d+TO+now%5D&amp;sort=display_date:desc" rel="self" type="application/rss+xml"/><description>Chicago Tribune News Feed</description><lastBuildDate>Mon, 12 Apr 2021 21:06:00 +0000</lastBui

## 2. Detect named entities

### 2.1 Import libraries

In [4]:
// Equivalent of adding dependencies to maven or sbt files
import $ivy.`org.apache.spark::spark-sql:2.4.4`
import $ivy.`org.apache.spark::spark-mllib:2.4.4`
import $ivy.`com.johnsnowlabs.nlp::spark-nlp:2.7.4`
import $ivy.`sh.almond::almond-spark:0.6.0`

In [5]:
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org").setLevel(Level.OFF)

In [6]:
import com.johnsnowlabs.nlp.base._
import com.johnsnowlabs.nlp.annotator._
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.{functions => F}  // Rename import

In [7]:
// DO NOT REMOVE THIS CELL FROM THE NOTEBOOK!
import org.apache.spark.sql._
// If a new library is included, this cell must be re-run
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}
import spark.implicits._ // This is necessary for all the .toDF
// And it has to go after starting the session

Loading spark-stubs
Creating SparkSession


spark: SparkSession = org.apache.spark.sql.SparkSession@756602fc

### 2.2 Create the Spark ML Pipeline instance

The code below builds the data pipeline.

The **model** itself is the `NerDLModel` instance. The stages that come before it are transformers that preprocess the text into the format the model expects. The `NerConverter` instance is a transformer that converts the model's output into an easier-to-read format. All these stages are combined into a data **pipeline** that takes plain text and returns the named entities.

The input must be a DataFrame with a single column named `description`.

In [8]:
val input_col = "description"

input_col: String = "description"

In [9]:
// https://medium.com/@saif1988/spark-nlp-walkthrough-powered-by-tensorflow-9965538663fd
val documentAssembler = new DocumentAssembler()
    .setInputCol(input_col)  // Same column name used when building the DataFrame
    .setOutputCol("document")

val sentenceDetector = new SentenceDetector()
    .setInputCols("document")
    .setOutputCol("sentence")

val tokenizer = new Tokenizer()
    .setInputCols(Array("sentence"))
    .setOutputCol("token")

// Search for other pretrained models here
// https://nlp.johnsnowlabs.com/docs/en/models
val embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en")
    .setInputCols(Array("sentence", "token"))
    .setOutputCol("embeddings")

val ner = NerDLModel.pretrained("ner_dl")
    .setInputCols(Array("sentence", "token", "embeddings"))
    .setOutputCol("ner")

val nerConverter = new NerConverter()
    .setInputCols(Array("document", "token", "ner"))
    .setOutputCol("ner_chunk")

val pipeline = new Pipeline()
    .setStages(Array(documentAssembler, sentenceDetector, tokenizer,
                     embeddings, ner, nerConverter))

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
Download done! Loading the resource.


ner_dl download started this may take some time.
Approximate size to download 13.6 MB
Download done! Loading the resource.


documentAssembler: DocumentAssembler = document_56210cf5c4d1
sentenceDetector: SentenceDetector = SENTENCE_bd6a4a299202
tokenizer: Tokenizer = REGEX_TOKENIZER_af4d9a4105f0
embeddings: WordEmbeddingsModel = WORD_EMBEDDINGS_MODEL_48cffc8b9a76
ner: NerDLModel = NerDLModel_d4424c9af5f4
nerConverter: NerConverter = NER_CONVERTER_871261986c76
pipeline: Pipeline = pipeline_30544d6d9eac
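
As a rough illustration of what the final conversion stage does conceptually: NER models of this kind typically tag each token with an IOB label (`B-PER`, `I-PER`, `O`, and so on), and a converter merges the consecutive tokens of one entity into a single chunk. The sketch below is a simplified plain-Scala version of that idea, not Spark NLP's implementation. `Chunk`, `mergeIob`, and the sample tokens and tags are hypothetical.

```scala
// Hypothetical container for one merged entity
case class Chunk(text: String, label: String)

// Merge token-level IOB tags into entity chunks (simplified: ignores sentence boundaries)
def mergeIob(tokens: Seq[String], tags: Seq[String]): List[Chunk] = {
  // Accumulator: finished chunks (in reverse order) and the chunk being built, if any
  val (done, open) =
    tokens.zip(tags).foldLeft((List.empty[Chunk], Option.empty[Chunk])) {
      case ((acc, current), (token, tag)) =>
        (tag, current) match {
          // "I-X" continues an open chunk that has the same label
          case (t, Some(c)) if t.startsWith("I-") && t.drop(2) == c.label =>
            (acc, Some(c.copy(text = c.text + " " + token)))
          // "O" closes any open chunk
          case ("O", _) =>
            (current.toList ::: acc, None)
          // "B-X" (or a stray "I-X") closes the open chunk and starts a new one
          case (t, _) =>
            (current.toList ::: acc, Some(Chunk(token, t.drop(2))))
        }
    }
  (open.toList ::: done).reverse
}

// Made-up sample sentence with its token-level tags
val tokens = Seq("Tim", "Anderson", "joined", "the", "Chicago", "White", "Sox")
val tags   = Seq("B-PER", "I-PER", "O", "O", "B-ORG", "I-ORG", "I-ORG")
val chunks = mergeIob(tokens, tags)
```

Here `chunks` holds two entries, one for the person and one for the organization, which mirrors the kind of entity-level output stored in the `ner_chunk` column.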

### 2.3 Load the data into a DataFrame

The pipeline expects the text in a Spark DataFrame, which is like a table with rows (one per article) and columns (only one, for the text). Using DataFrames requires a Spark session, which is why one was created in section 2.1.

We convert the text into a DataFrame with a single column whose name matches the one the pipeline expects.

In [10]:
val data = (rssText).toDF(input_col)

data: DataFrame = [description: string]

### 2.4 Apply the Spark ML Pipeline to the data

In [11]:
val trainedModel = pipeline.fit(data.limit(1))  // Use only first row, we only need the schema
val result = trainedModel.transform(data)

trainedModel: org.apache.spark.ml.PipelineModel = pipeline_30544d6d9eac
result: DataFrame = [description: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>> ... 5 more fields]

Let's look at the result for the first article only.

In [12]:
// Prediction is not executed until we actually want to use the results
result.select("ner_chunk").limit(1).show(truncate=false)

+---------+
|ner_chunk|
+---------+
...

## 3. Count and sort the entities

In [14]:
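val sortedNEs = result
  // One row per detected chunk: pair each chunk's text with its metadata map
  .select(
    F.explode(
      F.arrays_zip(F.col("ner_chunk.result"), F.col("ner_chunk.metadata")))
      .alias("entities"))
  // The zipped struct fields are addressed as '0' (text) and '1' (metadata)
  .select(
    F.expr("entities['0']").alias("entity"),
    F.expr("entities['1'].entity").alias("label"))
  // Count occurrences per (entity, label) pair, most frequent first
  .groupBy("entity", "label").count().orderBy(F.desc("count"))
  .select("entity", "count")
  .collect()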

sortedNEs: Array[Row] = Array(
  [Chicago White Sox,9],
  [Chicago Cubs,6],
  [Humboldt Park Gators,4],
  [Pittsburgh Pirates,4],
  [Masters,4],
  [Kansas City Royals,4],
  [COVID-19,4],
  [Hideki Matsuyama,4],
  [Japanese,3],
  [Yermín Mercedes,3],
  [White Sox,3],
  [Chicago Blackhawks,3],
  [Chicago Bulls,3],
  [Chicago Cubs’,2],
  [Kansas City Chiefs,2],
  [Los Angeles Angels,2],
  [Britt Reid,2],
  [Bulls,2],
  [Eddie George,2],
  [Sweden,2],
  [NBA,2],
  [Zach Davies,2],
  [Prince Philip’s,2],
  [Tim Anderson,2],
  [Pittsburgh,2],
  [Pirates,2],
  [Columbus Blue Jackets,2],
  [Minnesota Timberwolves,2],
  [Blackhawks,2],
  [Craig Driver,2],
  [NHL,2],
  [Dexter Fowler,2],
  [Royals,2],
  [ACL,2],
  [Timberwolves,2],
  [Minneapolis The Minnesota Twins,1],
  [Chicago Bulls’,1],
  [Nationwide Arena,1],
...
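
For intuition, the same count-and-sort logic can be sketched on an ordinary Scala collection, without Spark. The `detected` list is made-up sample data and `countAndSort` is a hypothetical helper, not part of the notebook. Unlike the Spark query above, this version breaks ties alphabetically so that the order is deterministic.

```scala
// Hypothetical (entity, label) pairs, as they might come out of the NER stage
val detected = List(
  ("Chicago Cubs", "ORG"), ("Tim Anderson", "PER"), ("Chicago Cubs", "ORG"),
  ("NBA", "ORG"), ("Chicago Cubs", "ORG"), ("Tim Anderson", "PER")
)

// Group identical (entity, label) pairs, count them, and sort by descending count;
// ties are broken alphabetically by entity name
def countAndSort(pairs: Seq[(String, String)]): List[(String, Int)] =
  pairs
    .groupBy(identity)
    .toList
    .map { case ((entity, _), occurrences) => (entity, occurrences.size) }
    .sortBy { case (entity, count) => (-count, entity) }

val ranked = countAndSort(detected)

// Print one "entity: count" line per entity
ranked.foreach { case (entity, count) => println(s"$entity: $count") }
```

Because the grouping key includes the label, the same surface text tagged with two different labels would be counted as two separate entries, which is also how the `groupBy("entity", "label")` call above behaves.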