# MathQA Constant Classifier

#### Imports

In [18]:
import ast
import math
import os
import pickle
import warnings
from enum import Enum

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from sklearn.metrics import f1_score, accuracy_score
from transformers import AutoTokenizer, AutoModel, TrainingArguments, Trainer, AutoModelForSequenceClassification

#### Constants

In [19]:
# True: fine-tune MODEL_TYPE and save the result to MODEL_PATH.
# False: load the saved model from MODEL_PATH and only run prediction/evaluation.
TRAINING_MODE = True
# Directory holding one `<split>.csv` file for every split named in SET_NAMES.
DATA_PATH = './dataset/'
SET_NAMES = ['train', 'validation', 'test']
# Hugging Face checkpoint used for the tokenizer and as the starting point for fine-tuning.
# NOTE(review): an earlier comment described this as "a more optimized version of roberta
# obtaining 95% of its performance"; that does not read like a description of
# deberta-v3-large and was probably carried over from another model - confirm.
MODEL_TYPE = 'microsoft/deberta-v3-large'
# Directory the fine-tuned model is saved to / loaded from.
# NOTE(review): the name says "op_classifier" although this notebook classifies constants - confirm intended.
MODEL_PATH = f'{MODEL_TYPE.split("/")[-1]}-op_classifier-mathqa'
# Token length every problem is padded/truncated to by the tokenizer.
MAX_LENGTH = 392
# Not referenced in the visible cells; presumably the placeholder for masked numbers - TODO confirm.
NUM_MASK = '<num>'
# Parent directory of the Trainer's output_dir (checkpoints).
WORKING_DIR = 'TEMP/'
# Directory the final pickled predictions are written to.
FINAL_DIR = 'pickle/'

class Op(Enum):
    """Arithmetic operators; each member's value is the operator's symbol.

    Definition order matters: `op2id` numbers the operators by position.
    """
    ADD = '+'
    SUB = '-'
    MULT = '*'
    DIV = '/'
    POW = '^'
    
class Const(Enum):
    """Named constants the classifier can predict; each value is the constant's string name.

    Definition order matters: `values`/`const2val` and `const2id` are built
    positionally from these members, so the order here fixes both the numeric
    value and the label column of every constant.
    """
    CONST_NEG_1 = 'const_neg_1' # added by the notebook author; presumably not in the upstream constant list - confirm
    CONST_0_25 = 'const_0_25'
    CONST_0_2778 = 'const_0_2778'
    CONST_0_33 = 'const_0_33'
    CONST_0_3937 = 'const_0_3937'
    CONST_1 = 'const_1'
    CONST_1_6 = 'const_1_6'
    CONST_2 = 'const_2'
    CONST_3 = 'const_3'
    CONST_PI = 'const_pi'
    CONST_3_6 = 'const_3_6'
    CONST_4 = 'const_4'
    CONST_5 = 'const_5'
    CONST_6 = 'const_6'
    CONST_10 = 'const_10'
    CONST_12 = 'const_12'
    CONST_26 = 'const_26'
    CONST_52 = 'const_52'
    CONST_60 = 'const_60'
    CONST_100 = 'const_100'
    CONST_180 = 'const_180'
    CONST_360 = 'const_360'
    CONST_1000 = 'const_1000'
    CONST_3600 = 'const_3600'

# Numeric value of every constant, listed in the same order as the members of `Const`.
values = [-1, 0.25, 0.2778, 0.33, 0.3937, 1, 1.6, 2, 3, math.pi, 3.6, 4, 5, 6, 10, 12, 26, 52, 60, 100, 180, 360, 1000, 3600]

# Iterating an Enum yields its members in definition order; this is the public
# equivalent of walking the private `_value2member_map_` dict.
const_names = [const.value for const in Const]
if len(const_names) != len(values):
    # zip() would silently drop the unmatched tail, leaving some constants
    # without a value, so fail loudly instead.
    raise ValueError(
        f'`values` has {len(values)} entries but `Const` defines {len(const_names)} members'
    )

# Constant name -> numeric value.
const2val = dict(zip(const_names, values))

# Operator symbol / constant name -> integer id (position in the enum definition).
# const2id doubles as the column index of each constant in the label vectors.
op2id = {op.value: idx for idx, op in enumerate(Op)}
const2id = {name: idx for idx, name in enumerate(const_names)}

## Loading the data

In [20]:
# Read every split listed in SET_NAMES into its own DataFrame, keyed by split name.
data = {
    split: pd.read_csv(f'{DATA_PATH}{split}.csv')
    for split in SET_NAMES
}

