# Práctica Final: detección de mensajes troll

## Ejemplo de solución.

**Disclaimer.** Existen distintas formas de resolver el ejercicio y todas se considerarán correctas siempre que el modelo se entrene y despliegue acorde a lo esperado.

En los recientes años, Twitch no sólo ha surgido como un pionero en la comunicación moderna, sino también como una un sitio donde las nuevas generaciones pasan el tiempo ver a otros jovenes haciendo cosas por internet. Esta plataforma, rica en interactividad, permite a los usuarios compartir sus pensamientos en tiempo real y después de las transmisiones. Sin embargo, entre las voces genuinas, algunas buscan dañar con comentarios ofensivos.

Pero hoy, nos embarcamos en una misión inspiradora: diseñar una Inteligencia Artificial que pueda identificar y neutralizar estos mensajes tóxicos. A lo largo de esta práctica, no solo entrenaremos un modelo de Deep Learning avanzado, sino que también lo implementaremos para inferencias en batch, estableciendo un nuevo estándar en la industria.

# Configuración de nuestro proyecto en GCP


In [1]:
PROJECT_ID = "august-button-377008" #@param {type:"string"}
! gcloud config set project $PROJECT_ID

Updated property [core/project].


In [2]:
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

if 'google.colab' in sys.modules:
  from google.colab import auth as google_auth
  google_auth.authenticate_user()

# If you are running this notebook locally, replace the string below with the
# path to your service account key and run this cell to authenticate your GCP
# account.
else:
  %env GOOGLE_APPLICATION_CREDENTIALS ''


Creamos el bucket mediante la consola y una vez creado lo indicamos en la variable:

In [3]:
BUCKET_NAME = "practicadesplieguealgoritmos2" #@param {type:"string"}
REGION = "europe-west1" #@param {type:"string"}

In [4]:
! gsutil ls -al gs://$BUCKET_NAME

In [9]:
#! gsutil mb -l $REGION gs://$BUCKET_NAME

Creating gs://practicadesplieguealgoritmos2/...


# Entrenamiento e inferencia en Batch

## Preparación

