In [None]:
import os

# takes care of annoying TF-GPU warnings
# Both variables are set here, before the cell further down that imports tensorflow.
# NOTE(review): CUDA_VISIBLE_DEVICES="-1" presumably also hides every GPU, so the
# whole notebook would run on CPU — confirm that is intended.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

In [None]:
# very useful for managing wandb runs: https://stackoverflow.com/questions/71106179/log-two-model-runs-with-keras-wandb
import wandb
from wandb.keras import WandbCallback
# presumably keeps wandb's console output quiet — TODO confirm against the wandb docs
os.environ["WANDB_SILENT"] = "true"

#### RNN: Formula Label Prediction 

In [None]:
import numpy as np 
import pandas as pd
from pathlib import Path 

import tensorflow as tf

import tensorflow_datasets as tfds
import tensorflow_text as tf_text

import datetime

# turn off the tensorflow_datasets progress bars
tfds.disable_progress_bar()
# W&B project that every experiment cell below passes to wandb.init
wandb_project_name = "binary_formula_classification"

In [None]:
import matplotlib.pyplot as plt

def plot_graphs(history, metric):
    """Plot the training and validation curve of one metric on the current axes.

    Args:
        history: object whose ``history`` attribute maps metric names to
            per-epoch value lists (as returned by ``model.fit``).
        metric: key of the training curve; the validation curve is read
            from ``"val_" + metric``.
    """
    validation_key = "val_"+metric
    plt.plot(history.history[metric])
    plt.plot(history.history[validation_key], "")
    # label the axes and both curves
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend([metric, validation_key])

#### Import Data and Preprocess Dataframe

In [None]:
def preprocess_data(corpus,
                    irrelevant_features=["mtype",]):
    """Clean the formula corpus in place.

    Drops the columns named in ``irrelevant_features`` and rewrites the
    ``"tokens"`` column from list-style text such as ``"['g', '(', 'x']"``
    into a single space-separated string such as ``"g ( x"``.

    Args:
        corpus: DataFrame with a ``"tokens"`` column of strings plus the
            columns to drop. It is modified in place.
        irrelevant_features: column labels to drop. The default list is
            only read, never mutated.

    Returns:
        None; ``corpus`` itself is updated.
    """
    import re  # local import keeps this cell self-contained

    # drop irrelevant columns
    corpus.drop(irrelevant_features, inplace=True, axis=1)

    # One quoted list item, written either as '...' or as "...".
    quoted_item = re.compile("'([^']*)'" + "|" + '"([^"]*)"')
    # escaped braces are stored as plain braces
    brace_aliases = {"\\{": "{", "\\}": "}"}

    def process_cell(cell_str):
        # strip the surrounding brackets and collapse doubled backslashes
        stripped_f_str = cell_str[1:-1].replace("\\\\", "\\")
        matches = quoted_item.findall(stripped_f_str)
        if matches:
            # Read each quoted item as a whole. Splitting on "," (the old
            # approach) cut a literal ',' token in half and lost it.
            f_list = [single or double for single, double in matches]
        else:
            # no quoted items found: fall back to the plain comma split
            f_list = [token.replace("'", "") for token in stripped_f_str.split(",")]
        f_list = [token.replace(" ", "") for token in f_list]
        f_list = [brace_aliases.get(token, token) for token in f_list]
        return " ".join(f_list)

    corpus["tokens"] = corpus["tokens"].map(process_cell)

In [None]:
#print(os.getcwd())
# the CSV path is relative to the notebook's working directory
data_p = Path("../data/") / "bin_class_data_TOKENIZED_SET_V1.csv"
data = pd.read_csv(data_p)
# preprocess_data modifies `data` in place (drops "mtype", rewrites "tokens")
preprocess_data(data)
data.head()


In [None]:
# NOTE(review): SIZE_TRAIN_DS is a fixed constant; the training split below is simply
# "every row after the first SIZE_TEST_DS", so the two only agree if the CSV has
# SIZE_TRAIN_DS + SIZE_TEST_DS rows — confirm against the data file.
SIZE_TRAIN_DS = 10000
SIZE_TEST_DS = 1000
data_as_ds = tf.data.Dataset.from_tensor_slices((data["tokens"], data["label"])) 
# The first SIZE_TEST_DS rows become the test set, the remainder the training set.
# NOTE(review): rows are taken in file order without shuffling — verify the CSV is
# not sorted (e.g. by label).
test_dataset = data_as_ds.take(SIZE_TEST_DS)
train_dataset = data_as_ds.skip(SIZE_TEST_DS)

