In [3]:
import os
import re
import sys
import string
import glob

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pathlib import Path
from collections import Counter


import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text


from pathlib import Path
from transformers import BertTokenizerFast, TFBertModel
from sklearn.model_selection import train_test_split as tts


In [None]:
text_dir = "../data/raw/text"
labels_dir =  "../data/raw/labels"
audio_dir = "../data/raw/audio"



## Nettoyage, traitement etc
Surtout utilisé pour prétraitement, mais une partie utile pour inférence de nouvelles données

In [None]:
text_files = glob.glob(f"{text_dir}/*.txt")

## Chaque ligne de texte est precede d'un "stamp" qui indique le clip de l'enregistrement etc
def remove_stamps_str(line)->str:
    #clip_num = re.search('.+___\d\d?\d?___.+', line).group(0)
    stamp = re.search('.+___', line).group(0)
    new_line = line.strip(stamp)
    return new_line

## Pour python < 3.9, sinon str.removeprefix() de base
def removeprefix(self: str, prefix: str, /) -> str:
    if self.startswith(prefix):
        return self[len(prefix):]
    else:
        return self[:]


In [None]:


def text_list_generator(files_list, text_dir):
    text_list = []
    for filename in files_list:
        with open(file = filename, encoding = 'utf-8') as f:

            ##WINDOWS SPECIFIC
            if sys.platform == 'win32':
                videoid = removeprefix(filename, text_dir + '\\').rstrip('.txt')
            else :
                videoid = removeprefix(filename, text_dir + '/').rstrip('.txt')
            lines = f.readlines()
            for line_number, text_line in enumerate(lines):
                clean_line = remove_stamps_str(text_line)
                clip_id = videoid +'_'+ text_line.split('___')[1]
                #clip_id = videoid +'_' +str(line_number)
                yield (clip_id, clean_line.rstrip())


In [None]:
#Retire tous les timestamps en début de ligne, présents dans chaque transcript
def remove_stamps_str(line)->str:
    stamp = re.search('.+___', line).group(0)
    new_line = line.strip(stamp)
    return new_line

#Retire les charactères non-ascii 
def remove_nonascii(line)->str:
    ascii_line = line.encode(encoding = 'ascii', errors = 'ignore').decode()
    return ascii_line

#met tout en minuscules, retire les nombres et stopwords
def clean_punct_digits(line)->str:
    new_line = line.translate(str.maketrans('', '', string.punctuation))
    new_line = ' '.join([word.lower() for word in new_line.split() if (len(word) >=2 and word.isalpha())])
    return new_line



### DF des labels

In [None]:
label_df = pd.read_csv("../data/interim/labels/interim.csv")
display(label_df)

### Données texte

À partir des fichiers .txt, génère un dataframe qui contient une entrée pour chaque ligne de texte

In [None]:
corpus = (text for text in text_list_generator(text_files))
df_text = pd.DataFrame(corpus)
df_text.columns = ['id', 'text']

In [None]:
display(df_text.head(5))

Comme le texte contient des charactères non-conformes (non ascii, autres langues etc), et de la mauvaise ponctuation,
on nettoie chaque entrée en retirant ponctuation, nombres et non-ascii.

In [None]:
df_text['text'] =  df_text.text.apply(remove_nonascii)
df_text['clean_text'] = df_text.text.apply(lambda s : clean_punct_digits(s))

display(df_text.head(3))

In [None]:
frame = pd.merge(df_text, label_df, on = 'id', how = 'inner')
display(frame.head(3))


Création d'un nouveau dataframe, "new polarity", pour n'utiliser que le texte nettoyé et la polarité du sentiment, de 0 pour négatif à 2 pour positif.

In [None]:
new_polarity_df = frame.copy()
new_polarity_df.drop(columns=['id', 'text','anger', 'disgust','fear','happiness','sadness', 'surprise'], inplace= True)
#Originalement, les valeurs sont de -1 à 1 mais modèle ne peut utiliser des valeurs négatives comme ça
new_polarity_df.sentiment = new_polarity_df.sentiment.apply(lambda x : x+1)
display(new_polarity_df.head(3))


In [4]:
new_polarity_df = pd.read_csv('./polarity.csv')
new_polarity_df.dropna(inplace=True)

In [5]:
new_polarity_df.clean_text

0        see that writer is somebody who has an incredi...
1        key polymer brings technical aspect to our ope...
2        were huge user of adhesives for our operation ...
3        key brings those types of aspects to business ...
4        we have many new opportunities through the way...
                               ...                        
15384    and once again of students are so well prepare...
15385    so its become more of an iterative process wit...
15386    secondly using social you know things like twi...
15387    john gerzema when think about finding insights...
15388    so think those two things together are incredi...
Name: clean_text, Length: 15385, dtype: object

In [None]:
display(new_polarity_df.sentiment.value_counts())



Les données ne sont pas très bien balancées, mais la performance est bonne quand même. Comme on veut que ce soit "réaliste", et qu'il s'agit du plus gros dataset, on assume que c'est une représentation adéquate.


