# Extracción de Texto con BERT

Integrantes del equipo: 

*   Rodrigo Salinas 124825
*   Oscar Ruiz 195238
*   Viridiana Marlen González Flores 192460
*   Victor Rivera 97105



# Código y Comentarios

##Configuración

In [None]:
!pip install tokenizers
!pip install transformers

In [None]:
# Parámetros y configuraciones iniciales para BERT
import os # el módulo provee funciones para interactuar con el Sistema Operativo.
import re # el módulo provee herramientas de expresiones regulares para procesamiento en cadenas.
import json # el módulo permite trabajar con archivos JSON. 
import string # conserva varias constantes útiles y clases para trabajar con objetos str.
import numpy as np # Numpy es un paquete que provee a Python con arreglos multidimensionales de alta eficiencia y diseñados para cálculo científico.
import tensorflow as tf # TensorFlow facilita la creación de modelos de aprendizaje automático.
from tensorflow import keras # soporta la libreria TensorFlow.
from tensorflow.keras import layers # se importan las capas de keras.
from tokenizers import BertWordPieceTokenizer # tokenización de subpalabras.
from transformers import BertTokenizer, TFBertModel, BertConfig # configuraciones para BERT. 
from sklearn.model_selection import train_test_split

max_len = 384 #Tamaño máximo de capas
configuration = BertConfig()  # default parameters and configuration for BERT


## Configuración del tokenizador BERT

In [None]:
# Guarda el token preentrenado
slow_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
save_path = "bert_base_uncased/"
if not os.path.exists(save_path):
    os.makedirs(save_path)
slow_tokenizer.save_pretrained(save_path)

# Carga el token desde el archivo previamente guardado
tokenizer = BertWordPieceTokenizer("bert_base_uncased/vocab.txt", lowercase=True)


##Carga de datos de entrenamiento en inglés

In [None]:
train_data_url = "https://rajpurkar.github.io/SQuAD-explorer/dataset/train-v1.1.json"
train_path = keras.utils.get_file("train.json", train_data_url)


## Carga de datos de evaluación en español


In [None]:
all_data_url = "https://github.com/deepmind/xquad/raw/master/xquad.es.json" #Carga los datos de entrenamiento en español
all_path = keras.utils.get_file("train.json", all_data_url)

## Carga de datos de evaluación en inglés

In [None]:
eval_data_url = "https://rajpurkar.github.io/SQuAD-explorer/dataset/dev-v1.1.json" #Carga los datos de prueba en inglés
eval_path = keras.utils.get_file("eval.json", eval_data_url)

Nota: si se quiere evaluar en modelo con los datos de evaluación en inglés, deshabilitar la carga de datos en español y habilitar la carga de datos en inglés, así mismo, hacer el cambio en los artículos en la función normalize_text

## Preprocesamiento de los datos
1. Recorre el archivo JSON y almacena cada registro como un objeto SquadExample.
2. Revisa cada SquadExample y "create x_train", "y_train", "x_eval", "y_eval".