Para esta primera parte se va a utilizar [Tweets Dataset for Detection of Cyber-Trolls](https://www.kaggle.com/dataturks/dataset-for-detection-of-cybertrolls). El objetivo es desarrollar un clasificador binario para detectar si el mensaje recibido es troll (1) o no (0). **Las métricas obtenidas del entrenamiento y la inferencia no se tendrán en cuenta para la evaluación de la práctica, la importancia está en la arquitectura de la solución**, es decir, lo importante no es que nuestro modelo detecte correctamente los tweets de trolls si no que funcione y sea capaz de hacer inferencias.


A continuación, se van a subir los datos de entrenamiento al bucket del proyecto que se haya creado. **Importante:** crea el bucket en una única región. Os dejo disponibilizado el dataset en un bucket de acceso público:

In [5]:
%pip install gdown
! gdown "1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr"

Downloading...
From: https://drive.google.com/uc?id=1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr
To: /content/dataset-cybertrolls.json
100% 2.76M/2.76M [00:00<00:00, 225MB/s]


Ahora se crea el directorio dónde vas a desarrollar esta primera parte de la práctica.

In [6]:
%mkdir /content/batch

mkdir: cannot create directory ‘/content/batch’: File exists


Se establece el directorio de trabajo que hemos creado.

In [7]:
import os

# Set the working directory to the sample code directory
%cd /content/batch

WORK_DIR = os.getcwd()

/content/batch


Ahora se descargarán los datos en el workspace de Colab para trabajar en local.

In [8]:
! gdown "1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr"
! mv dataset-cybertrolls.json dataset.json

Downloading...
From: https://drive.google.com/uc?id=1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr
To: /content/batch/dataset-cybertrolls.json
  0% 0.00/2.76M [00:00<?, ?B/s]100% 2.76M/2.76M [00:00<00:00, 201MB/s]


Se establecen las dependencias que se usarán en la práctica. Se pueden añadir y quitar las dependencias que no se usen o viceversa.

In [9]:
%%writefile requirements.txt

apache-beam[gcp]
tensorflow
gensim
fsspec
gcsfs
numpy

Overwriting requirements.txt


Instalamos las dependencias. **No olvides reiniciar el entorno al instalar y establecer las variables y credenciales de GCP al arrancar.**

In [10]:
! pip install -r requirements.txt



## Primer ejercicio

Desarrollar un pipeline de preprocesamiento utilizando Apache Beam para generar datos de train, eval y test para los datos proporcionados anteriormente. Requisitos:

- Proporcionar dos modos de ejecución: `train` y `test`
- Soportar ejecuciones en local con `DirectRunner` y ejecuciones en Dataflow usando `DataFlowRunner`.

In [11]:
%%writefile preprocess.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A word-counting workflow."""

# pytype: skip-file

from __future__ import absolute_import


import argparse
import logging
import re
import os
import csv
import random
import json
from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText #fichero de texto a colección
from apache_beam.io import WriteToText #colección a fichero de texto
from apache_beam.coders.coders import Coder #gestión de codificación de archivos
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions, DirectOptions

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download("stopwords")

# CLEANING
STOP_WORDS = stopwords.words("english") #stopwords en inglés
STEMMER = SnowballStemmer("english") #stemmer en inglés
TEXT_CLEANING_RE = "@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+" #Expresión regular para limpiar urls


class CustomCoder(Coder):
    """Custom coder utilizado para ller y escribir strings. Realiza una serie de tranformaciones entre codificaciones"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True

class ExtractColumnsDoFn(beam.DoFn):    #Extrae información de las columnas
    def process(self, element):
        # space removal
        element_json = json.loads(element)
        # Extract relevant information

        text = element_json["content"]
        sentiment = element_json["annotation"]["label"][0]
        # Emit the extracted information as a tuple
        yield (text, sentiment)


class PreprocessColumnsTrainFn(beam.DoFn): #realizado todo el preprocesamiento propio de NLP
    def process_sentiment(self, sentiment):  #preprocesa la parte de sentimiento
        sentiment = int(sentiment)
        if sentiment == 1:
            return "POSITIVE"
        elif sentiment == 0:
            return "NEGATIVE"
        else:
            return "NEUTRAL"

    def process_text(self, text): #preprocesa la parte de texto
        # Elimina link,user y caracteres especiales
        stem = False
        text = re.sub(TEXT_CLEANING_RE, " ", str(text).lower()).strip() #limpiar URL
        tokens = []
        for token in text.split():
            if token not in STOP_WORDS:
                if stem:
                    tokens.append(STEMMER.stem(token)) #stemmiza
                else:
                    tokens.append(token)
        return " ".join(tokens) #devuelve el resultado

    def process(self, element):
        processed_text = self.process_text(element[0]) #procesa texto
        processed_sentiment = self.process_sentiment(element[1]) #procesa sentimiento
        yield f"{processed_text}, {processed_sentiment}"






def run(argv=None, save_main_session=True):

  """Main entry point; defines and runs the wordcount pipeline."""



  parser = argparse.ArgumentParser() #instanciamos el parseador de argumentos

  parser.add_argument( #añado argumento y lo guardo en la variable work_dir. Directorio de trabajo
        "--work-dir", dest="work_dir", required=True, help="Working directory",
  )

  parser.add_argument( # añado argumento. Fichero de entrada
        "--input", dest="input", required=True, help="Input dataset in work dir",
    )
  parser.add_argument( # añado argumento. Fichero de salida
        "--output",
        dest="output",
        required=True,
        help="Output path to store transformed data in work dir",
    )
  parser.add_argument( # añado argumento. Indica si estamos entrenando o evaluando
        "--mode",
        dest="mode",
        required=True,
        choices=["train", "test"],
        help="Type of output to store transformed data",
    )

  known_args, pipeline_args = parser.parse_known_args(argv) #recoge los argumentos enviados por consola y los almacena

  # Añadimos configuración de la pipeline
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session #permite usar dependencias externas como nltk
  pipeline_options.view_as(DirectOptions).direct_num_workers = 0 #número de workers. 0 para que coja el número máximo que tiene la máquina

  # Hasta aquí hemos añadido la configuración, a partir de ahora comenzamos a construir la pipeline:
  with beam.Pipeline(options=pipeline_options) as p:
    # Read the text file[pattern] into a PCollection.
    raw_data = p | "ReadTwitterData" >> ReadFromText(
            known_args.input, coder=CustomCoder("latin-1")) #leemos el texto y tratamos la codificación. Hemos dado un nombre a la transformación

    if known_args.mode == "train":

      transformed_data = ( #construimos datos transformados como la aplicación al raw_data de nuestras dos transformacione
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn()) #Extreamos información de columna
                | "Preprocess" >> beam.ParDo(PreprocessColumnsTrainFn()) #Preprocesamos la información
            )
      #Vamos a construir un conjunto de validación
      eval_percent = 20
      assert 0 < eval_percent < 100, "eval_percent must in the range (0-100)"
      train_dataset, eval_dataset = (
                transformed_data
                | "Split dataset"
                >> beam.Partition( #particionamos nuestros datos cumpliendo
                    lambda elem, _: int(random.uniform(0, 100) < eval_percent), 2
                ) # el 2 es el número de particiones
            )

      train_dataset | "TrainWriteToCSV" >> WriteToText( # escribimos el conjunto de train en un csv
                os.path.join(known_args.output, "train", "part") #part genera las distintas particiones para cada fichero. Es el prefijo del fichero.
            )
      eval_dataset | "EvalWriteToCSV" >> WriteToText( # escribimos el conjunto de eval en un csv
                os.path.join(known_args.output, "eval", "part") #part genera las distintas particiones para cada fichero. Es el prefijo del fichero.
            )

    else:
      transformed_data = (
                raw_data
                | "ExtractColumns" >> beam.ParDo(ExtractColumnsDoFn()) #De nuevo extraemos la información de columnas
                | "Preprocess" >> beam.Map(lambda x: f'"{x[0]}"') # En este caso solo nos quedamos con la columna de texto sin preprocesar.
            )

      transformed_data | "TestWriteToCSV" >> WriteToText(
                os.path.join(known_args.output, "test", "part") #escribimos de nuevo a un fichero de texto.
            )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()