### Modèle Tensorflow et BERT

In [20]:
module_url = 'https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-2_H-128_A-2/2'
BERT_LAYER = hub.KerasLayer(module_url, trainable=True)
MAX_LEN = 256
seq_length = 128

text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
preprocessor = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3")
encoder_inputs = preprocessor(text_input)
encoder = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-2_H-128_A-2/2",
    trainable=True)
outputs = encoder(encoder_inputs)
pooled_output = outputs["pooled_output"]      # [batch_size, 128].
sequence_output = outputs["sequence_output"]  # [batch_size, seq_length, 128].

embedding_model = tf.keras.Model(text_input, pooled_output)
sentences = tf.constant(["(your text here)"])
print(embedding_model(sentences))

encoder_inputs = dict(
    input_word_ids=tf.keras.layers.Input(shape=(seq_length,), dtype=tf.int32),
    input_mask=tf.keras.layers.Input(shape=(seq_length,), dtype=tf.int32),
    input_type_ids=tf.keras.layers.Input(shape=(seq_length,), dtype=tf.int32),
)



tf.Tensor(
[[-0.9999971   0.11710754 -0.9996999  -0.30514616 -0.9802392  -0.862202
  -0.97073215 -0.695814    0.07994427 -0.13298516 -0.72146    -0.20102984
   0.06001551  0.9999996  -0.34816775 -0.9673598   0.24690115  0.13613726
  -0.86817634  0.9973174   0.88897586  0.03648714  0.99194896  0.78340924
  -0.99999815 -0.11478306 -0.9996667   0.61504483  0.9986014   0.0180592
  -0.10286351  0.03739744 -0.99590176 -0.9641891   0.75952166  0.99997634
  -0.97950923 -0.00245375  0.999005   -0.9982019   0.9824466   0.97204584
  -0.99923104  0.9809128  -0.9996157  -0.17666654 -0.99808425  0.9996697
   0.97371936  0.99670094  0.7636754  -0.7076082  -0.1536308  -0.27309185
   0.995362    0.98568845 -0.6754971  -0.7342357   0.9929015  -0.7974375
  -0.01379627  0.85045284 -0.9929437   0.98595965 -0.8722031  -0.9999986
  -0.15873732  0.97247326  0.9354089   0.98516846  0.9922101   0.17193754
  -0.99890375  0.04214636  0.5715292  -0.9942668  -0.8313425   0.13212052
  -0.36228302  0.07027563  0.3208

In [27]:
## Comme un NN avec BERT est extrêmement lourd (surtout pour HEROKU), très petit MAX_LEN pour réduire la taille des vecteurs et donc le nombre de paramètres
MAX_LEN = 20

#Le modele pre entraine BERT est utilise sous forme dun HubLayer 
#module_url = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/2'
module_url = 'https://tfhub.dev/google/tn_bert/1'
#module_url = 'https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-2_H-128_A-2/2'
BERT_LAYER = hub.KerasLayer(module_url, trainable=False)
#Tokenizer qui prend les strings propres et calcul les embeddings
TOKENIZER = BertTokenizerFast.from_pretrained('bert-base-uncased')


#Quelques metriques intéressantes
#Bien que F1_score serait interessant, KERAS  ne le fourni pas car ce nest pas une bonne metrique calculee par batch
METRICS = [
    tf.keras.metrics.CategoricalAccuracy(name = 'accuracy'),
    tf.keras.metrics.Precision(name = 'precision'),
    tf.keras.metrics.Recall(name = 'recall')
]

#Encodes lists of texts into BERT-useable tensors
def bert_encode(texts, tokenizer=TOKENIZER, max_len=MAX_LEN):
    all_tokens = []
    all_masks = []
    all_segments = []
    
    for text in texts:
        text = tokenizer.tokenize(text)
            
        text = text[:max_len-2]
        input_sequence = ["[CLS]"] + text + ["[SEP]"]
        pad_len = max_len - len(input_sequence)
        
        tokens = tokenizer.convert_tokens_to_ids(input_sequence) + [0] * pad_len
        pad_masks = [1] * len(input_sequence) + [0] * pad_len
        segment_ids = [0] * max_len
        
        all_tokens.append(tokens)
        all_masks.append(pad_masks)
        all_segments.append(segment_ids)
    
    return np.array(all_tokens), np.array(all_masks), np.array(all_segments)


#Building a neural-net that uses BERT embeddings
def build_model(bert_layer=BERT_LAYER, max_len=MAX_LEN):
    input_word_ids = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_word_ids")
    input_mask = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_mask")
    segment_ids = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="segment_ids")

    pooled_output, sequence_output = bert_layer([input_word_ids, input_mask, segment_ids])
    clf_output = sequence_output[:, 0, :]

    net = tf.keras.layers.Dense(16, activation='relu')(clf_output)
    net = tf.keras.layers.Dropout(0.9)(net)
    out = tf.keras.layers.Dense(3, activation='softmax')(net)
    
    model = tf.keras.models.Model(inputs=[input_word_ids, input_mask, segment_ids], outputs=out)
    model.compile(tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08), loss='categorical_crossentropy', metrics=METRICS)
    
    return model


def train_dnn(model, X_train, y_train, e=1):
    checkpoint = tf.keras.callbacks.ModelCheckpoint('smaller_model.h5', monitor='val_accuracy', save_best_only=True, verbose=1)
    earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5, verbose=1)

    train_input = bert_encode(X_train)
    train_labels = y_train

    train_history = model.fit(
            train_input, train_labels, 
            validation_split=0.2,
            epochs=e,
            callbacks=[checkpoint, earlystopping],
            batch_size=4,
            verbose=1
            )
    return train_history


    

