In [37]:
import tensorflow as tf
import tensorflow_addons as tfa

import numpy as np
import pandas as pd

from keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Input, LSTM, Embedding, TimeDistributed, Bidirectional, Dense, Layer, InputSpec
from tensorflow_addons.text import crf_log_likelihood, crf_decode

import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping


# Data preprocessing functions

In [38]:
def to_tuples(data):
    """Pair every token of a frame with its IOB label.

    Args:
        data: DataFrame exposing a ``token`` column and an ``iob_label``
            column (the notebook passes one ``groupby("report")`` group).

    Returns:
        list[tuple]: ``(token, iob_label)`` pairs, in row order.
    """
    tokens = data["token"].values.tolist()
    iob_labels = data["iob_label"].values.tolist()
    return list(zip(tokens, iob_labels))

In [39]:
def build_vocab(data):
    """Build the word and tag lookup tables from a token-level DataFrame.

    Indices are assigned in order of first appearance in ``data``. The previous
    ``list(set(...))`` ordering depended on string hashing and could change
    from one interpreter run to the next, which made the vocabulary (and any
    model trained on it) impossible to reproduce.

    Special entries:
        * words: ``--UNKNOWN_WORD--`` -> 0, ``--PADDING--`` -> 1,
          real words start at 2.
        * tags: ``--PADDING--`` -> 0, real tags start at 1.

    NOTE(review): Keras ``mask_zero=True`` masks index 0, which in this
    vocabulary is the unknown-word index, not the padding index.

    Args:
        data: DataFrame with a ``token`` column and an ``iob_label`` column.

    Returns:
        tuple: ``(word2index, index2word, tag2index, index2tag)`` dicts.
    """
    # dict.fromkeys de-duplicates exactly like set() but keeps first-seen order.
    all_words = list(dict.fromkeys(data["token"].values))
    all_tags = list(dict.fromkeys(data["iob_label"].values))

    word2index = {word: idx + 2 for idx, word in enumerate(all_words)}
    word2index["--UNKNOWN_WORD--"] = 0
    word2index["--PADDING--"] = 1
    index2word = {idx: word for word, idx in word2index.items()}

    tag2index = {tag: idx + 1 for idx, tag in enumerate(all_tags)}
    tag2index["--PADDING--"] = 0
    index2tag = {idx: tag for tag, idx in tag2index.items()}

    return word2index, index2word, tag2index, index2tag

In [40]:
def tokenize(reports, word2index, tag2index, max_sentence_size=512):
    """Encode reports as fixed-length index arrays plus one-hot labels.

    Args:
        reports: sequence of reports, each a sequence of
            ``(token, iob_tag)`` pairs.
        word2index: token -> index mapping; tokens missing from it become 0.
        tag2index: tag -> index mapping; tags missing from it become 0.
        max_sentence_size: length passed to ``pad_sequences`` as ``maxlen``.

    Returns:
        tuple: ``(contents, labels, labels_categorical)`` where ``contents``
        are the word indices post-padded with 1, ``labels`` the tag indices
        post-padded with the ``pad_sequences`` default value, and
        ``labels_categorical`` the one-hot version of ``labels`` with
        ``len(tag2index)`` classes.
    """
    pad_sequences = tf.keras.preprocessing.sequence.pad_sequences
    to_categorical = tf.keras.utils.to_categorical

    contents, labels = [], []
    for report in reports:
        # Encode each (token, tag) pair once, then split into two columns.
        encoded = [(word2index.get(token, 0), tag2index.get(iob_tag, 0))
                   for token, iob_tag in report]
        contents.append([word_idx for word_idx, _ in encoded])
        labels.append([tag_idx for _, tag_idx in encoded])

    contents = pad_sequences(contents, maxlen=max_sentence_size, padding='post', value=1)
    labels = pad_sequences(labels, maxlen=max_sentence_size, padding='post')

    n_classes = len(tag2index)
    labels_categorical = np.asarray(
        [to_categorical(row, num_classes=n_classes) for row in labels]
    )

    return contents, labels, labels_categorical

# Model