Writing preprocess.py


Se proporciona un fichero `setup.py` necesario para ejecutar en DataFlow. Modificar la variable `REQUIRED_PACKAGES` con las dependencias que se hayan usado en el `requirements.txt`

In [12]:
%%writefile setup.py

import setuptools

REQUIRED_PACKAGES = [
  "apache-beam[gcp]",
  "tensorflow",
  "gensim",
  "fsspec",
  "gcsfs",
  "numpy"
]

setuptools.setup(
    name="twitchstreaming",
    version="0.0.1",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True,
    description="Troll detection",
)


Writing setup.py


Me creo una copia por si hubiera algún error en el procesamiento (buena práctica):

In [13]:
! gdown "1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr"
! gsutil cp dataset-cybertrolls.json //$WORK_DIR/data.json


Downloading...
From: https://drive.google.com/uc?id=1dTaKofC9ZcMWa5cVtGLDFkEnbc4hiPJr
To: /content/batch/dataset-cybertrolls.json
100% 2.76M/2.76M [00:00<00:00, 58.2MB/s]
Copying file://dataset-cybertrolls.json...
/ [1 files][  2.6 MiB/  2.6 MiB]                                                
Operation completed over 1 objects/2.6 MiB.                                      


### Validación preprocess train en local
Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en local.

In [14]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode train

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7cc2bae1d1e0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7cc2bae1ed10> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 2 (skipped: 0), batches: 2, num_threads: 2
INFO:apache_bea