##### Setup and Data Preparation

In [None]:
# Print three (text, label) pairs from the still-unbatched training set.
# The loop variables `example` and `label` stay defined afterwards and are reused
# by the encoder demo cells below.
for example, label in train_dataset.take(3):
    print("text: ", example.numpy())
    print("label: ", label.numpy())

In [None]:
BUFFER_SIZE = SIZE_TRAIN_DS
BATCH_SIZE = 64
# Step counts must be whole numbers; integer division yields plain ints
# (np.floor returned numpy floats such as 156.0).
STEPS_PER_EPOCH = SIZE_TRAIN_DS // BATCH_SIZE
VAL_STEPS_PER_EPOCH = SIZE_TEST_DS // BATCH_SIZE
# Shuffle only the training data; drop_remainder keeps every batch at BATCH_SIZE.
# NOTE: this cell rebinds train_dataset/test_dataset, so re-running it without
# re-running the split cell would batch already-batched data.
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.AUTOTUNE)

##### Text Encoding

In [None]:
def create_encoder(output_mode_str, n_grams, vocab_size=1000):
    """Build a TextVectorization layer and fit its vocabulary on the training text.

    Args:
        output_mode_str: forwarded as ``output_mode`` (this notebook uses
            "int", "multi_hot", "count" and "tf_idf").
        n_grams: forwarded as ``ngrams``; the notebook passes None for
            the single-token encoders.
        vocab_size: forwarded as ``max_tokens``. Defaults to 1000, the value
            that used to be hard-coded.

    Returns:
        The adapted TextVectorization layer.

    Note:
        Reads the notebook-global ``train_dataset``.
    """
    encoder = tf.keras.layers.TextVectorization(
        standardize=None,
        output_mode=output_mode_str,
        ngrams=n_grams,
        split="whitespace",
        max_tokens=vocab_size)
    # adapt() only needs the text, so drop the label: (tokens, label) -> tokens
    encoder.adapt(train_dataset.map(lambda tokens, label: tokens))
    return encoder

##### Representation 1: Use integer indices encoding

In [None]:
# encoder that maps each token to an integer index; show the first 200 vocabulary entries
encoder_int = create_encoder("int", None)
vocab = np.array(encoder_int.get_vocabulary())
vocab[:200]

In [None]:
# `example` and `label` are the last pair left over from the preview loop above
encoded_example = encoder_int(example).numpy()
print(example)
print(encoded_example)
print(encoded_example.shape)
print(label.shape)

In [None]:
# map the integer ids back to tokens through the int encoder's vocabulary
# (`vocab` must still be the int vocabulary here; later cells rebind it)
print("Original: ", example.numpy())
print("After reverse lookup: ", " ".join(vocab[encoded_example]))

In [None]:
#inputter = tf.keras.layers.Input(shape=(len(encoder_int.get_vocabulary()),) ,batch_size = 1)
#pooled_val = tf.reshape(embedded_val, [val1*val2, 1])
#encoded_example = tf.reshape(encoded_example, [1,len(encoder_int.get_vocabulary())-1])
#print(encoded_example)

In [None]:
# Disabled scratch code kept as a bare string literal: nothing inside it is executed.
"""
embedder = tf.keras.layers.Embedding(
            input_dim=len(encoder_int.get_vocabulary()),
            output_dim=64,
            # user masking to handle the variable sequence lengths
            mask_zero=True)
embedded_val = embedder(encoded_example)
print(embedded_val)
embedded_val = tf.reshape(embedded_val, [1,122,64])
#print(embedded_val.shape)
"""

In [None]:
# Disabled scratch code kept as a bare string literal: nothing inside it is executed.
"""
flatter = tf.keras.layers.Flatten(input_shape=(122, 64))
flattened = flatter(embedded_val)
print(flattened.shape)
"""

