In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import re
from nltk.corpus import stopwords
import json
import wandb
from wandb.integration.keras import WandbMetricsLogger, WandbEvalCallback, WandbModelCheckpoint
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.metrics import classification_report

In [None]:
# Short aliases for the tf.keras names used throughout the notebook.

# Text preprocessing helpers
Tokenizer = tf.keras.preprocessing.text.Tokenizer
pad_sequences = tf.keras.utils.pad_sequences
to_categorical = tf.keras.utils.to_categorical

# Model container and layers
Sequential = tf.keras.models.Sequential
Embedding = tf.keras.layers.Embedding
SpatialDropout1D = tf.keras.layers.SpatialDropout1D
LSTM = tf.keras.layers.LSTM
Dropout = tf.keras.layers.Dropout
Dense = tf.keras.layers.Dense

# Training callbacks
EarlyStopping = tf.keras.callbacks.EarlyStopping

In [None]:
# Read the API tokens from ../config.json. The context manager closes the
# file handle deterministically instead of leaving it to the garbage collector.
with open('../config.json', encoding='utf-8') as config_file:
    config_data = json.load(config_file)

HF_TOKEN = config_data['HF_TOKEN']
WANDB_TOKEN = config_data['WANDB_TOKEN']

# Authenticate against Weights & Biases and start the run that every later
# wandb.log() call in this notebook reports to.
wandb.login(key=WANDB_TOKEN)

run = wandb.init(
    project='wz_experimental',
    config={"model_name": "LSTM"}
)

In [None]:
# CEFR levels in ascending order; a level's position in this list is its class id.
cefr_levels = ["A1", "A2", "B1", "B2", "C1", "C2"]

# Both lookup tables are derived from cefr_levels so the three cannot drift apart.
id_to_label_mapping = dict(enumerate(cefr_levels))
label_to_id_mapping = {level: idx for idx, level in id_to_label_mapping.items()}

# Raw strings keep the backslashes for the regex engine; the non-raw spelling
# relied on invalid string escapes (\[ \] \|), which modern Python warns about.
# Characters matched here are replaced by a space.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;]')
# Anything outside lowercase letters, digits, space, '#', '+' and '_' is removed.
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_]')
# English stop words from the NLTK corpus, held in a set so the per-word
# membership test in clean_text is a hash lookup.
# NOTE(review): presumably requires the NLTK "stopwords" corpus to be downloaded beforehand — confirm.
STOPWORDS = set(stopwords.words('english'))

In [None]:
# Hyperparameters are stored on the W&B run config so they are recorded with the run.
config = wandb.config

_HYPERPARAMETERS = {
    'maxlen': 1000,         # length every token sequence is padded/truncated to
    'vocab_size': 30000,    # tokenizer word cap and embedding input size
    'embedding_dims': 200,  # embedding vector width
    'epochs': 7,            # upper bound on training epochs
    'hidden_dims': 200,     # LSTM units
    'batch_size': 32,       # mini-batch size for training
}

for _name, _value in _HYPERPARAMETERS.items():
    setattr(config, _name, _value)

In [None]:
# Fixed seed for the row shuffles below. model.fit(validation_split=...) takes
# its validation rows from the END of the training arrays, so the shuffle order
# decides which rows are held out; without a seed every run validated on a
# different subset (and the final cell wrote a different row order back to disk).
SEED = 42

train = pd.read_csv('../datasets/quotes/quotes_train.csv')
test = pd.read_csv('../datasets/quotes/quotes_test.csv')

# Class balance of each split before shuffling.
print(train['level'].value_counts())
print(test['level'].value_counts())

# Reproducible shuffle; reset_index drops the old row order.
train = train.sample(frac=1, random_state=SEED).reset_index(drop=True)
test = test.sample(frac=1, random_state=SEED).reset_index(drop=True)

In [None]:
def clean_text(text):
    """Normalise one raw text sample for tokenisation.

    Steps: lowercase, turn separator punctuation into spaces, strip every
    character outside ``[0-9a-z #+_]``, drop standalone runs of ``x``, and
    remove English stop words.

    Args:
        text: The raw text. Non-string values (e.g. ``None`` or the float NaN
            pandas uses for missing cells) are treated as empty text.

    Returns:
        The cleaned text as a single space-separated string ('' if nothing is left).
    """
    # Missing cells reach .apply() as float NaN; .lower() would raise on them.
    if not isinstance(text, str):
        return ''

    text = text.lower()
    text = REPLACE_BY_SPACE_RE.sub(' ', text)
    text = BAD_SYMBOLS_RE.sub('', text)

    # The previous text.replace('x', '') deleted EVERY letter x, corrupting
    # ordinary words ("next" -> "net", "example" -> "eample"). Only standalone
    # runs of x (presumably redaction masks such as "xxxx") are removed now.
    text = re.sub(r'\bx{2,}\b', '', text)

    return ' '.join(word for word in text.split() if word not in STOPWORDS)