### Validación preprocess test en local

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en local.

In [15]:
! python3 preprocess.py \
  --work-dir $WORK_DIR \
  --runner DirectRunner \
  --input $WORK_DIR/data.json \
  --output $WORK_DIR/transformed_data \
  --mode test

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7c0b163e7760> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7c0b16066ef0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 2 (skipped: 0), batches: 2, num_threads: 2
INFO:ap

## Segundo ejercicio

Desarrollar una tarea de entrenamiento para los datos preprocesados. Requisitos:

- Soportar ejecuciones en local usando el SDK de AI-Platform y ejecuciones en GCP con el mismo código.

Se crea el directorio donde trabajaremos:

In [16]:
%mkdir /content/batch/trainer

In [17]:
%%writefile trainer/__init__.py

version = "0.1.0"

Writing trainer/__init__.py


In [18]:
%%writefile trainer/task.py
from __future__ import absolute_import

import argparse
import multiprocessing as mp
import logging
import tempfile
import os

import pickle
import gensim
import pandas as pd
import numpy as np
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    Embedding,
    LSTM,
)
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.preprocessing import LabelEncoder


# WORD2VEC
W2V_SIZE = 300 #tamaño del embedding
W2V_WINDOW = 7 #tamaño de la ventana
# 32
W2V_EPOCH = 5 #epochs para entrrenar
W2V_MIN_COUNT = 10 #se eliminan palabras que aparecen menos de 10 veces

# KERAS
SEQUENCE_LENGTH = 300 #longitud de secuencia para el padding

# SENTIMENT definimos las etiquetas
POSITIVE = "POSITIVE"
NEGATIVE = "NEGATIVE"
NEUTRAL = "NEUTRAL"
SENTIMENT_THRESHOLDS = (0.4, 0.7) #todas las probabilidades entre 0.4 y 0.7 se asignan a neutral

# EXPORT
KERAS_MODEL = "model.h5" #nombre del archivo del modelo
WORD2VEC_MODEL = "model.w2v" #nombre del archivo del embedding
TOKENIZER_MODEL = "tokenizer.pkl" #nombre del tokenizador
ENCODER_MODEL = "encoder.pkl" #nombre del encoder


def generate_word2vec(train_df):
  ### Genera el embedding y lo entrena
    documents = [_text.split() for _text in train_df.text.values]
    w2v_model = gensim.models.word2vec.Word2Vec(
        vector_size=W2V_SIZE,
        window=W2V_WINDOW,
        min_count=W2V_MIN_COUNT,
        workers=mp.cpu_count(),
    )
    w2v_model.build_vocab(documents)

    words = w2v_model.wv.index_to_key
    vocab_size = len(words)
    logging.info(f"Vocab size: {vocab_size}")
    w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

    return w2v_model


def generate_tokenizer(train_df):
  ### Genera nuestro tokenizador
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train_df.text)
    vocab_size = len(tokenizer.word_index) + 1
    logging.info(f"Total words: {vocab_size}")
    return tokenizer, vocab_size


def generate_label_encoder(train_df):
  ### COdifica las etiquetas (de texto a número)
    encoder = LabelEncoder()
    encoder.fit(train_df.sentiment.tolist())
    return encoder


def generate_embedding(word2vec_model, vocab_size, tokenizer):
  ### Genera el embedding en base al Word2Vec
    embedding_matrix = np.zeros((vocab_size, W2V_SIZE))
    for word, i in tokenizer.word_index.items():
        if word in word2vec_model.wv:
            embedding_matrix[i] = word2vec_model.wv[word]
    return Embedding(
        vocab_size,
        W2V_SIZE,
        weights=[embedding_matrix],
        input_length=SEQUENCE_LENGTH,
        trainable=False,
    )