In [None]:
class SquadExample:
    def __init__(self, question, context, start_char_idx, answer_text, all_answers):
        self.question = question
        self.context = context
        self.start_char_idx = start_char_idx
        self.answer_text = answer_text
        self.all_answers = all_answers
        self.skip = False

    def preprocess(self):
        context = self.context
        question = self.question
        answer_text = self.answer_text
        start_char_idx = self.start_char_idx

        # Limpiar contexto, respuesta and pregunta
        context = " ".join(str(context).split())
        question = " ".join(str(question).split())
        answer = " ".join(str(answer_text).split())

        # Encontrar el índice de último carácter de la respuesta en el contexto
        end_char_idx = start_char_idx + len(answer)
        if end_char_idx >= len(context):
            self.skip = True
            return

        # Marcar los índices de los caracteres en el contexto que están en la respuesta
        is_char_in_ans = [0] * len(context)
        for idx in range(start_char_idx, end_char_idx):
            is_char_in_ans[idx] = 1

        # Tokenizar contexto
        tokenized_context = tokenizer.encode(context)

        # Encontrar tokens que fueron creados de los caracteres de la respuesta
        ans_token_idx = []
        for idx, (start, end) in enumerate(tokenized_context.offsets):
            if sum(is_char_in_ans[start:end]) > 0:
                ans_token_idx.append(idx)

        if len(ans_token_idx) == 0:
            self.skip = True
            return

        # Encontrar los índices de inicio y fin de los tokens en la respuesta
        start_token_idx = ans_token_idx[0]
        end_token_idx = ans_token_idx[-1]

        # Tokenizar pregunta
        tokenized_question = tokenizer.encode(question)

        # Crear entradas
        input_ids = tokenized_context.ids + tokenized_question.ids[1:]
        token_type_ids = [0] * len(tokenized_context.ids) + [1] * len(
            tokenized_question.ids[1:]
        )
        attention_mask = [1] * len(input_ids)

        # Rellenar y crear máscaras de atención.
        # Saltar si es necesario truncar
        padding_length = max_len - len(input_ids)
        if padding_length > 0:  # rellenar
            input_ids = input_ids + ([0] * padding_length)
            attention_mask = attention_mask + ([0] * padding_length)
            token_type_ids = token_type_ids + ([0] * padding_length)
        elif padding_length < 0:  # saltar
            self.skip = True
            return

        self.input_ids = input_ids
        self.token_type_ids = token_type_ids
        self.attention_mask = attention_mask
        self.start_token_idx = start_token_idx
        self.end_token_idx = end_token_idx
        self.context_token_to_char = tokenized_context.offsets


with open(train_path) as f:
    raw_train_data = json.load(f)

with open(eval_path) as f:
    raw_eval_data = json.load(f)


def create_squad_examples(raw_data):
    squad_examples = []
    for item in raw_data["data"]:
        for para in item["paragraphs"]:
            context = para["context"]
            for qa in para["qas"]:
                question = qa["question"]
                answer_text = qa["answers"][0]["text"]
                all_answers = [_["text"] for _ in qa["answers"]]
                start_char_idx = qa["answers"][0]["answer_start"]
                squad_eg = SquadExample(
                    question, context, start_char_idx, answer_text, all_answers
                )
                squad_eg.preprocess()
                squad_examples.append(squad_eg)
    return squad_examples


def create_inputs_targets(squad_examples):
    dataset_dict = {
        "input_ids": [],
        "token_type_ids": [],
        "attention_mask": [],
        "start_token_idx": [],
        "end_token_idx": [],
    }
    for item in squad_examples:
        if item.skip == False:
            for key in dataset_dict:
                dataset_dict[key].append(getattr(item, key))
    for key in dataset_dict:
        dataset_dict[key] = np.array(dataset_dict[key])

    x = [
        dataset_dict["input_ids"],
        dataset_dict["token_type_ids"],
        dataset_dict["attention_mask"],
    ]
    y = [dataset_dict["start_token_idx"], dataset_dict["end_token_idx"]]
    return x, y


train_squad_examples = create_squad_examples(raw_train_data)
x_train, y_train = create_inputs_targets(train_squad_examples)
print(f"{len(train_squad_examples)} training points created.")

eval_squad_examples = create_squad_examples(raw_eval_data)
x_eval, y_eval = create_inputs_targets(eval_squad_examples)
print(f"{len(eval_squad_examples)} evaluation points created.")

##Creación del modelo de respuesta a preguntas mediante BERT y API funcional