In [None]:
# Normalise the raw text of both splits.
train['cleaned_text'] = train['text'].apply(clean_text)
test['cleaned_text'] = test['text'].apply(clean_text)

# Strip digit runs. regex=True is required: since pandas 2.0 str.replace
# treats the pattern as a literal string by default, so the old call searched
# for the characters "\d+" and removed nothing. The raw string avoids the
# invalid "\d" string escape.
train['cleaned_text'] = train['cleaned_text'].str.replace(r'\d+', '', regex=True)
test['cleaned_text'] = test['cleaned_text'].str.replace(r'\d+', '', regex=True)

In [None]:
# Word-level tokenizer capped at config.vocab_size words. The filter list is
# written as a raw string; it contains the same characters as before
# (including the backslash) without relying on an invalid "\]" escape.
tokenizer = Tokenizer(
    num_words=config.vocab_size,
    filters=r'!"#$%&()*+,-./:;<=>?@[\]^_`{|}~',
    lower=True,
)

# Fit the vocabulary on the training split only. Fitting on train + test
# leaked test-set vocabulary and word frequencies into preprocessing, which
# makes the test metrics optimistic.
tokenizer.fit_on_texts(train['cleaned_text'].values)

word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))

In [None]:
X_train_raw = train['cleaned_text']
Y_train_raw = train['label']
X_test_raw = test['cleaned_text']
Y_test_raw = test['label']

# Text -> integer id sequences, padded/truncated to a common length.
X_train = tokenizer.texts_to_sequences(X_train_raw.values)
X_train = pad_sequences(X_train, maxlen=config.maxlen)

X_test = tokenizer.texts_to_sequences(X_test_raw.values)
X_test = pad_sequences(X_test, maxlen=config.maxlen)

# One-hot targets with a FIXED width of one column per CEFR level.
# pd.get_dummies was applied to each split independently, so a level missing
# from one split silently produced fewer (and shifted) columns than the model's
# output layer expects. to_categorical fixes the width and yields floats.
# NOTE(review): assumes the 'label' column holds the integer class ids 0-5
# used by id_to_label_mapping — confirm against the CSV schema.
num_classes = len(cefr_levels)
Y_train = to_categorical(Y_train_raw.values, num_classes=num_classes)
Y_test = to_categorical(Y_test_raw.values, num_classes=num_classes)

print(X_train.shape,Y_train.shape)
print(X_test.shape,Y_test.shape)