def train_and_evaluate(
    work_dir, train_df, eval_df, batch_size=1024, epochs=8, steps=1000
):

    """
    Trains and evaluates the estimator given.
    The input functions are generated by the preprocessing function.
    """

    model_dir = os.path.join(work_dir, "data/model") # Comprobamos si ya existe un modelo

    parent_dir = os.path.dirname(model_dir)
    if not tf.io.gfile.exists(parent_dir):
        tf.io.gfile.makedirs(parent_dir)

    # Check if the model directory exists and remove it if it does
    if tf.io.gfile.exists(model_dir):
        tf.io.gfile.rmtree(model_dir)

    # Create the model directory
    tf.io.gfile.mkdir(model_dir)


    # Configuramos donde guardar el modelo
    run_config = tf.estimator.RunConfig()
    run_config = run_config.replace(model_dir=model_dir)

    # Nos permite seguir el entrenamiento cada 10 steps
    run_config = run_config.replace(save_summary_steps=10)

    # Generamos el Word2Vec para entrenamiento
    logging.info("---- Generating word2vec model ----")
    word2vec_model = generate_word2vec(train_df)

    # Generamos el tokenizador con el dato de entrenamiento e inferimos en el de entrenamiento y evaluación
    logging.info("---- Generating tokenizer ----")
    tokenizer, vocab_size = generate_tokenizer(train_df)

    logging.info("---- Tokenizing train data ----")
    x_train = pad_sequences(
        tokenizer.texts_to_sequences(train_df.text), maxlen=SEQUENCE_LENGTH
    ) #Añadimos el padding
    logging.info("---- Tokenizing eval data ----")
    x_eval = pad_sequences(
        tokenizer.texts_to_sequences(eval_df.text), maxlen=SEQUENCE_LENGTH
    )

    # Generamos los encodings de las etiquetas tanto para entrenamiento como para evaluación
    logging.info("---- Generating label encoder ----")
    label_encoder = generate_label_encoder(train_df)

    logging.info("---- Encoding train target ----")
    y_train = label_encoder.transform(train_df.sentiment.tolist())
    logging.info("---- Encoding eval target ----")
    y_eval = label_encoder.transform(eval_df.sentiment.tolist())

    y_train = y_train.reshape(-1, 1) #adaptamos la forma del tensor
    y_eval = y_eval.reshape(-1, 1)

    # Create Embedding Layer
    logging.info("---- Generating embedding layer ----")
    embedding_layer = generate_embedding(word2vec_model, vocab_size, tokenizer) #capade embedding
    # Construimos nuestra red LSTM
    logging.info("---- Generating Sequential model ----")
    model = Sequential()
    model.add(embedding_layer)
    model.add(Dropout(0.5))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation="sigmoid"))

    model.summary()

    logging.info("---- Adding loss function to model ----")
    model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"]) #problema binario y medimos según el accuracy

    logging.info("---- Adding callbacks to model ----")
    callbacks = [
        ReduceLROnPlateau(monitor="val_loss", patience=5, cooldown=0),
        EarlyStopping(monitor="val_accuracy", min_delta=1e-4, patience=5), #early stopping para agilizar
    ]

    logging.info("---- Training model ----")
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        steps_per_epoch=steps,
        epochs=epochs,
        validation_split=0.1,
        verbose=1,
        callbacks=callbacks,
    ) #entrenamos nuestro modelo

    logging.info("---- Evaluating model ----")
    score = model.evaluate(x_eval, y_eval, batch_size=batch_size) #evaliamos el modelo
    logging.info(f"ACCURACY: {score[1]}")
    logging.info(f"LOSS: {score[0]}")

    logging.info("---- Saving models ----")
    pickle.dump(
        tokenizer,
        tf.io.gfile.GFile(os.path.join(model_dir, TOKENIZER_MODEL), mode="wb"),
        protocol=0,
    ) #guardamos el tokenizador
    with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
        with tf.io.gfile.GFile(
            os.path.join(model_dir, KERAS_MODEL), mode="wb" #guardamos el archivo
        ) as gcs_file:
            model.save(local_file.name)
            gcs_file.write(local_file.read())

    # word2vec_model.save(os.path.join(model_dir, WORD2VEC_MODEL))

    # pickle.dump(
    #     label_encoder, open(os.path.join(model_dir, ENCODER_MODEL), "wb"), protocol=0
    # )


