# Práctica Final: detección de mensajes troll 

En los últimos años Twitch se ha consolidad como uno de los principales medios de comunicación especialmente para las generaciones más jóvenes.

Al tratarse de una plataforma participativa en la que los usuarios pueden poner comentarios durante y posteriormente a las emisiones. Entre estos comentarios han aparecido como siempre comentarios ofensisvos.

En esta práctica construiremos una Inteligencia Artificial capaz de clasificar esos mensajes troll.

Durante la práctica entrenaremos el modelo de Deep Learning y lo desplegaremos para inferencia en batch, la más habitual actualmente dentro de la industria:

## Teoría

1. ¿Qué es Apache Beam?

Sirve para preparar pipelines para realizar procesamiento de datos, diseñado para ejecutar en varios motores de procesamiento, como Apache Spark, Apache Flink, Google Cloud Dataflow...
Beam te permite preparar el proceso para ejecutarlo de la misma forma sin importar el entorno en el que se ejecute.

2. ¿Cuáles son las diferentes formas de desplegar un modelo?

Se puede desplegar en local (propia máquina), en un servidor On Premise (servidor propio físico) o despliegue en la nube (Google Cloud, AWS, Azure...)

3. ¿Cuál es la principal diferencia entre la inferencia en batch y la inferencia en streaming?

El batch se procesa por lotes, en paralelo y en grandes cantidades, no es en tiempo real. En streaming se procesan según llegan en tiempo real, se hace una prediccion por cada paquete de datos y se devuelve el resultado en el mismo momento.

# Configuración de nuestro proyecto en GCP


In [3]:
PROJECT_ID = "deploypractice-380919" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [4]:
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

# If you are running this notebook locally, replace the string below with the
# path to your service account key and run this cell to authenticate your GCP
# account.
else:
  %env GOOGLE_APPLICATION_CREDENTIALS ''


Creamos el bucket mediante la consola y una vez creado lo indicamos en la variable:

In [5]:
BUCKET_NAME = "bucket-practice-armando" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [6]:
! gsutil ls -al gs://$BUCKET_NAME

# Entrenamiento e inferencia en Batch

## Preparación

Para esta primera parte se va a utilizar [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). El objetivo es desarrollar un clasificador binario para detectar si el mensaje recibido es troll (1) o no (0). **Las métricas obtenidas del entrenamiento y la inferencia no se tendrán en cuenta para la evaluación de la práctica, la importancia está en la arquitectura de la solución**, es decir, lo importante no es que nuestro modelo detecte correctamente los tweets de trolls si no que funcione y sea capaz de hacer inferencias.


A continuación, se van a subir los datos de entrenamiento al bucket del proyecto que se haya creado. **Importante:** crea el bucket en una única región. Os dejo disponibilizado el dataset en un bucket de acceso público:

In [7]:
# Upload data to your bucket
! wget https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json -O - | gsutil cp - gs://$BUCKET_NAME/data.json

--2023-03-17 19:35:43--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 142.251.12.128, 172.217.194.128, 74.125.200.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.251.12.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘STDOUT’

-                     0%[                    ]       0  --.-KB/s               Copying from <STDIN>...

2023-03-17 19:35:49 (514 KB/s) - written to stdout [2756023/2756023]

/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              


Ahora se crea el directorio dónde vas a desarrollar esta primera parte de la práctica.

In [8]:
%mkdir /content/batch

Se establece el directorio de trabajo que hemos creado.

In [9]:
import os

# Set the working directory to the sample code directory
%cd /content/batch

WORK_DIR = os.getcwd()

/content/batch


Ahora se descargarán los datos en el workspace de Colab para trabajar en local.

In [23]:
! wget -O data.json https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json

--2023-03-17 20:21:20--  https://storage.googleapis.com/datos-practica-compartidos/Dataset%20for%20Detection%20of%20Cyber-Trolls.json
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.200.128, 74.125.68.128, 74.125.24.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.200.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2756023 (2.6M) [application/json]
Saving to: ‘data.json’


2023-03-17 20:21:20 (123 MB/s) - ‘data.json’ saved [2756023/2756023]



Se establecen las dependencias que se usarán en la práctica. Se pueden añadir y quitar las dependencias que no se usen o viceversa.

In [11]:
%%writefile requirements.txt

apache-beam[gcp]==2.24.0
tensorflow
gensim==3.6.0
fsspec==0.8.4
gcsfs==0.7.1
numpy==1.20.0
apache-beam
pyarrow==0.17.1