In [41]:
class CRF(Layer):
    """Linear-chain CRF layer built on ``tensorflow_addons.text``.

    The layer owns a trainable ``(output_dim, output_dim)`` transition matrix.
    In the training phase ``call`` returns its input unchanged so that the
    function exposed by :attr:`loss` can score it with ``crf_log_likelihood``;
    otherwise it returns the one-hot encoded Viterbi path from ``crf_decode``.

    Input shape:
        (batch_size, sentence length, output_dim)
    Output shape:
        (batch_size, sentence length, output_dim)
    """

    def __init__(self,
                 output_dim,
                 sparse_target=True,
                 transitions=None,
                 **kwargs):
        """
        Args:
            output_dim (int): the number of labels to tag each temporal input.
            sparse_target (bool): when True, ``y_true`` is reduced with
                ``argmax`` over its last axis before use (i.e. the targets
                are expected one-hot encoded); when False ``y_true`` is used
                as given.
            transitions: optional value stored on the instance until
                ``build`` runs; ``build`` always replaces it with a freshly
                created trainable weight.
            **kwargs: forwarded to ``Layer.__init__`` (``name``, ``dtype``...).
        """
        super(CRF, self).__init__(**kwargs)
        self.output_dim = int(output_dim)
        self.sparse_target = sparse_target
        self.input_spec = InputSpec(min_ndim=3)
        self.supports_masking = False
        # Populated by call(); read later by the loss function.
        self.sequence_lengths = None
        self.transitions = transitions

    def build(self, input_shape):
        """Validate the input shape and create the transition weight.

        Raises:
            ValueError: if the last input dimension is undefined or differs
                from ``output_dim``.
        """
        assert len(input_shape) == 3
        f_shape = tf.TensorShape(input_shape)
        input_spec = InputSpec(min_ndim=3, axes={-1: f_shape[-1]})

        if f_shape[-1] is None:
            raise ValueError('The last dimension of the inputs to `CRF` '
                             'should be defined. Found `None`.')
        if f_shape[-1] != self.output_dim:
            raise ValueError('The last dimension of the input shape must be equal to output'
                             ' shape. Use a linear layer if needed.')
        self.input_spec = input_spec
        self.transitions = self.add_weight(name='transitions',
                                           shape=[self.output_dim, self.output_dim],
                                           initializer='glorot_uniform',
                                           trainable=True)
        self.built = True

    def compute_mask(self, inputs, mask=None):
        # Just pass the received mask from previous layer, to the next layer or
        # manipulate it if this layer changes the shape of the input
        return mask

    def call(self, inputs, sequence_lengths=None, training=None, **kwargs):
        """Decode the best tag path, or pass the scores through when training.

        Args:
            inputs: emission scores, ``(batch, time, output_dim)``.
            sequence_lengths: optional int32 tensor of shape ``(batch, 1)``.
                When omitted every sequence is treated as full length; any
                incoming mask is not consulted.
            training: accepted for Keras compatibility; the phase is resolved
                by ``K.in_train_phase``.

        Returns:
            ``inputs`` (as a tensor) in the training phase, otherwise the
            one-hot encoded Viterbi sequence.
        """
        sequences = tf.convert_to_tensor(inputs, dtype=self.dtype)
        if sequence_lengths is not None:
            assert len(sequence_lengths.shape) == 2
            assert tf.convert_to_tensor(sequence_lengths).dtype == 'int32'
            seq_len_shape = tf.convert_to_tensor(sequence_lengths).get_shape().as_list()
            assert seq_len_shape[1] == 1
            self.sequence_lengths = K.flatten(sequence_lengths)
        else:
            self.sequence_lengths = tf.ones(tf.shape(inputs)[0], dtype=tf.int32) * (
                tf.shape(inputs)[1]
            )

        viterbi_sequence, _ = crf_decode(sequences,
                                         self.transitions,
                                         self.sequence_lengths)
        output = K.one_hot(viterbi_sequence, self.output_dim)
        return K.in_train_phase(sequences, output)

    @property
    def loss(self):
        """Loss function: mean negative CRF log-likelihood of ``y_true``."""
        def crf_loss(y_true, y_pred):
            y_pred = tf.convert_to_tensor(y_pred, dtype=self.dtype)
            # Discard the returned transition params: rebinding
            # self.transitions here could swap the layer's tracked weight for
            # a plain tensor created while computing the loss.
            log_likelihood, _ = crf_log_likelihood(
                y_pred,
                tf.cast(K.argmax(y_true), dtype=tf.int32) if self.sparse_target else y_true,
                self.sequence_lengths,
                transition_params=self.transitions,
            )
            return tf.reduce_mean(-log_likelihood)
        return crf_loss

    @property
    def accuracy(self):
        """Metric function: token accuracy of the Viterbi-decoded path."""
        def viterbi_accuracy(y_true, y_pred):
            # -1e10 to avoid zero at sum(mask)
            mask = K.cast(
                K.all(K.greater(y_pred, -1e10), axis=2), K.floatx())
            shape = tf.shape(y_pred)
            # Every sequence is decoded at full length.
            sequence_lengths = tf.ones(shape[0], dtype=tf.int32) * (shape[1])
            y_pred, _ = crf_decode(y_pred, self.transitions, sequence_lengths)
            if self.sparse_target:
                y_true = K.argmax(y_true, 2)
            y_pred = K.cast(y_pred, 'int32')
            y_true = K.cast(y_true, 'int32')
            corrects = K.cast(K.equal(y_true, y_pred), K.floatx())
            return K.sum(corrects * mask) / K.sum(mask)
        return viterbi_accuracy

    def compute_output_shape(self, input_shape):
        tf.TensorShape(input_shape).assert_has_rank(3)
        return input_shape[:2] + (self.output_dim,)


    def get_config(self):
        """Return the constructor arguments needed to rebuild the layer.

        The transition matrix is deliberately not included: it is a weight
        created in ``build`` (so it does not exist on an unbuilt layer), and
        any value passed to ``__init__`` is overwritten by ``build`` anyway.
        """
        config = super(CRF, self).get_config()
        config.update({
            'output_dim': self.output_dim,
            'sparse_target': self.sparse_target,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer, keeping base-layer settings such as ``name``.

        Configs that still carry a ``transitions`` entry are accepted.
        """
        config = dict(config)  # do not mutate the caller's dict
        transitions = config.pop('transitions', None)
        if transitions is not None:
            transitions = tf.convert_to_tensor(transitions)
        return cls(transitions=transitions, **config)

In [42]:
def embedding_layer(input_dim, output_dim, input_length, mask_zero):
    """Create a Keras ``Embedding`` layer from the four given settings.

    Every argument is forwarded unchanged to ``Embedding`` under the keyword
    of the same name.
    """
    layer_kwargs = {
        'input_dim': input_dim,
        'output_dim': output_dim,
        'input_length': input_length,
        'mask_zero': mask_zero,
    }
    return Embedding(**layer_kwargs)

def bilstm_crf(maxlen, n_tags, lstm_units, embedding_dim, n_words, mask_zero, training = True, recurrent_dropout = 0.1):
    """
    bilstm_crf - build the Embedding -> BiLSTM -> Dense -> CRF tagging model
    Inputs:
        - maxlen : int
            Length of every (padded) input sequence
        - n_tags : int
            Number of tags; width of the Dense and CRF layers
        - lstm_units : int
            Units of each LSTM direction
        - embedding_dim : int
            Size of the word embeddings
        - n_words : int
            Vocabulary size passed to the embedding as input_dim
        - mask_zero : bool
            Forwarded to the embedding layer
        - training : bool
            Currently unused; kept so existing callers keep working
        - recurrent_dropout : float
            Recurrent dropout of the LSTM (defaults to the value that used
            to be hard-coded)
    Outputs:
        - model : Model
            Uncompiled model mapping (batch, maxlen) word indices to the
            output of the CRF layer, which is named 'crf_layer'
    """
    # `inputs` rather than `input`, to avoid shadowing the builtin
    inputs = Input(shape = (maxlen,))
    # Embedding layer
    embeddings = embedding_layer(input_dim = n_words, output_dim = embedding_dim, input_length = maxlen, mask_zero = mask_zero)
    output = embeddings(inputs)

    # BiLSTM layer
    output = Bidirectional(LSTM(units = lstm_units, return_sequences = True, recurrent_dropout = recurrent_dropout))(output)

    # Dense layer producing one score per tag
    # NOTE(review): relu clamps these scores to >= 0 before the CRF; confirm this is intended.
    output = TimeDistributed(Dense(n_tags, activation = 'relu'))(output)

    output = CRF(n_tags, name = 'crf_layer')(output)
    return Model(inputs, output)

# Utils

In [43]:
def number_to_word_test_sentences_and_tags(index2tag, index2word, X_test, y_test):
    """Decode index-encoded reports back into text and tag names.

    Args:
        index2tag: tag index -> tag name.
        index2word: word index -> word.
        X_test: per report, the sequence of word indices.
        y_test: per report, one row per position; the first entry equal to 1
            in a row gives that position's tag index.

    Returns:
        tuple: ``(test_sentences, test_tags)``. Each sentence is the report's
        words with a single space in front of every word; each tag list holds
        the matching tag names. Positions that decode to ``--PADDING--`` are
        left out of both.
    """
    test_sentences = []
    test_tags = []

    # Walk over the reports and their tags in index-encoded form
    for i, sentence in enumerate(X_test):
        tags = y_test[i]
        words = []
        aux_tag = []

        for j, word in enumerate(sentence):
            tag = tags[j]
            decoded_word = str(index2word[word])
            # Padding contributes neither a word nor a tag
            if decoded_word == '--PADDING--':
                continue
            words.append(decoded_word)
            # Position of the first 1 in the row is the tag index
            tag_position = np.where(tag == int(1))[0][0]
            aux_tag.append(index2tag[int(tag_position)])

        test_sentences.append("".join(" " + w for w in words))
        test_tags.append(aux_tag)

    return test_sentences, test_tags

# Train

In [44]:
def train(hyperparams, n_words, n_tags, text_sequences, tag_sequences_categorical):
    """Build, compile and fit the BiLSTM-CRF model.

    Args:
        hyperparams: dict read for ``max_len``, ``lstm_units``,
            ``embedding_dim``, ``learning_rate`` and ``max_epochs``, plus the
            optional ``batch_size`` (Keras falls back to its own default when
            the key is absent).
        n_words: vocabulary size for the embedding layer.
        n_tags: number of tags for the Dense/CRF layers.
        text_sequences: padded word-index sequences used as model input.
        tag_sequences_categorical: one-hot tag sequences used as targets.

    Returns:
        The fitted model.
    """
    # Create the model
    # NOTE(review): hyperparams['lstm_dropout'] is not forwarded here.
    # NOTE(review): mask_zero=True masks word index 0; confirm that index 0 is
    # really the padding index in the vocabulary in use.
    model = bilstm_crf(
        maxlen=hyperparams['max_len'],
        n_tags=n_tags,
        lstm_units=hyperparams['lstm_units'],
        embedding_dim=hyperparams['embedding_dim'],
        n_words=n_words,
        mask_zero=True
    )

    model.summary()

    # Compile the model; loss and metric both come from the final (CRF) layer
    crf = model.layers[-1]
    model.compile(
        optimizer=Adam(learning_rate=hyperparams['learning_rate']),
        loss=crf.loss,
        metrics=[crf.accuracy]
    )

    # Callbacks; both monitor the training loss (no validation data is given)
    callbacks = [
        ReduceLROnPlateau(monitor='loss', factor=0.1, patience=5, verbose=1),
        EarlyStopping(monitor='loss', min_delta=0, patience=5, verbose=1)
    ]

    # Train the model. batch_size used to be left out, so the configured value
    # was silently ignored.
    model.fit(
        text_sequences,
        tag_sequences_categorical,
        batch_size=hyperparams.get('batch_size'),
        epochs=hyperparams['max_epochs'],
        callbacks=callbacks,
        verbose=1,
        shuffle=True
    )

    return model

# RUN

In [45]:
# Load the token-level IOB datasets; the first CSV column is used as the index.
# Alternative training file, currently disabled:
#data_train = pd.read_csv('../data/df_train_llms_tokens_labeled_iob.csv', encoding= 'utf-8', index_col=0)
# NOTE(review): the active training file name carries no "train" marker while the
# test file is a "test" split — confirm the two files do not share reports.
# NOTE(review): read_csv parses tokens such as "NA" or "null" as missing values by
# default — confirm no real tokens are lost this way.
data_train = pd.read_csv('../data/df_tokens_labeled_iob.csv', encoding= 'utf-8', index_col=0)
data_test = pd.read_csv('../data/df_test_llms_tokens_labeled_iob.csv', encoding= 'utf-8', index_col=0)

In [46]:
# Vocabularies are built from the training data only.
word2index, index2word, tag2index, index2tag = build_vocab(data_train)
# One list of (token, iob_label) tuples per value of the "report" column.
reports = data_train.groupby("report").apply(to_tuples).tolist()
# tokenize() runs with its default max_sentence_size (512).
text_sequences, tag_sequences, tag_sequences_categorical = tokenize(reports, word2index, tag2index)

In [47]:
# Aliases for the encoded training inputs and one-hot targets.
X_train = text_sequences
y_train = tag_sequences_categorical
# Decode the training set back into sentence strings and tag-name lists.
train_sentences, train_tags = number_to_word_test_sentences_and_tags(index2tag, index2word, text_sequences, tag_sequences_categorical)

In [48]:
# Define the hyperparameters
# NOTE(review): verify that every key below (in particular 'lstm_dropout' and
# 'batch_size') is actually read by the training code, and that 'max_len'
# matches the max_sentence_size used when the sequences were tokenized.
hyperparams = {
    'max_len': 512,
    'lstm_dropout': 0.1,
    'max_epochs': 10,
    'learning_rate': 0.01,
    'embedding_dim': 300,
    'lstm_units': 50,
    'batch_size': 8
}

# Model settings: sizes of the word and tag lookup tables
n_words = len(word2index)
n_tags = len(tag2index)

In [49]:
# Build, compile and fit the model on the encoded training set.
model = train(hyperparams, n_words, n_tags, text_sequences, tag_sequences_categorical)

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 512)]             0         
                                                                 
 embedding_1 (Embedding)     (None, 512, 300)          1078500   
                                                                 
 bidirectional_1 (Bidirectio  (None, 512, 100)         140400    
 nal)                                                            
                                                                 
 time_distributed_1 (TimeDis  (None, 512, 14)          1414      
 tributed)                                                       
                                                                 
 crf_layer (CRF)             (None, 512, 14)           196       
                                                                 
Total params: 1,220,510
Trainable params: 1,220,510
Non-tra

# Evaluate

In [50]:
def result_df_model_previous(test_sentences, test_tags, model, word2index, index2tag, MAX_SENTENCE=512):
    """Predict tags for each sentence and collect them next to the gold tags.

    Args:
        test_sentences: sentence strings; each is split on whitespace.
        test_tags: per sentence, the gold tag names aligned with its words.
        model: object whose ``predict`` maps an ``(n, MAX_SENTENCE)`` integer
            array to per-position tag scores ``(n, MAX_SENTENCE, n_tags)``.
        word2index: word -> index mapping containing ``"--PADDING--"``; words
            missing from it are encoded as 0.
        index2tag: tag index -> tag name.
        MAX_SENTENCE: length every encoded sentence is padded to.

    Returns:
        DataFrame with columns ``report``, ``word``, ``tag``, ``tag_pred`` and
        one row per word. ``report`` is ``report_`` plus the sentence
        position, zero-padded to two digits.
    """
    columns = ['report', 'word', 'tag', 'tag_pred']
    if len(test_sentences) == 0:
        return pd.DataFrame(columns=columns)

    padding_idx = word2index["--PADDING--"]
    split_sentences = [sentence.split() for sentence in test_sentences]

    # Encode the words first and pad afterwards. Padding used to be appended
    # before the lookup, so the integer padding index was itself looked up as
    # a word and came out as 0 (unknown word) instead of the padding index.
    encoded = []
    for words in split_sentences:
        word_ids = [word2index.get(w, 0) for w in words]
        word_ids.extend([padding_idx] * (MAX_SENTENCE - len(word_ids)))
        encoded.append(word_ids)

    # One batched predict call instead of one call per sentence.
    pred = np.argmax(model.predict(np.array(encoded)), axis=-1)

    # Collect plain records and build the frame once; growing a DataFrame
    # with pd.concat inside the loop is quadratic.
    records = []
    for i, words in enumerate(split_sentences):
        tags = test_tags[i]
        report_id = 'report_{:02d}'.format(i)
        for w, t, p in zip(words, tags, pred[i]):
            records.append({'report': report_id, 'word': w, 'tag': t, 'tag_pred': index2tag[p]})

    return pd.DataFrame(records, columns=columns)

In [51]:
# Encode the test reports with the vocabularies built from the training data.
test_reports = data_test.groupby("report").apply(to_tuples).tolist()
test_text_sequences, test_tag_sequences, test_tag_sequences_categorical = tokenize(test_reports, word2index, tag2index)
# Decode back to sentence strings / tag names, the input format of the evaluation helper.
test_sentences, test_tags = number_to_word_test_sentences_and_tags(index2tag, index2word, test_text_sequences, test_tag_sequences_categorical)
# Predict, then write the per-word gold and predicted tags to CSV (without the index).
result_df_model_00 = result_df_model_previous(test_sentences, test_tags, model, word2index, index2tag)
result_df_model_00.to_csv("result_df_model.csv", encoding='utf-8', index=False)