In [None]:
def create_model():
    ## Codificador BERT
    encoder = TFBertModel.from_pretrained("bert-base-uncased")

    ## Modelo de preguntas y respuestas
    input_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
    token_type_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
    attention_mask = layers.Input(shape=(max_len,), dtype=tf.int32)
    embedding = encoder(
        input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)[0]
    ## Generación de probabilidades
    start_logits = layers.Dense(1, name="start_logit", use_bias=False)(embedding)
    start_logits = layers.Flatten()(start_logits)

    end_logits = layers.Dense(1, name="end_logit", use_bias=False)(embedding)
    end_logits = layers.Flatten()(end_logits)
    ## Función de activación de las capas
    start_probs = layers.Activation(keras.activations.softmax)(start_logits)
    end_probs = layers.Activation(keras.activations.softmax)(end_logits)

    model = keras.Model(
        inputs=[input_ids, token_type_ids, attention_mask],
        outputs=[start_probs, end_probs],
    )
    ## Función de pérdida de entropía cruzada 
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    # Esta es la tasa de aprendizaje, según el paper, los valores óptimos son:
    # 5e-5, 3r-5, 2e-5
    optimizer = keras.optimizers.Adam(lr=5e-5)
    model.compile(optimizer=optimizer, loss=[loss, loss])
    return model


##Este código debe ejecutarse preferentemente en el tiempo de ejecución de Google Colab TPU.
Con los TPUs de Colab, cada época tardará entre 5 y 6 minutos.

In [None]:
use_tpu = True
if use_tpu:
    # Se crea una estrategia de distribución
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)

    # Creación del modelo
    with strategy.scope():
        model = create_model()
else:
    model = create_model()

model.summary()

##Evaluación de "Callback"

Esta llamada de retorno calculará la puntuación de coincidencia exacta utilizando los datos de validación después de cada época.

In [None]:

def normalize_text(text):
    text = text.lower()

    # Elimina los signos de puntuación
    exclude = set(string.punctuation)
    text = "".join(ch for ch in text if ch not in exclude)

    # Eliminamos los artículos:
    #Español
    regex = re.compile(r"\b(el|la|los|las)\b", re.UNICODE)
    text = re.sub(regex, " ", text)

    #Inglés
    regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
    text = re.sub(regex, " ", text)

    # Elimina los espacios en blanco
    text = " ".join(text.split())
    return text


class ExactMatch(keras.callbacks.Callback):
    """
    Each `SquadExample` object contains the character level offsets for each token
    in its input paragraph. We use them to get back the span of text corresponding
    to the tokens between our predicted start and end tokens.
    All the ground-truth answers are also present in each `SquadExample` object.
    We calculate the percentage of data points where the span of text obtained
    from model predictions matches one of the ground-truth answers.
    """

    def __init__(self, x_eval, y_eval):
        self.x_eval = x_eval
        self.y_eval = y_eval

    # Esta función se ejecuta al término de cada época
    def on_epoch_end(self, epoch, logs=None):
        # Obtiene las predicciones del modelo del conjunto de prueba
        pred_start, pred_end = self.model.predict(self.x_eval)
        count = 0
        eval_examples_no_skip = [_ for _ in test if _.skip == False]
        for idx, (start, end) in enumerate(zip(pred_start, pred_end)):
            squad_eg = eval_examples_no_skip[idx]
            offsets = squad_eg.context_token_to_char
            start = np.argmax(start)
            end = np.argmax(end)
            if start >= len(offsets):
                continue
            pred_char_start = offsets[start][0]
            if end < len(offsets):
                pred_char_end = offsets[end][1]
                pred_ans = squad_eg.context[pred_char_start:pred_char_end]
            else:
                pred_ans = squad_eg.context[pred_char_start:]

            normalized_pred_ans = normalize_text(pred_ans)
            normalized_true_ans = [normalize_text(_) for _ in squad_eg.all_answers]
            # Después de normalizar el texto, revisa si la predicción 
            # es un subconjunto de la respuesta real
            if normalized_pred_ans in normalized_true_ans:
                count += 1
        acc = count / len(self.y_eval[0])
        print(f"\nepoch={epoch+1}, exact match score={acc:.2f}")



##Entrenamiento & Evaluación

In [None]:
## Entrenamiento y evaluación del modelo 
exact_match_callback = ExactMatch(x_eval, y_eval)
model.fit(
    x_train,
    y_train,
    epochs=3,  # de acuerdo con el paper, se recomienda trabajar con 3 épocas
    verbose=2,
    batch_size=64,
    callbacks=[exact_match_callback],
)

In [None]:
## Imprime los resultados del modelo 
print(exact_match_callback)