#data = pd.read_csv('polarity_balanced.csv')
data = new_polarity_df
x = data.clean_text.values
dummy_sents = pd.get_dummies(data.sentiment)
y = dummy_sents.values
X_train, X_test, y_train, y_test = tts(x, y, test_size = 0.1)
model2 = build_model()

#histo = train_dnn(model2, X_train, y_train)


ValueError: Exception encountered when calling layer "keras_layer_25" (type KerasLayer).

in user code:

    File "c:\Users\trott\ml_env\lib\site-packages\tensorflow_hub\keras_layer.py", line 237, in call  *
        result = smart_cond.smart_cond(training,

    ValueError: Could not find matching concrete function to call loaded from the SavedModel. Got:
      Positional arguments (3 total):
        * [<tf.Tensor 'inputs:0' shape=(None, 20) dtype=int32>,
     <tf.Tensor 'inputs_1:0' shape=(None, 20) dtype=int32>,
     <tf.Tensor 'inputs_2:0' shape=(None, 20) dtype=int32>]
        * False
        * None
      Keyword arguments: {}
    
     Expected these arguments to match one of the following 4 option(s):
    
    Option 1:
      Positional arguments (3 total):
        * {'input_mask': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_mask'),
     'input_type_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_type_ids'),
     'input_word_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_word_ids')}
        * False
        * None
      Keyword arguments: {}
    
    Option 2:
      Positional arguments (3 total):
        * {'input_mask': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_mask'),
     'input_type_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_type_ids'),
     'input_word_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_word_ids')}
        * True
        * None
      Keyword arguments: {}
    
    Option 3:
      Positional arguments (3 total):
        * {'input_mask': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_mask'),
     'input_type_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_type_ids'),
     'input_word_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='inputs/input_word_ids')}
        * False
        * None
      Keyword arguments: {}
    
    Option 4:
      Positional arguments (3 total):
        * {'input_mask': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_mask'),
     'input_type_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_type_ids'),
     'input_word_ids': TensorSpec(shape=(None, None), dtype=tf.int32, name='input_word_ids')}
        * True
        * None
      Keyword arguments: {}


Call arguments received by layer "keras_layer_25" (type KerasLayer):
  • inputs=['tf.Tensor(shape=(None, 20), dtype=int32)', 'tf.Tensor(shape=(None, 20), dtype=int32)', 'tf.Tensor(shape=(None, 20), dtype=int32)']
  • training=False

In [26]:
model_json = model2.to_json()
with open("text_model.json", "w") as json_file:
    json_file.write(model_json)

model2.save_weights(f"weights.h5")

In [28]:
module_url = 'https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-2_H-128_A-2/2'
BERT_LAYER = hub.KerasLayer(module_url, trainable=True)

def build_model(bert_layer=BERT_LAYER, max_len=MAX_LEN):
    text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
    preprocessor = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3")
    encoder_inputs = preprocessor(text_input)
    encoder = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-2_H-128_A-2/2",
    trainable=True)
    outputs = encoder(encoder_inputs)
    pooled_output = outputs["pooled_output"]      # [batch_size, 128].
    sequence_output = outputs["sequence_output"]  # [batch_size, seq_length, 128].
    embedding_model = tf.keras.Model(text_input, pooled_output)
        
    return embedding_model
mini_model = build_model()

In [None]:
import matplotlib.pyplot as plt 

In [None]:
# summarize history for accuracy
plt.plot(histo.history['accuracy'])
plt.plot(histo.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(histo.history['loss'])
plt.plot(histo.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


plt.plot(histo.history['recall'])
plt.plot(histo.history['val_recall'])
plt.title('model recall')
plt.ylabel('recall')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix

In [None]:
encoded_X = bert_encode(X_test)
preds = loaded_model.predict(encoded_X)

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

In [None]:
params = {"ytick.color" : "w",
          "xtick.color" : "w",
          "axes.labelcolor" : "w",
          "axes.edgecolor" : "w"}
plt.rcParams.update(params)

ConfusionMatrixDisplay.from_predictions(y_test_true, true_preds, display_labels = ['negative', 'neutral', 'positive'])
plt.title('Text model predictions', color='white')
plt.show()

In [None]:
y_test_true = pd.Series([np.argmax(y) for y in y_test])
true_preds = [np.argmax(x) for x in preds]

In [None]:
calc_metrics_per_class(y_test_true, true_preds)