In [None]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
import tensorflow as tf
import numpy as np
from keras import layers
from keras import ops
import string
import re
import pandas as pd
import random

In [None]:
#   Cell 2: Model hyperparameters
# NOTE(review): neither value is passed to the TextVectorization layer built
# later in this notebook (that one is configured with max_features and
# sequence_length) — confirm these still match what the saved models expect.
vocab_size = 20000  # Vocabulary cap: only the 20k most frequent words
maxlen = 200  # Sequence cap: only the first 200 tokens of each text

In [None]:
#   Cell 3: Transformer Block

@keras.saving.register_keras_serializable()
class TransformerBlock(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward net.

    The output of each of the two sub-layers goes through dropout, is added to
    that sub-layer's input (residual connection) and is then layer-normalised.

    Args:
        embed_dim: Output width of the feed-forward network and per-head
            `key_dim` of the attention layer. The residual sums in `call`
            require the last axis of the input to be compatible with it.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden (ReLU) layer of the feed-forward network.
        rate: Dropout rate used after each sub-layer.
        **kwargs: Standard `keras.layers.Layer` arguments (`name`, `dtype`, ...).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        #   Forward name/dtype/trainable so that a config produced by
        #   get_config() can be fed straight back into the constructor.
        super().__init__(**kwargs)

        #   Constructor arguments, kept for get_config()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        #   Multi-head attention layer
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)

        #   Feed-forward network layer: two dense layers with ReLU activation
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )

        #   Layer normalization layers
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

        #   Dropout layers
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply the block to `inputs` and return a tensor of the same shape."""
        #   Self-attention: the sequence is both query and value
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output)

        #   Residual connection + layer normalization
        out1 = self.layernorm1(inputs + attn_output)

        #   Feed-forward network
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)

        #   Residual connection + layer normalization
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config

In [None]:
#   Cell 4: Token and Position Embedding
@keras.saving.register_keras_serializable()
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        maxlen: Number of positions the position embedding table holds.
        vocab_size: Number of rows of the token embedding table.
        embed_dim: Width of both embeddings.
        **kwargs: Standard `keras.layers.Layer` arguments (`name`, `dtype`, ...).
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        #   Forward name/dtype/trainable so that a config produced by
        #   get_config() can be fed straight back into the constructor.
        super().__init__(**kwargs)
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids `x` and add the embedding of each position.

        `x` is expected to hold integer token ids with the sequence on its
        last axis.
        """
        #   Use the length of the incoming sequences rather than a notebook
        #   global, so the layer does not depend on a module-level `maxlen`
        #   existing (or matching) wherever the model is loaded. Fall back to
        #   the configured maximum when the axis is not known statically.
        seq_len = ops.shape(x)[-1]
        if seq_len is None:
            seq_len = self.maxlen
        positions = ops.arange(start=0, stop=seq_len, step=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update(
            {
                "maxlen": self.maxlen,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

In [None]:
# Tag embedded in the name of the CSV written at the end of the notebook.
# NOTE(review): no visible cell sub-samples the test data based on this value;
# it only labels the output file — confirm that is intended.
test_set_size = 'full' #   Use the entire dataset

In [None]:
#   Cell 6: Functions to make predictions on manually provided text

def manual_predict(man_text, model):
    """Run `model` on one raw text string.

    The text is wrapped in a one-element batch, converted by the
    notebook-level `vectorize_layer` and handed to `model.predict`.

    Args:
        man_text: Raw text to score.
        model: Object exposing `predict(inputs, verbose=...)`.

    Returns:
        Whatever `model.predict` returns for the one-element batch, or None
        when vectorisation or prediction raised.
    """
    try:
        vec_text = vectorize_layer(tf.constant([man_text]))
        return model.predict(vec_text, verbose=0)
    except Exception as exc:
        #   Best effort: keep going on a bad sample, but say why it failed.
        #   `Exception` (not a bare except) lets Ctrl-C / kernel interrupts
        #   through.
        print(f'Prediction failed on {man_text}: {exc!r}')
        return None

def manual_odds(man_text, model):
    """Return the model's score for `man_text` as a plain Python number.

    The score is the first element of the first row of the prediction
    returned by `manual_predict`; None is passed through when the
    prediction failed.
    """
    prediction = manual_predict(man_text, model)
    if prediction is not None:
        #   One-element batch -> first row, first output unit.
        return prediction.tolist()[0][0]
    return None

# to be filled in with our appropriate labels
def manual_bin(man_text, model=None):
    """Classify `man_text` as 'positive' or 'negative' using `model`.

    Args:
        man_text: Raw text to classify.
        model: Model forwarded to `manual_odds`. It is effectively required;
            the default only exists so the failure is an explicit message
            instead of an obscure one from a nested call.

    Returns:
        'positive' when the score is >= 0.5, 'negative' when it is lower,
        or None when no score could be computed.

    Raises:
        TypeError: if `model` is not supplied.
    """
    if model is None:
        raise TypeError(
            "manual_bin() requires a model: call manual_bin(man_text, model)"
        )

    odds = manual_odds(man_text, model)
    if odds is None:
        #   Prediction failed upstream; there is nothing to threshold.
        return None
    return 'positive' if odds >= 0.5 else 'negative'

In [None]:
#   Cell 7: Model Testing Function
def test_model(test_set, model):
    df['predicted_odds'] = df['raw_text'].apply(lambda text: manual_odds(text, model))
    df['prediction'] = df['predicted_odds'].apply(lambda x: 'TA' if x >= 0.5 else 'NTA')
    df['is_correct'] = df['prediction'] == df['correct']
    return len(df[df['is_correct']]) / len(df)

In [None]:

def read_file(file, encoding=None):
    """Return the whole text content of `file`, or None if it cannot be read.

    Args:
        file: Path of the file to read.
        encoding: Text encoding passed to `open`. The default (None) keeps
            the platform default, as before; pass e.g. 'utf-8' to make the
            result independent of the machine's locale.

    Returns:
        The file content as a string, or None when the path is missing,
        unreadable, not a valid path value, or not decodable.
    """
    try:
        with open(file, 'r', encoding=encoding) as in_file:
            return in_file.read()
    except (OSError, ValueError, TypeError):
        #   OSError: missing file / directory / permissions.
        #   ValueError: undecodable bytes (UnicodeDecodeError) or a bad path.
        #   TypeError: `file` is not a path-like value (e.g. None).
        return None

In [None]:
#   Cell 9: Load Training Dataset
# The training texts are loaded only to fit the vectorizer's vocabulary
# (text_ds / vectorize_layer.adapt below); the cells visible here do not
# train any model.
batch_size = 32
# validation_split + subset="training" keep the training portion (80%) of the
# directory; the fixed seed makes that portion the same on every run.
# NOTE(review): the resulting vocabulary only matches the saved models if this
# directory, split and seed are the ones used when they were trained — confirm.
raw_train_ds = keras.utils.text_dataset_from_directory(
    "../data_formatted/unbalanced/train",
    batch_size=batch_size,
    validation_split=0.2,
    subset="training",
    seed=1337,
)

In [None]:
#   Cell 10: Custom Standardization Function (same as training)
def custom_standardization(input_data):
    """Normalise raw text before tokenisation.

    Steps, in order: lowercase, replace every "<br />" with a space, then
    delete all characters listed in `string.punctuation`.
    """
    #   Character class matching any single punctuation character.
    punctuation_class = f"[{re.escape(string.punctuation)}]"

    text = tf.strings.lower(input_data)
    text = tf.strings.regex_replace(text, "<br />", " ")
    text = tf.strings.regex_replace(text, punctuation_class, "")
    return text

# Vectorizer settings.
# NOTE(review): max_features (2000) and sequence_length (250) differ from the
# vocab_size (20000) and maxlen (200) declared near the top of the notebook —
# confirm which pair the saved models were trained with.
max_features = 2000
# NOTE(review): embedding_dim is not referenced by any other visible cell.
embedding_dim = 64
sequence_length = 250

# Maps raw strings to fixed-length sequences of integer token ids: texts are
# cleaned by custom_standardization, the vocabulary is capped at max_features
# and every output has exactly sequence_length entries.
vectorize_layer = keras.layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=max_features,
    output_mode="int",
    output_sequence_length=sequence_length,
)

# The dataset yields (text, label) pairs; the vocabulary is built from text alone.
text_ds = raw_train_ds.map(lambda x, y: x) #   Only extract text, not labels

In [None]:
# NOTE(review): the vocabulary is rebuilt from the training texts on every run
# rather than loaded alongside the saved models, so token ids only line up with
# what the models saw if the data and vectorizer settings are unchanged — confirm.
vectorize_layer.adapt(text_ds)  #   Build the vocabulary

In [None]:
def _list_files(directory):
    """Return the sorted paths of the regular files directly inside `directory`.

    Sorting makes the row order of the test frame reproducible (os.listdir
    returns entries in arbitrary order). Sub-directories such as
    `.ipynb_checkpoints` are skipped so they are not counted as test samples.
    """
    paths = (f'{directory}/{name}' for name in os.listdir(directory))
    return sorted(path for path in paths if os.path.isfile(path))


pos_files = _list_files('../data_formatted/balanced/test/pos')
neg_files = _list_files('../data_formatted/balanced/test/neg')

In [None]:
#   One frame per class folder; the scalar label is broadcast to every row.
pos_df = pd.DataFrame({'file': pos_files, 'correct': 'TA'})
neg_df = pd.DataFrame({'file': neg_files, 'correct': 'NTA'})

#   Negatives first, then positives, renumbered 0..n-1.
df = pd.concat([neg_df, pos_df], ignore_index=True)

In [None]:
# Load every test file's content; read_file returns None for a file it cannot
# read, so 'raw_text' may contain None entries.
df['raw_text'] = [read_file(file) for file in df['file']]

In [None]:
#   Every entry of ../model whose name does not contain 'transformer', in a
#   reproducible (sorted) order.
#   NOTE(review): the reason transformer models are skipped is not stated here.
neural_models = sorted(
    f'../model/{entry}' for entry in os.listdir('../model') if 'transformer' not in entry
)

#   accuracy_rate starts as a float column: the evaluation loop stores
#   fractional accuracies in it, and writing floats into an integer column is
#   an incompatible-dtype assignment in recent pandas.
model_stats = pd.DataFrame({
    'model': neural_models,
    'accuracy_rate': [0.0] * len(neural_models)
})

In [None]:
#   Cell 16: Evaluate all models under the model directory
#   Load each saved model in turn, score it on the test frame and record the result.
for i, curr_model in enumerate(model_stats['model']):
    model = keras.models.load_model(curr_model)
    accuracy_rate = test_model(df, model)
    print(f'Model {curr_model} has accuracy {accuracy_rate}')

    #   model_stats has the default 0..n-1 row labels, so the position from
    #   enumerate is also the .loc label of the row being updated.
    model_stats.loc[i, 'accuracy_rate'] = accuracy_rate

In [None]:
# Display only: sort_values returns a new frame, so model_stats itself keeps
# its original row order.
model_stats.sort_values('accuracy_rate', ascending=False)

In [None]:
# Persist the per-model accuracies one directory above the notebook; the file
# name carries the test_set_size tag and the row index is not written.
model_stats.to_csv(f'../model-stats_test-set-{test_set_size}.csv', index=False)