This function converts the constants used in each problem into a single multi-hot encoded vector (one slot per known constant), which is used as the target for multi-label classification.

In [21]:
def onehot_const(data):
    """Build the multi-hot constant label matrix for one data split.

    Args:
        data: Object exposing a `nums` attribute (e.g. a DataFrame with a
            `nums` column) whose entries are strings holding the Python-literal
            representation of a collection of names used by a problem.

    Returns:
        A float NumPy array with one row per entry of `data.nums` and one
        column per key of `const2id`; a cell is 1 when that constant appears
        in the problem's collection and 0 otherwise. Names that are not keys
        of `const2id` are ignored.

    Raises:
        ValueError / SyntaxError: If an entry of `nums` is not a valid Python literal.
    """
    labels = []
    for raw_nums in data.nums:
        # SECURITY: this text comes from a CSV file. It used to be passed to
        # eval(), which would execute any code embedded in the file;
        # literal_eval only accepts plain literals (lists, sets, strings, ...).
        names = ast.literal_eval(raw_nums)
        idx = [const2id[name] for name in names if name in const2id]
        onehot = np.zeros(len(const2id))
        onehot[idx] = 1
        labels.append(onehot)
    return np.array(labels)

Getting only the necessary columns from the data (text/label)

In [22]:
# Keep only what the model needs: the raw problem text and its multi-hot constant labels.
# NOTE: this rebinds `data` from DataFrames to Datasets, so the cell cannot be
# re-run on its own; re-run the CSV-loading cell first.
data = {
    name: Dataset.from_dict({'text': data[name]['problem'], 'labels': onehot_const(data[name])})
    for name in SET_NAMES
}

# Have every split return torch tensors. Looping over SET_NAMES (instead of
# hard-coding the three split names) keeps this in sync with the config cell.
for name in SET_NAMES:
    data[name].set_format('torch')

## Tokenization

In [23]:
# Load the tokenizer that belongs to the MODEL_TYPE checkpoint.
tokenizer = AutoTokenizer.from_pretrained(MODEL_TYPE)

def tokenization(items):
    """Tokenize the `text` field of a batch.

    Every sequence is padded or truncated to exactly MAX_LENGTH tokens.
    Returns whatever the module-level `tokenizer` returns for the batch.
    """
    tokenizer_options = {
        'padding': 'max_length',
        'max_length': MAX_LENGTH,
        'truncation': True,
    }
    return tokenizer(items['text'], **tokenizer_options)

# Tokenize every split in batches; the raw `text` column is dropped afterwards,
# leaving the labels plus the tokenizer's outputs.
encoded = {
    split_name: split.map(tokenization, batched=True, remove_columns=['text'])
    for split_name, split in data.items()
}

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Map:   0%|          | 0/18215 [00:00<?, ? examples/s]

Map:   0%|          | 0/2710 [00:00<?, ? examples/s]

Map:   0%|          | 0/1798 [00:00<?, ? examples/s]

## Training

Training a model to predict which constants are used given a problem description

In [24]:
# Used below as the per-device batch size and the number of gradient-accumulation steps.
batch = 4
grad_acc = 4
model_path = MODEL_PATH

# Training starts from the pretrained checkpoint; load-and-evaluate mode reloads
# the fine-tuned model previously saved at `model_path`.
checkpoint = MODEL_TYPE if TRAINING_MODE else model_path
model = AutoModelForSequenceClassification.from_pretrained(
    checkpoint,
    problem_type='multi_label_classification',
    num_labels=len(const2id),
)
    
def get_metrics(y_true, y_pred):
    """Score predictions against the ground truth.

    Returns:
        dict with the micro-, macro- and weighted-averaged F1 scores
        ('micro-f1', 'macro-f1', 'weighted-f1') and 'accuracy', as computed by
        `f1_score` and `accuracy_score`.
    """
    scores = {}
    f1_variants = (('micro-f1', 'micro'), ('macro-f1', 'macro'), ('weighted-f1', 'weighted'))
    for key, averaging in f1_variants:
        scores[key] = f1_score(y_true=y_true, y_pred=y_pred, average=averaging)
    scores['accuracy'] = accuracy_score(y_true=y_true, y_pred=y_pred)
    return scores