In [None]:
#val1, val2 = embedded_val.shape
#pooled_val = tf.reshape(embedded_val, [val1*val2, 1])
#print(pooled_val)
#pooled_val = tf.reshape(embedded_val, [122, ])

In [None]:
#denser = tf.keras.layers.Dense(1, activation="sigmoid")
#denser_val = denser(pooled_val)
#print(denser_val)

##### Representation 2: Binary Count Vectorizer

In [None]:
# multi-hot encoder; `vocab` is rebound to this encoder's vocabulary
encoder_multi_hot = create_encoder("multi_hot", None)
vocab = np.array(encoder_multi_hot.get_vocabulary())
vocab[:200]

In [None]:
# encode the leftover preview `example` with the multi-hot encoder and inspect the result
encoded_example = encoder_multi_hot(example).numpy()
print(example)
print(encoded_example)
print(encoded_example.shape)

##### Representation 3: Frequency Count Vectorizer

In [None]:
# count encoder; `vocab` is rebound to this encoder's vocabulary
encoder_count = create_encoder("count", None)
vocab = np.array(encoder_count.get_vocabulary())
vocab[:200]

In [None]:
# encode the leftover preview `example` with the count encoder and inspect the result
encoded_example = encoder_count(example).numpy()
print(example)
print(encoded_example)
print(encoded_example.shape)

##### Representation 4: Bigrams Frequency Count Vectorizer

In [None]:
# `(2)` is just the integer 2, which TextVectorization reads as "all n-grams up to
# size 2" (single tokens AND bigrams). The one-element tuple `(2,)` requests
# bigrams only, which is what this section's heading describes.
encoder_bigrams = create_encoder("count", (2,))
vocab = np.array(encoder_bigrams.get_vocabulary())
vocab[:200]

In [None]:
# encode the leftover preview `example` with the bigram encoder and inspect the result
encoded_example = encoder_bigrams(example).numpy()
print(example)
print(encoded_example)
print(encoded_example.shape)

##### Representation 5: Tf-Idf Vectorizer

In [None]:
# tf-idf encoder; `vocab` is rebound to this encoder's vocabulary
encoder_tf_idf = create_encoder("tf_idf", None)
vocab = np.array(encoder_tf_idf.get_vocabulary())
vocab[:200]

In [None]:
# encode the leftover preview `example` with the tf-idf encoder and inspect the result
encoded_example = encoder_tf_idf(example).numpy()
print(example)
print(encoded_example)
print(encoded_example.shape)

##### Model (Logistic Regression)

 **Define and compile model**

In [None]:
# Disabled scratch code kept as a bare string literal: nothing inside it is executed.
"""
model = tf.keras.Sequential()
model.add(tf.keras.Input(shape=(1,), dtype=tf.string))
model.add(encoder_int)
embedding = tf.keras.layers.Embedding(
                input_dim=len(encoder_int.get_vocabulary()),
                output_dim=64,
                # user masking to handle the variable sequence lengths
                mask_zero=True)
model.add(embedding)
model.output_shape
"""

In [None]:
def create_model(vec_encoder):
    """Build the (uncompiled) classifier on top of an adapted text encoder.

    The head depends on what the encoder emits:

    * ``output_mode == "int"``: a sequence of token ids, which is embedded
      and average-pooled before the output layer.
    * any other mode ("multi_hot", "count", "tf_idf"): one fixed-length
      feature vector per formula, which goes straight into the output layer.
      Sending such a vector through an Embedding would treat the counts or
      weights as token ids, which is why the two paths are separate.

    Args:
        vec_encoder: adapted ``TextVectorization`` layer.

    Returns:
        A ``tf.keras.Sequential`` model ending in ``Dense(1)`` with no
        activation, i.e. it outputs one raw logit per input string.
    """
    vocab_size = len(vec_encoder.get_vocabulary())
    print(vocab_size)

    layers = [vec_encoder]
    if vec_encoder.get_config().get("output_mode") == "int":
        layers += [
            tf.keras.layers.Embedding(
                input_dim=vocab_size,
                output_dim=16,
                # use masking to handle the variable sequence lengths
                mask_zero=True),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.GlobalAveragePooling1D(),
        ]
    # No Normalization layer: it was never adapted, and token ids must reach
    # the Embedding unchanged.
    layers += [
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1),
    ]
    return tf.keras.Sequential(layers)

 **Train the model**

