# NLP - HW2

Con el objetivo de aplicar los conocimientos presentados en clase y a su vez generar los modelos solicitados, en este punto se hará uso de la herramienta Databricks y Pyspark con el objetivo de leer los archivos, generar la construcción del dataset y hacer el entrenamiento respectivo de los modelos.

Para la elaboración de la actividad se sigue una serie de pasos:

* Se crea un cluster de spark 3.1 en Databricks, con mínimo 1 driver, 1 nodo y autoescalable a máximo 20.
* Cada nodo es una instancia m4.large de AWS con 2 core y 8gb ram, el driver tiene la misma configuración.
* Los archivos se almacenan en un bucket de s3, se descomprime y se hace la lectura para la construcción del dataset.
* Se hace uso de spark para paralelizar los trabajos y hacerlos más eficiente a la hora de entrenar los modelos.

## Punto No. 2: Naive Bayes (NB) & Logistic Regression (LR)

### For the 20N dataset compare two classifiers NB and LR to identify the 20 different newsgroups.

##### Create your own processing pipeline for the task and justify it.

Para esta primera parte se hará la extracción de los archivos de 20news para poder leer e iterar sobre ellos con el objetivo de construir el dataset.

Para hacer el proceso de extracción de forma eficiente, se utiliza awscli en la maquina y se hace la extracción de archivos en el bucket, una vez extraida la información se valida que en el bucket se encuentren todos los archivos, que en total son 18.828 y 20 carpetas con cada categoría.

In [6]:
# importar librerias

import pyspark.sql.functions as F
from pyspark.sql.types import *

# Librerias para procesar datos
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, StringIndexer

# librerias sparknlp
import sparknlp
from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *

In [7]:
# Definición de esquema para la creación del dataset

fileSchema = StructType(
    [
        StructField("filePath", StringType(), True),
        StructField("fileText", StringType(), True),
    ]
)

In [8]:
# leer todos los archivos

sdf = sc.wholeTextFiles("/mnt/databricks-mine/HW02/20news-18828/*").toDF(
    schema=fileSchema
)

In [9]:
# Proceso de limpieza y organización de texto

sdf = sdf.withColumn("fileName", F.split(sdf["filePath"], "/").getItem(5)).withColumn(
    "processedText", F.regexp_replace(sdf.fileText, '[_():;,!?\\->@<~^_`}"{/]', " ")).withColumn("fileNumber", F.split(sdf["filePath"], "/").getItem(6))

sdf = sdf.withColumn("processedText", F.regexp_replace(sdf.processedText, "[.$\[\]|*\\']", ""))
sdf = sdf.withColumn("processedText", F.regexp_replace(sdf.processedText, "[0-9]", " "))
sdf = sdf.withColumn("processedText", F.regexp_replace(sdf.processedText, "\n", ""))
sdf = sdf.withColumn('processedText', F.regexp_replace(sdf.processedText, ' +', ' '))
sdf = sdf.withColumn('processedText', F.lower(sdf.processedText))

# Seleccionar las columnas a procesar para los modelos

sdf = sdf.select('processedText', 'fileName', 'fileNumber')

In [10]:
# Definición del pipeline para el procesamiento de texto

document = DocumentAssembler()\
    .setInputCol("processedText")\
    .setOutputCol("document")

token = Tokenizer()\
    .setInputCols(['document'])\
    .setOutputCol('token')

normalice = Normalizer()\
    .setInputCols(['token'])\
    .setOutputCol('token_norm')\
    .setLowercase(True)\
    .setCleanupPatterns(["[^\w\s]"])

stop_words = StopWordsCleaner.pretrained('stopwords_en', 'en')\
    .setInputCols(["token_norm"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

tokenassembler = TokenAssembler()\
    .setInputCols(["document", "cleanTokens"]) \
    .setOutputCol("clean_text")

prediction_pipeline = Pipeline(
    stages = [
        document,
        token,
        normalice,
        stop_words,
        tokenassembler
    ]
)

In [11]:
# fit y transform del pipeline con la información de 20news
result = prediction_pipeline.fit(sdf).transform(sdf)


# Selección de columnas para la ejecución del modelo
result = (
    result.withColumn("tokens", F.col("cleanTokens.result"))
    .withColumn("text", F.col("clean_text.result"))
    .select("fileName", "fileNumber", "tokens", "text")
)

In [12]:
# Guardar la data procesada en formato parquet para aumentar eficiencia al momento de leer y entrenar modelos.
result.write.partitionBy(["fileName"]).format('parquet').mode("append").save('/mnt/databricks-mine/HW02/news20processed')

In [13]:
# Eliminación de variables no necesarias
del sdf, result

In [14]:
# Importar dataset procesado para la ejecución de los modelos
sdf = spark.read.format('parquet').load('/mnt/databricks-mine/HW02/news20processed')

In [15]:
indexer = StringIndexer(inputCol="fileName", outputCol="labelIndex")
feature_data = indexer.fit(sdf).transform(sdf)

In [16]:
feature_data.select("fileName","labelIndex").groupBy('fileName', 'labelIndex').count().orderBy('labelIndex').show(25)

##### Divide the dataset into training (60%), development (10%) and test (30%)

##### Train NB and LR using the following representations:

###### BOW

###### Boolean BOW

###### Personalized representation. You as a designer must define the select set of characteristics. Explain your feature selection strategy in detail.

#### Compare the results of NB and LR using 10-fold cross validation:

##### Use for cross validation: training+development sets.

##### Do a search for LR hyperparameters

##### Report the average results, precision, and recall by class.

#### Evaluate models using the test set:

##### Precision, recall, F1, accuracy with the micro and macro averaging strategies.

#### Compare the results in terms of:

##### NB vs LR

##### Features

##### Dataset and clasess distribution.

In [33]:
%fs ls /mnt/databricks-mine/HW02/

In [34]:
file_rdd = spark.read.text("/mnt/databricks-mine/HW02/20news-18828.tar.gz", wholetext=True).rdd

In [35]:
file_rdd[0]