Writing requirements.txt


Instalamos las dependencias. **No olvides reiniciar el entorno al instalar y establecer las variables y credenciales de GCP al arrancar.**

In [19]:
! pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-beam[gcp]==2.24.0
  Using cached apache-beam-2.24.0.zip (2.2 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fsspec==0.8.4
  Using cached fsspec-0.8.4-py3-none-any.whl (91 kB)
Collecting gcsfs==0.7.1
  Using cached gcsfs-0.7.1-py2.py3-none-any.whl (20 kB)
Collecting numpy==1.20.0
  Using cached numpy-1.20.0-cp39-cp39-manylinux2010_x86_64.whl (15.4 MB)
Collecting apache-beam
  Using cached apache_beam-2.46.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.5 MB)
Collecting pyarrow==0.17.1
  Using cached pyarrow-0.17.1.tar.gz (2.6 MB)
  Installing build dependencies ... [?25l[?25hcanceled
[31mERROR: Operation cancelled by user[0m[31m
[0m

In [20]:
! pip install apache-beam

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-beam
  Using cached apache_beam-2.46.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.5 MB)
Collecting crcmod<2.0,>=1.7
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 KB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pymongo<4.0.0,>=3.8.0
  Downloading pymongo-3.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (515 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m515.5/515.5 KB[0m [31m41.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fastavro<2,>=0.23.6
  Downloading fastavro-1.7.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m47.9 MB/s[0m eta [36m0:00:00[0m
Collecting fasteners<1.0,>=0.3

## Primer ejercicio

Desarrollar un pipeline de preprocesamiento utilizando Apache Beam para generar datos de train, eval y test para los datos proporcionados anteriormente. Requisitos:

- Proporcionar dos modos de ejecución: `train` y `test`
- Soportar ejecuciones en local con `DirectRunner` y ejecuciones en Dataflow usando `DataFlowRunner`.

In [43]:
%%writefile preprocess.py

# Rellena con tu solución
from __future__ import absolute_import

import argparse
import logging
import re
import os
import csv
import random
import json

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

# CLEANING
STOP_WORDS = stopwords.words("english")
STEMMER = SnowballStemmer("english")
TEXT_CLEANING_RE = "@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"


class ExtractColumnsDoFn(beam.DoFn):
    def process(self, element):
        yield json.loads(element)


class PreprocessColumnsTrainFn(beam.DoFn):
    def process_sentiment(self, sentiment):
        print("Sentiment: ", sentiment)
        sentiment = int(sentiment)
        if sentiment == 4:
            return "POSITIVE"
        elif sentiment == 0:
            return "NEGATIVE"
        else:
            return "NEUTRAL"

    def process_text(self, text):
        # Remove link,user and special characters
        stem = False
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip()
        tokens = []
        for token in text.split():
            if token not in STOP_WORDS:
                if stem:
                    tokens.append(STEMMER.stem(token))
                else:
                    tokens.append(token)
        return " ".join(tokens)

    def process(self, element):
        print("Elemento: ", element)
        # Elemento:  {'content': 'Hmm  maybe I crossed wires w/1 of you & @allisoncarter. Who called who nerd & was correct as geek? Me moving 2 fast 4 fun.', 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
        processed_text = self.process_text(element['content'])
        processed_sentiment = self.process_sentiment(element['annotation']['label'][0])
        yield f"{processed_text}, {processed_sentiment}"


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True


def run(argv=None, save_main_session=True):

    """Main entry point; defines and runs the wordcount pipeline."""

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--work-dir", dest="work_dir", required=True, help="Working directory",
    )

    parser.add_argument(
        "--input", dest="input", required=True, help="Input dataset in work dir",
    )
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output path to store transformed data in work dir",
    )
    parser.add_argument(
        "--mode",
        dest="mode",
        required=True,
        choices=["train", "test"],
        help="Type of output to store transformed data",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(DirectOptions).direct_num_workers = 0

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:

        # Read the text file[pattern] into a PCollection.
        raw_data = p | "ReadTwitchData" >> ReadFromText(
            known_args.input, coder=CustomCoder("latin-1")
        )

        if known_args.mode == "train":

            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn())
            )

            eval_percent = 20
            assert 0 < eval_percent < 100, "eval_percent must in the range (0-100)"
            train_dataset, eval_dataset = (
                transformed_data
                | "Split dataset"
                >> beam.Partition(
                    lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2
                )
            )

            train_dataset | "TrainWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "train", "part")
            )
            eval_dataset | "EvalWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "eval", "part")
            )

        else:
            transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn())
                | "Preprocess" >> beam.Map(lambda x: f'"{x["content"]}"')
            )

            transformed_data | "TestWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "test", "part")
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()


Overwriting preprocess.py


Se proporciona un fichero `setup.py` necesario para ejecutar en DataFlow. Modificar la variable `REQUIRED_PACKAGES` con las dependencias que se hayan usado en el `requirements.txt`

In [15]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
  "apache-beam[gcp]==2.24.0",
  "tensorflow==2.8.0"
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)


Writing setup.py


### Validación preprocess train en local
Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en local.

In [44]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
Elemento:  {'content': "that sucks.  I'm sorry to hear that!  I hope everything works out for you guys and that you can still have a great holiday!", 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
Sentiment:  0
Elemento:  {'content': 'EW! I hate waking up that early.', 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
Sentiment:  0
Elemento:  {'content': "Dang that's rough.  i'm just glad there are jobs in Texas for people whos states economy sucks worse then ours", 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
Sentiment:  0
Elemento:  {'content': ' That sucks make Justin come get you on his bike', 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
Sentiment:  0
Elemento:  {'content': 'Exactly! hate that so many designers and retailers think "plus size" = "matronly". Or clothing cheaply made.', 'annotation': {'notes': '', 'label': ['0']}, 'extras': None}
Sentimen

### Validación preprocess test en local 

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en local.

In [45]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f1fdb57dc70> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f1fdb579670> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environ

## Segundo ejercicio

Desarrollar una tarea de entrenamiento para los datos preprocesados. Requisitos:

- Soportar ejecuciones en local usando el SDK de AI-Platform y ejecuciones en GCP con el mismo código.

Se crea el directorio donde trabajaremos:

In [None]:
%mkdir /content/batch/trainer

In [None]:
%%writefile trainer/__init__.py

version = "0.1.0"

In [None]:
%%writefile trainer/task.py

# Rellena con tu solución

### Validación Train en local

Con el comando mostrado a continuación se valida el correcto entrenamiento del modelo usando los datos preprocesados del apartado anterior.

In [None]:
# Explicitly tell `gcloud ai-platform local train` to use Python 3 
! gcloud config set ml_engine/local_python $(which python3)

# This is similar to `python -m trainer.task --job-dir local-training-output`
# but it better replicates the AI Platform environment, especially for
# distributed training (not applicable here).
! gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  -- \
  --work-dir $WORK_DIR \
  --epochs 1

## Tercer ejercicio

Desarrollar un pipeline de inferencia utilizando Apache Beam para generar predicciones usando los modelos generados en el apartado anterior así como los datos de test generados en el primer ejercicio.


In [None]:
%%writefile predict.py

# Rellena con tu solución

Generamos un timestamp para la ejecución de las predicciones

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

### Validación Predict en local

Con el comando mostrado a continuación se valida la correcta inferencia usando los modelos anteriores y los datos de test generados anteriormente.

In [None]:
! python3 predict.py \
  --work-dir $WORK_DIR \
  --model-dir $WORK_DIR/model \
  batch \
  --inputs-dir $WORK_DIR/transformed_data/test/part* \
  --outputs-dir $WORK_DIR/predictions/$TIMESTAMP

## Extra. Comprobaciones en la nube

En esta última parte se validará el funcionamiento del código en un proyecto de GCP sobre DataFlow y AI Platform

Establecemos el bucket y region de GCP sobre el que trabajaremos:

In [None]:
GCP_WORK_DIR = ''
GCP_REGION = ''

### Validación preprocess train en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en GCP con el servicio DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode train

### Validación preprocess test en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en GCP con el servicio DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode test

Omitimos el entrenamiento en la nube para no incurrir en costes innecesarios.

### Validación predict en Dataflow

Con el comando mostrado a continuación se valida la predicción correcta de los datos de test usando los modelos generados en el comando anterior.

Generamos un timestamp para el almacenamiento de las inferencias en Google Cloud Storage.

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

In [None]:
# For using sample models: --model-dir gs://$BUCKET_NAME/models/
! python3 predict.py \
  --work-dir $GCP_WORK_DIR \
  --model-dir $GCP_WORK_DIR/model/ \
  batch \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $GCP_WORK_DIR/transformed_data/test/part* \
  --outputs-dir $GCP_WORK_DIR/predictions/$TIMESTAMP