##### Experiment 1: Use integer indices for encoding tokens

In [None]:
### TENSORBOARD USAGE
#log_dir = "4_nlp_rnns/logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
#tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

In [None]:
# build the Experiment 1 model on the integer-index encoder
model = create_model(encoder_int)
# predict on a sample formula using untrained model
# (the printed value is the raw output of the final Dense(1) layer)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

In [None]:
# the model was already called once by predict() above, so it is built here
model.summary()

In [None]:
# The model outputs one raw logit, hence from_logits=True and a decision threshold of 0.0.
# The metric is named "accuracy" explicitly: its default name is "binary_accuracy",
# but the plotting cell reads history.history["accuracy"] / ["val_accuracy"].
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              optimizer="adam",
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name="accuracy")])

In [None]:
# Number of passes over the training data; used for both the W&B config and fit().
EPOCHS = 15

# Optimizer, loss and learning rate are read from the compiled model, so the
# logged run config always matches what is actually trained.
configs_dict = {
    "learning_rate": float(tf.keras.backend.get_value(model.optimizer.learning_rate)),
    "algorithm": "LogReg",
    "configuration": "1-tokens",
    "optimizer": type(model.optimizer).__name__,
    # loss objects expose .name; a loss given as a string is logged as-is
    "loss": getattr(model.loss, "name", model.loss),
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "vectorizer": "int",
    "dataset": "TOKENIZED_SET_V1"
}
run = wandb.init(project=wandb_project_name, reinit=True, config=configs_dict)

try:
    history = model.fit(train_dataset, epochs=EPOCHS,
                        validation_data=test_dataset,
                        validation_steps=VAL_STEPS_PER_EPOCH, steps_per_epoch=STEPS_PER_EPOCH,
                        callbacks=[WandbCallback()])
finally:
    # close the run even if training fails, so the next experiment starts a fresh one
    run.finish()

In [None]:
# evaluate() returns the loss followed by the single compiled metric
test_loss, test_acc = model.evaluate(test_dataset)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)

In [None]:
# Left panel: accuracy; right panel: loss (training and validation curves).
# plot_graphs reads history.history["accuracy"] and ["val_accuracy"], so the metric
# logged during training must carry the name "accuracy".
plt.figure(figsize=(16,8))
plt.subplot(1,2,1)
plot_graphs(history, "accuracy")
plt.ylim(None, 1)
plt.subplot(1,2,2)
plot_graphs(history, "loss")
plt.ylim(0, None)

In [None]:
# predict on a sample text without padding
# (the printed value is the raw output of the final Dense(1) layer, not a probability)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

##### Experiment 2: Use binary count vectorizer for encoding

In [None]:
# build the Experiment 2 model on the multi-hot encoder (rebinds `model`)
model = create_model(encoder_multi_hot)
# predict on a sample formula using untrained model
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

In [None]:
# The model ends in a single-logit Dense(1) and the labels are binary, so the loss
# must be binary cross-entropy on logits; sparse categorical cross-entropy would
# see only one class. The metric is named "accuracy" because the plotting cell
# reads history.history["accuracy"].
model.compile(optimizer='sgd',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name="accuracy")])

In [None]:
# Number of passes over the training data; used for both the W&B config and fit().
EPOCHS = 15

# Optimizer, loss and learning rate are read from the compiled model, so the
# logged run config always matches what is actually trained.
configs_dict = {
    "learning_rate": float(tf.keras.backend.get_value(model.optimizer.learning_rate)),
    "algorithm": "LogReg",
    "configuration": "1-tokens",
    "optimizer": type(model.optimizer).__name__,
    # loss objects expose .name; a loss given as a string is logged as-is
    "loss": getattr(model.loss, "name", model.loss),
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "vectorizer": "multi_hot",
    "dataset": "TOKENIZED_SET_V1"
}
run = wandb.init(project=wandb_project_name, reinit=True, config=configs_dict)