def compute_metrics_helper(p, label, thresh=0.5):
    """Binarize raw model outputs and score them against `label`.

    Args:
        p: Raw model outputs (anything `torch.Tensor` accepts).
        label: Ground-truth label matrix, passed to `get_metrics` unchanged.
        thresh: A label counts as predicted when its sigmoid probability is >= thresh.

    Returns:
        The metric dict produced by `get_metrics`.
    """
    # Sigmoid rather than softmax: this is multi-label classification, so each
    # label gets an independent probability and the row sum may exceed 1.
    prob = torch.sigmoid(torch.Tensor(p))

    # 1.0 where the probability reaches the threshold, 0.0 elsewhere.
    y_pred = (prob >= thresh).numpy().astype(np.float64)

    return get_metrics(label, y_pred)

def compute_metrics(p):
    """Metrics callback handed to the Trainer.

    Reads `predictions` and `label_ids` from `p` and scores them with
    `compute_metrics_helper` at its default threshold.
    """
    raw_outputs, gold_labels = p.predictions, p.label_ids
    return compute_metrics_helper(raw_outputs, gold_labels)

args = TrainingArguments(
    output_dir = f'{WORKING_DIR}{model_path}',
    # Evaluate, log and checkpoint on the same 500-step cadence; matching
    # eval/save schedules are required by load_best_model_at_end below.
    evaluation_strategy='steps',
    eval_steps=500,
    logging_strategy="steps",
    logging_steps=500,
    save_strategy="steps",
    save_steps=500,
    learning_rate = 5e-6, # DeBERTa requires a lower learning rate
    per_device_train_batch_size = batch,
    per_device_eval_batch_size = batch,
    gradient_accumulation_steps=grad_acc,
    weight_decay=.01,
    save_total_limit=3,
    num_train_epochs = 5,
    # 'accuracy' is one of the keys returned by compute_metrics.
    metric_for_best_model="accuracy",
    # Without this flag metric_for_best_model does not affect which weights are
    # kept: the Trainer would finish with (and save) the final-step weights.
    # With it, the best checkpoint by accuracy is reloaded when training ends.
    load_best_model_at_end=True,
)

# Wire the model, training arguments, train/validation splits and the metrics callback together.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=encoded['train'],
    eval_dataset=encoded['validation'],
    compute_metrics=compute_metrics,
)