if __name__ == "__main__":

    """Main function called by AI Platform."""

    logging.getLogger().setLevel(logging.INFO) # Activamos el logger

    parser = argparse.ArgumentParser( #Implementamos el parseador de argumentos. Ver script de preprocesamiento para más detalle
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--job-dir",
        help="Directory for staging trainer files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--work-dir",
        required=True,
        help="Directory for staging and working files. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        default=1024,
        help="Batch size for training and evaluation.",
    )

    parser.add_argument(
        "--epochs", type=int, default=8, help="Number of epochs to train the model",
    )

    parser.add_argument(
        "--steps",
        type=int,
        default=1000,
        help="Number of steps per epoch to train the model",
    )

    args = parser.parse_args() #cargamos todos los argumentos antes mencionados

    train_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "transformed_data/train/part-*") # el * permite que lea todas las particiones
    ) #archivos de entrenamiento
    eval_data_files = tf.io.gfile.glob(
        os.path.join(args.work_dir, "transformed_data/eval/part-*")
    ) #archivos de test

    train_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            ) # leemos cada csv
            for f in train_data_files
        ]
    ).dropna() #generamos un dataframe con todas las particiones

    eval_df = pd.concat(
        [
            pd.read_csv(
                f,
                names=["text", "sentiment"],
                dtype={"text": "string", "sentiment": "string"},
            )
            for f in eval_data_files
        ]
    ).dropna()


    train_and_evaluate(
        args.work_dir,
        train_df=train_df,
        eval_df=eval_df,
        batch_size=args.batch_size,
        epochs=args.epochs,
        steps=args.steps,
    )

Writing trainer/task.py


### Validación Train en local

Con el comando mostrado a continuación se valida el correcto entrenamiento del modelo usando los datos preprocesados del apartado anterior.

In [19]:
# Explicitly tell `gcloud ai-platform local train` to use Python 3
! gcloud config set ml_engine/local_python $(which python3)

# This is similar to `python -m trainer.task --job-dir local-training-output`
# but it better replicates the AI Platform environment, especially for
# distributed training (not applicable here).
! gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  -- \
  --work-dir $WORK_DIR \
  --epochs 1

Updated property [ml_engine/local_python].
2023-09-17 15:32:27.805840: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Instructions for updating:
Use tf.keras instead.
INFO:tensorflow:TF_CONFIG environment variable: {'job': {'job_name': 'trainer.task', 'args': ['--work-dir', '/content/batch', '--epochs', '1']}, 'task': {}, 'cluster': {}, 'environment': 'cloud'}
INFO:root:---- Generating word2vec model ----
INFO:gensim.utils:Word2Vec lifecycle event {'params': 'Word2Vec<vocab=0, vector_size=300, alpha=0.025>', 'datetime': '2023-09-17T15:32:30.079691', 'gensim': '4.3.2', 'python': '3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0]', 'platform': 'Linux-5.15.109+-x86_64-with-glibc2.35', 'event': 'created'}
INFO:gensim.models.word2vec:collecting all 

## Tercer ejercicio

Desarrollar un pipeline de inferencia utilizando Apache Beam para generar predicciones usando los modelos generados en el apartado anterior así como los datos de test generados en el primer ejercicio.


In [20]:
%%writefile predict.py


from __future__ import absolute_import
from __future__ import print_function

import argparse
import tempfile
import json
import os
import sys
import time

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.coders.coders import Coder

import pickle
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences

# KERAS
SEQUENCE_LENGTH = 300

# SENTIMENT
POSITIVE = "TROLL"
NEGATIVE = "NO TROLL"
#NEUTRAL = "NEUTRAL"
#SENTIMENT_THRESHOLDS = (0.4, 0.7)

# EXPORT
KERAS_MODEL = "model.h5"
TOKENIZER_MODEL = "tokenizer.pkl"