try:
    # No validation_steps: test_dataset is finite, so it is evaluated in full every
    # epoch. The former hard-coded 30 was more than the batches it can hold
    # (at most SIZE_TEST_DS // BATCH_SIZE).
    history = model.fit(train_dataset, epochs=EPOCHS,
                        validation_data=test_dataset,
                        callbacks=[WandbCallback()])
finally:
    # close the run even if training fails, so the next experiment starts a fresh one
    run.finish()

In [None]:
# evaluate() returns the loss followed by the single compiled metric
test_loss, test_acc = model.evaluate(test_dataset)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)

In [None]:
# Left panel: accuracy; right panel: loss (training and validation curves).
# plot_graphs reads history.history["accuracy"] and ["val_accuracy"].
plt.figure(figsize=(16,8))
plt.subplot(1,2,1)
plot_graphs(history, "accuracy")
plt.ylim(None, 1)
plt.subplot(1,2,2)
plot_graphs(history, "loss")
plt.ylim(0, None)

In [None]:
# predict on a sample text without padding
# (the printed value is the raw output of the final Dense(1) layer, not a probability)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

##### Experiment 3: Use frequency count vectorizer for encoding

In [None]:
# build the Experiment 3 model on the count encoder (rebinds `model`)
model = create_model(encoder_count)
# predict on a sample formula using untrained model
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

In [None]:
# The model ends in a single-logit Dense(1) and the labels are binary, so the loss
# must be binary cross-entropy on logits; sparse categorical cross-entropy would
# see only one class. The metric is named "accuracy" because the plotting cell
# reads history.history["accuracy"].
model.compile(optimizer='sgd',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name="accuracy")])

In [None]:
# Number of passes over the training data; used for both the W&B config and fit().
EPOCHS = 15

# Optimizer, loss and learning rate are read from the compiled model, so the
# logged run config always matches what is actually trained.
configs_dict = {
    "learning_rate": float(tf.keras.backend.get_value(model.optimizer.learning_rate)),
    "algorithm": "LogReg",
    "configuration": "1-tokens",
    "optimizer": type(model.optimizer).__name__,
    # loss objects expose .name; a loss given as a string is logged as-is
    "loss": getattr(model.loss, "name", model.loss),
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "vectorizer": "count",
    "dataset": "TOKENIZED_SET_V1"
}
run = wandb.init(project=wandb_project_name, reinit=True, config=configs_dict)

try:
    # No validation_steps: test_dataset is finite, so it is evaluated in full every
    # epoch. The former hard-coded 30 was more than the batches it can hold
    # (at most SIZE_TEST_DS // BATCH_SIZE).
    history = model.fit(train_dataset, epochs=EPOCHS,
                        validation_data=test_dataset,
                        callbacks=[WandbCallback()])
finally:
    # close the run even if training fails, so the next experiment starts a fresh one
    run.finish()

In [None]:
# evaluate() returns the loss followed by the single compiled metric
test_loss, test_acc = model.evaluate(test_dataset)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)

In [None]:
# Left panel: accuracy; right panel: loss (training and validation curves).
# plot_graphs reads history.history["accuracy"] and ["val_accuracy"].
plt.figure(figsize=(16,8))
plt.subplot(1,2,1)
plot_graphs(history, "accuracy")
plt.ylim(None, 1)
plt.subplot(1,2,2)
plot_graphs(history, "loss")
plt.ylim(0, None)

In [None]:
# predict on a sample text without padding
# (the printed value is the raw output of the final Dense(1) layer, not a probability)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

##### Experiment 4: Use bigram frequency count vectorizer for encoding

In [None]:
# build the Experiment 4 model on the bigram count encoder (rebinds `model`)
model = create_model(encoder_bigrams)
# predict on a sample formula using untrained model
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

In [None]:
# The model ends in a single-logit Dense(1) and the labels are binary, so the loss
# must be binary cross-entropy on logits; sparse categorical cross-entropy would
# see only one class. The metric is named "accuracy" because the plotting cell
# reads history.history["accuracy"].
model.compile(optimizer='sgd',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name="accuracy")])

In [None]:
# Number of passes over the training data; used for both the W&B config and fit().
EPOCHS = 15