Downloading pytorch_model.bin:   0%|          | 0.00/874M [00:00<?, ?B/s]

Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-v3-large and are newly initialized: ['classifier.weight', 'pooler.dense.bias', 'classifier.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [9]:
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    if TRAINING_MODE:
        # Fine-tune, then persist the resulting model so that a later run with
        # TRAINING_MODE = False can load it from `model_path`.
        trainer.train()
        trainer.save_model(model_path)
    # The analysis and export cells below read `predictions` unconditionally.
    # It used to be created only when TRAINING_MODE was False, so a fresh
    # Restart & Run All in training mode failed with a NameError.
    predictions = {name: trainer.predict(encoded[name]) for name in SET_NAMES}

## Analysis and Tuning

As the output below shows, micro F1, weighted F1, and accuracy land at roughly 80–86% on the validation and test sets. Macro F1 is much lower (about 44–47%), but we do not care about it as much for this specific problem.

In [10]:
# Load-and-evaluate mode only: print the metric dict of every split at the default threshold.
if not TRAINING_MODE:
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        for name in SET_NAMES:
            split_output = predictions[name]
            print(f"{name.upper()}: ")
            split_metrics = compute_metrics_helper(split_output.predictions, split_output.label_ids)
            print(split_metrics)
            print()

TRAIN: 
{'micro-f1': 0.9210672595886603, 'macro-f1': 0.49350996493174204, 'weighted-f1': 0.9136586570272305, 'accuracy': 0.8795498215756244}

VALIDATION: 
{'micro-f1': 0.8561643835616438, 'macro-f1': 0.4654305745971823, 'weighted-f1': 0.8468692698818778, 'accuracy': 0.8129151291512915}

TEST: 
{'micro-f1': 0.8450920245398772, 'macro-f1': 0.4386897976899995, 'weighted-f1': 0.83637342170732, 'accuracy': 0.7975528364849833}



This accuracy is acceptable, but ideally it should be higher, especially if the classifier is intended to be used in a downstream task. For our specific task (simply finding the constants that are useful for a problem) we care much more about recall than precision. Below we analyze the percentage of problems with missed constants and experiment with the threshold value.

In [11]:
def percent_without_missed_const(p, label, thresh=0.5):
    """Measure how often the thresholded predictions cover every true constant.

    Args:
        p: Raw model outputs (anything `torch.Tensor` accepts), one row per problem.
        label: Ground-truth 0/1 label rows, one per problem.
        thresh: A constant counts as predicted when its sigmoid probability is >= thresh.

    Returns:
        Tuple `(avg_predicted, fraction_covered)`: the mean number of constants
        predicted per problem, and the fraction of problems whose true
        constants are all among the predicted ones.
    """
    # Sigmoid rather than softmax: this is multi-label classification, so each
    # label gets an independent probability and the row sum may exceed 1.
    prob = torch.sigmoid(torch.Tensor(p))

    # 1.0 where the probability reaches the threshold, 0.0 elsewhere.
    y_pred = (prob >= thresh).numpy().astype(np.float64)

    # A problem is covered when its set of true constants is a subset of the predicted set.
    covered = [
        set(np.where(gold_row == 1)[0]).issubset(np.where(pred_row == 1)[0])
        for gold_row, pred_row in zip(label, y_pred)
    ]

    avg_predicted = np.mean(np.sum(y_pred, axis=1))
    return avg_predicted, np.sum(covered) / len(label)

Shown below are the percentage of problems without any missed constants and the average number of constants predicted per problem at different thresholds. Using a threshold of 0.005 on the validation set still gives over 98% coverage while reducing the average number of constants per problem from 24 to just under 6.

In [12]:
# Sweep the decision threshold on the validation split and report, for each
# value, the coverage and the average number of predicted constants.
thresholds = [0.5, 0.05, 0.005, 0.001, 0]
validation_output = predictions['validation']
for thresh in thresholds:
    print(f'THRESH: {thresh}')
    avg_per_prob, percent_no_missed = percent_without_missed_const(
        validation_output.predictions, validation_output.label_ids, thresh
    )
    print(
        f"Percent without any missed: {percent_no_missed}",
        f"Average number of constants per problem: {avg_per_prob}",
        '',
        sep='\n',
    )

THRESH: 0.5
Percent without any missed: 0.8557195571955719
Average number of constants per problem: 1.033579335793358

THRESH: 0.05
Percent without any missed: 0.9254612546125461
Average number of constants per problem: 1.8154981549815499

THRESH: 0.005
Percent without any missed: 0.9826568265682657
Average number of constants per problem: 5.815129151291513

THRESH: 0.001
Percent without any missed: 0.9981549815498155
Average number of constants per problem: 12.17490774907749

THRESH: 0
Percent without any missed: 1.0
Average number of constants per problem: 24.0



To show that this generalizes, here are the same statistics on the test set using a 0.005 threshold.

In [13]:
# Report coverage and average number of predicted constants on the test split
# at the threshold chosen from the validation sweep.
thresholds = [.005]
test_output = predictions['test']
for thresh in thresholds:
    print(f'THRESH: {thresh}')
    avg_per_prob, percent_no_missed = percent_without_missed_const(
        test_output.predictions, test_output.label_ids, thresh
    )
    print(
        f"Percent without any missed: {percent_no_missed}",
        f"Average number of constants per problem: {avg_per_prob}",
        '',
        sep='\n',
    )

THRESH: 0.005
Percent without any missed: 0.9805339265850945
Average number of constants per problem: 5.800889877641824



## Storing Results

Lastly, the predicted values, along with the constant-to-index mapping, are saved to disk for later use.

In [17]:
def get_pred(p, thresh=0.005):
    """Turn raw model outputs into a 0/1 prediction matrix.

    Args:
        p: Raw model outputs (anything `torch.Tensor` accepts).
        thresh: A label is set to 1 when its sigmoid probability is >= thresh.

    Returns:
        float64 NumPy array with the same shape as `p`, holding 1.0 for
        predicted labels and 0.0 otherwise.
    """
    # Sigmoid rather than softmax: this is multi-label classification, so each
    # label gets an independent probability and the row sum may exceed 1.
    prob = torch.sigmoid(torch.Tensor(p))

    # Boolean mask -> 0.0 / 1.0 indicator matrix.
    return (prob >= thresh).numpy().astype(np.float64)

# Binarize the raw model outputs of every split using get_pred's default threshold.
y_pred = {
    split: get_pred(predictions[split].predictions)
    for split in SET_NAMES
}

In [18]:
# Bundle the binarized predictions with the constant-name -> column-index
# mapping needed to decode them.
results = {
    'pred': y_pred,
    'map': const2id,
}

In [20]:
# exist_ok=True replaces the separate exists() check: it avoids the
# check-then-create race and does nothing when the directory is already there.
os.makedirs(FINAL_DIR, exist_ok=True)

# NOTE: pickle files can execute code when loaded; only load this file from a trusted location.
with open(f'{FINAL_DIR}constants.pickle', 'wb') as f:
    pickle.dump(results, f)