class Predict(beam.DoFn): #realiza predicciones y hereda de Beam
    def __init__(
        self, model_dir,
    ):
        self.model_dir = model_dir
        self.model = None
        self.tokenizer = None

    def setup(self): #es un método de apache Beam que se ejecuta la primera vez que se ejecute e inicializa el modelo de Keras y el tokenizador cargándolos
        keras_model_path = os.path.join(self.model_dir, KERAS_MODEL)
        with tempfile.NamedTemporaryFile(suffix=".h5") as local_file:
            with tf.io.gfile.GFile(keras_model_path, mode="rb") as gcs_file:
                local_file.write(gcs_file.read())
                self.model = tf.keras.models.load_model(local_file.name)

        tokenizer_path = os.path.join(self.model_dir, TOKENIZER_MODEL)
        self.tokenizer = pickle.load(tf.io.gfile.GFile(tokenizer_path, mode="rb"))

    def decode_sentiment(self, score, include_neutral= False):

        return NEGATIVE if score < 0.5 else POSITIVE

    def process(self, element): #recoge elt exto a predecir, tokeniza
        start_at = time.time()
        # Tokenize text
        x_test = pad_sequences(
            self.tokenizer.texts_to_sequences([element]), maxlen=SEQUENCE_LENGTH
        ) #tokenizado y padding
        # Predict
        score = self.model.predict([x_test])[0] #realiza la inferencia
        # Decode sentiment
        label = self.decode_sentiment(score) #trasnforma la predicción en sentimiento

        yield {
            "text": element,
            "label": label,
            "score": float(score),
            "elapsed_time": time.time() - start_at,
        } #devuelve un diccionario con la información clave


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings"""

    def __init__(self, encoding: str):
        # latin-1
        # iso-8859-1
        self.enconding = encoding

    def encode(self, value):
        return value.encode(self.enconding)

    def decode(self, value):
        return value.decode(self.enconding)

    def is_deterministic(self):
        return True


def run(model_dir, source, sink, beam_options=None):
    with beam.Pipeline(options=beam_options) as p:
        _ = (
            p #generamos el pipeline
            | "Read data" >> source #lectura de datos
            | "Predict" >> beam.ParDo(Predict(model_dir)) #hacemos la predicción
            | "Format as JSON" >> beam.Map(json.dumps) #convertimos el diccionario en string
            | "Write predictions" >> sink #guardamos la información
        )


if __name__ == "__main__":
    """Main function"""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter # parseador de argumentos
    )

    parser.add_argument( #directorio de trabajo
        "--work-dir",
        dest="work_dir",
        required=True,
        help="Directory for temporary files and preprocessed datasets to. "
        "This can be a Google Cloud Storage path.",
    )

    parser.add_argument( #ubicación del modelo para inferencia
        "--model-dir",
        dest="model_dir",
        required=True,
        help="Path to the exported TensorFlow model. "
        "This can be a Google Cloud Storage path.",
    )

    verbs = parser.add_subparsers(dest="verb")
    batch_verb = verbs.add_parser("batch", help="Batch prediction")
    batch_verb.add_argument( #datos de entrada para la predicción
        "--inputs-dir",
        dest="inputs_dir",
        required=True,
        help="Input directory where CSV data files are read from. "
        "This can be a Google Cloud Storage path.",
    )
    batch_verb.add_argument( #datos de salida para la predicción
        "--outputs-dir",
        dest="outputs_dir",
        required=True,
        help="Directory to store prediction results. "
        "This can be a Google Cloud Storage path.",
    )

    args, pipeline_args = parser.parse_known_args()
    print(args)
    beam_options = PipelineOptions(pipeline_args)
    beam_options.view_as(SetupOptions).save_main_session = True
    # beam_options.view_as(DirectOptions).direct_num_workers = 0

    project = beam_options.view_as(GoogleCloudOptions).project

    if args.verb == "batch": #si es batch genera transformadores iniciales, la fuente y la salida de datos
        results_prefix = os.path.join(args.outputs_dir, "part")

        source = ReadFromText(args.inputs_dir, coder=CustomCoder("latin-1"))
        sink = WriteToText(results_prefix)

    else: # de momento no funciona si no es batch
        parser.print_usage()
        sys.exit(1)

    run(args.model_dir, source, sink, beam_options)


Writing predict.py


Generamos un timestamp para la ejecución de las predicciones

In [21]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

### Validación Predict en local

Con el comando mostrado a continuación se valida la correcta inferencia usando los modelos anteriores y los datos de test generados anteriormente.

In [22]:
! python3 predict.py \
  --work-dir $WORK_DIR \
  --model-dir $WORK_DIR/data/model \
  batch \
  --inputs-dir $WORK_DIR/transformed_data/test/part* \
  --outputs-dir $WORK_DIR/predictions/$TIMESTAMP

2023-09-17 15:34:49.940588: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Namespace(work_dir='/content/batch', model_dir='/content/batch/data/model', verb='batch', inputs_dir='/content/batch/transformed_data/test/part-00000-of-00002', outputs_dir='/content/batch/predictions/2023-09-17_15-34-41')
2023-09-17 15:34:52.807201: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-09-17 15:34:52.838808: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA 

## Extra. Comprobaciones en la nube

En esta última parte se validará el funcionamiento del código en un proyecto de GCP sobre DataFlow y AI Platform

Me copio los datos necesarios a Google Cloud:

In [None]:
!ls

In [None]:
! gsutil cp -r model gs://$BUCKET_NAME

In [None]:
! gsutil cp -r transformed_data gs://$BUCKET_NAME

Establecemos el bucket y region de GCP sobre el que trabajaremos:

In [None]:
GCP_WORK_DIR = 'gs://' + BUCKET_NAME
GCP_REGION = "europe-west1"

### Validación preprocess train en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de entrenamiento y validación en GCP con el servicio DataFlow.

**Nota.** Recuerda detener la celda una vez el trabajo te aparezca ejecutando en DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode train

### Validación preprocess test en Dataflow

Con el comando mostrado a continuación se valida la correcta generación de los datos de test en GCP con el servicio DataFlow.

**Nota.** Recuerda detener la celda una vez el trabajo te aparezca ejecutando en DataFlow.

In [None]:
! python3 preprocess.py \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --work-dir $GCP_WORK_DIR \
  --input $GCP_WORK_DIR/data.json \
  --output $GCP_WORK_DIR/transformed_data \
  --mode test

Omitimos el entrenamiento en la nube para no incurrir en costes innecesarios.

### Validación predict en Dataflow

Con el comando mostrado a continuación se valida la predicción correcta de los datos de test usando los modelos generados en el comando anterior.

Generamos un timestamp para el almacenamiento de las inferencias en Google Cloud Storage.

In [None]:
from datetime import datetime

# current date and time
TIMESTAMP = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

**Nota.** Recuerda detener la celda una vez el trabajo te aparezca ejecutando en DataFlow.

In [None]:
# For using sample models: --model-dir gs://$BUCKET_NAME/models/
! python3 predict.py \
  --work-dir $GCP_WORK_DIR \
  --model-dir $GCP_WORK_DIR/model/ \
  batch \
  --project $PROJECT_ID \
  --region $GCP_REGION \
  --runner DataflowRunner \
  --temp_location $GCP_WORK_DIR/beam-temp \
  --setup_file ./setup.py \
  --inputs-dir $GCP_WORK_DIR/transformed_data/test/part* \
  --outputs-dir $GCP_WORK_DIR/predictions/$TIMESTAMP

# Mensaje final

¡Muchas gracias por participar en este curso, espero que tanto las sesiones teóricas como la práctica te hayan resultado útiles. A lo largo de esta semmana iréis recibiendo feedback personalizado sobre vuestras prácticas.

¡Muchas gracias y ánimo con el proyecto final!