# Optimizer, loss and learning rate are read from the compiled model, so the
# logged run config always matches what is actually trained.
configs_dict = {
    "learning_rate": float(tf.keras.backend.get_value(model.optimizer.learning_rate)),
    "algorithm": "LogReg",
    "configuration": "1-tokens",
    "optimizer": type(model.optimizer).__name__,
    # loss objects expose .name; a loss given as a string is logged as-is
    "loss": getattr(model.loss, "name", model.loss),
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "vectorizer": "bigram_count",
    "dataset": "TOKENIZED_SET_V1"
}
run = wandb.init(project=wandb_project_name, reinit=True, config=configs_dict)

try:
    # No validation_steps: test_dataset is finite, so it is evaluated in full every
    # epoch. The former hard-coded 30 was more than the batches it can hold
    # (at most SIZE_TEST_DS // BATCH_SIZE).
    history = model.fit(train_dataset, epochs=EPOCHS,
                        validation_data=test_dataset,
                        callbacks=[WandbCallback()])
finally:
    # close the run even if training fails, so the next experiment starts a fresh one
    run.finish()

In [None]:
# evaluate() returns the loss followed by the single compiled metric
test_loss, test_acc = model.evaluate(test_dataset)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)

In [None]:
# Left panel: accuracy; right panel: loss (training and validation curves).
# plot_graphs reads history.history["accuracy"] and ["val_accuracy"].
plt.figure(figsize=(16,8))
plt.subplot(1,2,1)
plot_graphs(history, "accuracy")
plt.ylim(None, 1)
plt.subplot(1,2,2)
plot_graphs(history, "loss")
plt.ylim(0, None)

In [None]:
# predict on a sample text without padding
# (the printed value is the raw output of the final Dense(1) layer, not a probability)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

##### Experiment 5: Use tf-idf vectorizer for encoding

In [None]:
# build the Experiment 5 model on the tf-idf encoder (rebinds `model`)
model = create_model(encoder_tf_idf)
# predict on a sample formula using untrained model
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])

In [None]:
# The model ends in a single-logit Dense(1) and the labels are binary, so the loss
# must be binary cross-entropy on logits; sparse categorical cross-entropy would
# see only one class. The metric is named "accuracy" because the plotting cell
# reads history.history["accuracy"].
model.compile(optimizer='sgd',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.0, name="accuracy")])

In [None]:
# Number of passes over the training data; used for both the W&B config and fit().
EPOCHS = 15

# Optimizer, loss and learning rate are read from the compiled model, so the
# logged run config always matches what is actually trained.
configs_dict = {
    "learning_rate": float(tf.keras.backend.get_value(model.optimizer.learning_rate)),
    "algorithm": "LogReg",
    "configuration": "1-tokens",
    "optimizer": type(model.optimizer).__name__,
    # loss objects expose .name; a loss given as a string is logged as-is
    "loss": getattr(model.loss, "name", model.loss),
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "vectorizer": "tf_idf",
    "dataset": "TOKENIZED_SET_V1"
}
run = wandb.init(project=wandb_project_name, reinit=True, config=configs_dict)

try:
    # No validation_steps: test_dataset is finite, so it is evaluated in full every
    # epoch. The former hard-coded 30 was more than the batches it can hold
    # (at most SIZE_TEST_DS // BATCH_SIZE).
    history = model.fit(train_dataset, epochs=EPOCHS,
                        validation_data=test_dataset,
                        callbacks=[WandbCallback()])
finally:
    # close the run even if training fails, so the next experiment starts a fresh one
    run.finish()

In [None]:
# evaluate() returns the loss followed by the single compiled metric
test_loss, test_acc = model.evaluate(test_dataset)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)

In [None]:
# Left panel: accuracy; right panel: loss (training and validation curves).
# plot_graphs reads history.history["accuracy"] and ["val_accuracy"].
plt.figure(figsize=(16,8))
plt.subplot(1,2,1)
plot_graphs(history, "accuracy")
plt.ylim(None, 1)
plt.subplot(1,2,2)
plot_graphs(history, "loss")
plt.ylim(0, None)

In [None]:
# predict on a sample text without padding
# (the printed value is the raw output of the final Dense(1) layer, not a probability)
sample_text = ("g ( x ) = x ^ 3")
predictions = model.predict(np.array([sample_text]))
print(predictions[0])