In [None]:
# Embedding -> LSTM -> softmax classifier over the CEFR levels.
model = Sequential()
model.add(Embedding(config.vocab_size, config.embedding_dims))
model.add(SpatialDropout1D(0.2))
model.add(LSTM(config.hidden_dims, dropout=0.2, recurrent_dropout=0.2))
# One output unit per CEFR level instead of a hard-coded 6.
model.add(Dense(len(cefr_levels), activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# summary() prints the table itself and returns None; wrapping it in print()
# appended a stray "None" line to the cell output.
model.summary()

In [None]:
class WandbClfEvalCallback(WandbEvalCallback):
    """Logs evaluation samples and per-epoch class predictions to W&B tables.

    Args:
        validation_data: Pair ``(inputs, targets)``; element 0 is fed to the
            model, element 1 is logged as the ground-truth label.
        data_table_columns: Column names forwarded to the base class for the
            ground-truth table.
        pred_table_columns: Column names forwarded to the base class for the
            prediction table.
    """

    def __init__(self, validation_data, data_table_columns, pred_table_columns):
        super().__init__(data_table_columns, pred_table_columns)

        self.x, self.y = validation_data[0], validation_data[1]

    def add_ground_truth(self, logs=None):
        """Add one (row id, input, target) row per sample to the data table."""
        # NOTE(review): presumably invoked by WandbEvalCallback — confirm in the W&B docs.
        for row_id, sample in enumerate(zip(self.x, self.y)):
            features, target = sample
            self.data_table.add_data(row_id, features, target)

    def add_model_predictions(self, epoch, logs=None):
        """Add one prediction row per logged sample for the given epoch."""
        # The arg-max over the last axis turns class scores into a class id.
        class_ids = tf.argmax(self.model.predict(self.x, verbose=0), axis=-1)

        ground_truth = self.data_table_ref
        for row_id in ground_truth.get_index():
            logged = ground_truth.data[row_id]
            # Copy the three ground-truth columns and append the predicted id.
            self.pred_table.add_data(
                epoch,
                logged[0],
                logged[1],
                logged[2],
                class_ids[row_id],
            )

# Hold out the trailing 20% of the (already shuffled) training arrays. This is
# the same slice validation_split=0.2 selected, but making it explicit lets the
# W&B table callback receive the real hold-out set. It was previously handed
# the whole training set under the name "validation_data", so every epoch ran
# predict() over, and logged, all training rows.
VALIDATION_FRACTION = 0.2
split_at = int(len(X_train) * (1.0 - VALIDATION_FRACTION))
X_fit, Y_fit = X_train[:split_at], Y_train[:split_at]
X_val, Y_val = X_train[split_at:], Y_train[split_at:]

history = model.fit(
    X_fit, Y_fit,
    epochs=config.epochs,
    batch_size=config.batch_size,
    validation_data=(X_val, Y_val),
    callbacks=[
        # Stop once val_loss has not improved by at least 1e-4 for 3 epochs.
        EarlyStopping(monitor='val_loss', patience=3, min_delta=0.0001),
        WandbMetricsLogger(),
        WandbClfEvalCallback(
            validation_data=(X_val, Y_val),
            data_table_columns=["idx", "text", "label"],
            pred_table_columns=["epoch", "idx", "text", "label", "pred"],
        ),
    ],
)

In [None]:
# evaluate() yields the compiled loss followed by the compiled metrics,
# i.e. [loss, accuracy] for this model.
accuracy = model.evaluate(X_test, Y_test)
test_loss, test_accuracy = accuracy[0], accuracy[1]
print('Test set\n  Loss: {:0.3f}\n  Accuracy: {:0.3f}'.format(test_loss, test_accuracy))

In [None]:
# Training vs. validation loss per epoch. An explicit figure is used so the
# object sent to W&B is exactly this plot, not whatever pyplot considers
# "current". The second curve comes from the validation split, so it is
# labelled "validation" rather than "test".
loss_fig, loss_ax = plt.subplots()
loss_ax.set_title('Loss')
loss_ax.plot(history.history['loss'], label='train')
loss_ax.plot(history.history['val_loss'], label='validation')
loss_ax.set_xlabel('epoch')
loss_ax.set_ylabel('loss')
loss_ax.legend()
wandb.log({"Loss": loss_fig})

In [None]:
# Training vs. validation accuracy per epoch. An explicit figure is used so the
# object sent to W&B is exactly this plot, not whatever pyplot considers
# "current". The second curve comes from the validation split, so it is
# labelled "validation" rather than "test".
accuracy_fig, accuracy_ax = plt.subplots()
accuracy_ax.set_title('Accuracy')
accuracy_ax.plot(history.history['accuracy'], label='train')
accuracy_ax.plot(history.history['val_accuracy'], label='validation')
accuracy_ax.set_xlabel('epoch')
accuracy_ax.set_ylabel('accuracy')
accuracy_ax.legend()
wandb.log({"Accuracy": accuracy_fig})

In [None]:
import datetime


def make_predictions(p_model, p_test):
    """Predict a CEFR level for every row of ``p_test``.

    The result is written in place to a new/overwritten ``'predictions'``
    column holding level names from ``cefr_levels``.

    Args:
        p_model: Model whose ``predict`` returns one score per CEFR level for
            each padded sequence.
        p_test: DataFrame with a ``'cleaned_text'`` column; mutated in place.

    Returns:
        None.
    """
    print(f'Started prediction at {datetime.datetime.now()}')

    # Nothing to predict; still create the column so downstream cells find it.
    if p_test.empty:
        p_test['predictions'] = pd.Series(dtype=object)
        return

    # Tokenise and pad the whole frame once, then predict in batches. The old
    # loop called predict() once per row, and did so on the global `model`,
    # silently ignoring the `p_model` argument.
    sequences = tokenizer.texts_to_sequences(p_test['cleaned_text'].values)
    padded = pad_sequences(sequences, maxlen=config.maxlen)
    probabilities = p_model.predict(padded, batch_size=config.batch_size, verbose=0)

    # The arg-max of each score row is the class id, i.e. the index into cefr_levels.
    p_test['predictions'] = [cefr_levels[class_id] for class_id in np.argmax(probabilities, axis=1)]


make_predictions(model, test)
print(f'Ended prediction at {datetime.datetime.now()}')

In [None]:
from matplotlib import pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay

# Level names: ground truth from the dataset, predictions from make_predictions.
y_true = test['level']
y_pred = test['predictions']


def map_func(x):
    """Return the integer id of CEFR level name ``x``, or -1 if it is unknown."""
    return label_to_id_mapping.get(x, -1)


# Element-wise name -> id conversion of both series.
_names_to_ids = np.vectorize(map_func)
y_true_mapped = _names_to_ids(y_true)
y_pred_mapped = _names_to_ids(y_pred)

# Overall accuracy on the test split.
accuracy = accuracy_score(y_true=y_true_mapped, y_pred=y_pred_mapped)
print(f'Accuracy: {accuracy:.3f}')

# Per-level accuracy: the share of samples of each true level that were
# predicted correctly.
unique_labels = set(y_true_mapped)  # Get unique labels

for label in sorted(unique_labels):
    # A boolean mask selects the rows of this true label in one vectorised step.
    label_mask = y_true_mapped == label
    label_accuracy = accuracy_score(y_true_mapped[label_mask], y_pred_mapped[label_mask])
    # map_func encodes unknown level names as -1; indexing cefr_levels[-1]
    # would have reported them as "C2", so look the name up explicitly.
    label_name = id_to_label_mapping.get(int(label), 'unknown')
    print(f'Accuracy for label {label_name}: {label_accuracy:.3f}')

# Same report twice: as printable text and as a dict for the W&B table.
report_kwargs = dict(
    y_true=y_true_mapped,
    y_pred=y_pred_mapped,
    target_names=cefr_levels,
    labels=list(range(len(cefr_levels))),
)
class_report = classification_report(**report_kwargs)
class_report_dict = classification_report(**report_kwargs, output_dict=True)
print('\nClassification Report:')
print(class_report)
# Flatten the classification report into rows of
# [name, precision, recall, f1-score, support].
_METRIC_FIELDS = ("precision", "recall", "f1-score", "support")

table_data = []
for entry_name, entry in class_report_dict.items():
    if not isinstance(entry, dict):
        # Scalar entry (the overall accuracy): precision and recall are
        # filled with 0, the value goes into the F1 column, and the support
        # is borrowed from the weighted average.
        row = [entry_name, 0, 0, entry, class_report_dict["weighted avg"]["support"]]
    else:
        # Per-class and averaged entries; missing metrics default to 0.
        row = [entry_name] + [entry.get(field, 0) for field in _METRIC_FIELDS]
    table_data.append(row)

# Confusion matrix over all six levels (rows: true level, columns: predicted).
conf_matrix = confusion_matrix(y_true=y_true_mapped, y_pred=y_pred_mapped, labels=list(range(len(cefr_levels))))
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix,
                              display_labels=cefr_levels)
disp.plot()


print('\nConfusion Matrix:')
print(conf_matrix)

# plot_class_proportions logs its chart to the active run itself and has no
# return value, so it is called on its own; the old code logged its None
# result under "Class Proportions".
wandb.sklearn.plot_class_proportions(train['level'], test['level'], cefr_levels)

wandb.log({
    # Log the figure owned by the display instead of the global pyplot state,
    # which only works while that figure happens to be the current one.
    "Confusion Matrix": disp.figure_,
    "Classification Report": wandb.Table(data=table_data, columns=['Class/Metric', 'Precision', 'Recall', 'F1-score', 'Support'])
})

In [None]:
# Close the W&B run opened by wandb.init() now that all metrics are logged.
wandb.finish()

In [None]:
# Write the text / label / level columns of both splits back to disk.
# NOTE(review): these are the same paths the notebook loaded from, so each run
# overwrites its own inputs with the shuffled rows and drops any other
# columns — confirm this is intended.
_EXPORT_COLUMNS = ['text', 'label', 'level']

for _frame, _destination in (
    (train, '../datasets/quotes/quotes_train.csv'),
    (test, '../datasets/quotes/quotes_test.csv'),
):
    _frame[_EXPORT_COLUMNS].to_csv(_destination, index=False)