# Helper functions

## Convert word numbers to symbol numbers

In [1]:
# The majority of this code is to set up the numwords dict, which is only done on the first call.
import re
# Word table shared by every call that does not pass its own ``numwords``;
# it is filled on the first call and reused afterwards.
_NUMWORDS_CACHE = {}


def _populate_numwords(numwords):
    """Fill ``numwords`` in place with ``word -> (scale, increment)`` entries."""
    units = [
        "zero", "one", "two", "three", "four", "five", "six", "seven", "eight",
        "nine", "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen",
        "sixteen", "seventeen", "eighteen", "nineteen",
    ]
    # The two leading placeholders keep index * 10 equal to the word's value.
    tens = ["", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety"]
    scales = ["hundred", "thousand", "million", "billion", "trillion"]

    numwords["and"] = (1, 0)
    for idx, word in enumerate(units):
        numwords[word] = (1, idx)
    for idx, word in enumerate(tens):
        if word:
            numwords[word] = (1, idx * 10)
    for idx, word in enumerate(scales):
        # "hundred" -> 10**2, then 10**3, 10**6, 10**9, 10**12.
        numwords[word] = (10 ** (idx * 3 or 2), 0)


def _numword_key(word, numwords):
    """Return the ``numwords`` key matching ``word`` (case-insensitively), or None."""
    if word in numwords:
        return word
    lowered = word.lower()
    return lowered if lowered in numwords else None


def _clock_to_decimal(word):
    """Rewrite an ``H:MM`` token as ``H.ff`` (ff = hundredths of an hour).

    The fraction is zero-padded so that 2:05 becomes "2.08" rather than "2.8".
    Tokens that are not two colon-separated integers are returned unchanged.
    """
    try:
        hours, minutes = map(int, word.split(':'))
    except ValueError:
        return word
    return f"{hours}.{minutes * 100 // 60:02d}"


def text2int(textnum, numwords=None):
    """Replace spelled-out numbers and a few time expressions with digits.

    Steps, in order:
      * hyphens joining words are turned into spaces ("twenty-three");
      * every comma is removed;
      * once / twice / thrice become "1 time" / "2 times" / "3 times";
      * runs of number words are replaced by their integer value;
      * ``H:MM`` tokens become decimal hours;
      * "hundreds"/"decades" are singularised and decades / centuries are
        rewritten as a number of years.

    Args:
        textnum: the text to normalise.
        numwords: optional ``word -> (scale, increment)`` table. An empty dict
            is filled in place; when omitted a shared module-level table is
            used, so the table is only built once.

    Returns:
        The normalised text, with tokens separated by single spaces.
    """
    if numwords is None:
        numwords = _NUMWORDS_CACHE
    if not numwords:
        _populate_numwords(numwords)

    # Two passes are needed: one pass over "a-b-c-d" only splits "a-b" and
    # "c-d", leaving the hyphen between "b" and "c" for the second pass.
    for _ in range(2):
        textnum = re.sub(r'(\b\w+-\w+\b)', lambda match: match.group(0).replace('-', ' '), textnum)
    # Strip every comma: this removes thousands separators ("1,000") and also
    # lets a number word followed by a comma ("five,") be recognised.
    textnum = re.sub(r',', '', textnum)
    adverbs = {'once': '1 time', 'twice': '2 times', 'thrice': '3 times'}
    textnum = re.sub(r'\b(once|twice|thrice)\b', lambda match: adverbs[match.group(1)], textnum)

    words = textnum.split()
    pieces = []
    current = result = 0
    in_number = False    # a run of number words is being accumulated
    after_scale = False  # the previous number word was a scale ("hundred", ...)

    for pos, word in enumerate(words):
        key = _numword_key(word, numwords)

        if key == "and":
            # "and" only belongs to a number when it links a scale word to more
            # number words ("one hundred and five"); otherwise it is ordinary
            # text and must be kept.
            has_next = pos + 1 < len(words)
            next_key = _numword_key(words[pos + 1], numwords) if has_next else None
            if in_number and after_scale and next_key not in (None, "and"):
                continue
            key = None

        if key is None:
            if in_number:
                # Emit the whole pending number: completed scale groups plus
                # the group still being built.
                pieces.append(str(result + current))
                current = result = 0
                in_number = False
                after_scale = False
            pieces.append(_clock_to_decimal(word) if ':' in word else word)
            continue

        scale, increment = numwords[key]
        if scale > 1 and current == 0:
            # A bare scale word ("hundred", "thousand") stands for one of it.
            current = 1
        current = current * scale + increment
        in_number = True
        after_scale = scale > 1
        if scale > 100:
            result += current
            current = 0

    if in_number:
        pieces.append(str(result + current))
    resultStr = ' '.join(pieces)

    # Replace plural forms with singular forms
    resultStr = re.sub(r'\b(hundreds|decades)\b', lambda match: match.group(0).rstrip('s'), resultStr)

    # Convert decades to years (a missing count means one decade)
    resultStr = re.sub(r'(\b\d+\b)?\s+decade\b', lambda match: f" {str(int(match.group(1) or 1) * 10)}" + ' years', resultStr)

    # Convert centuries to years (a missing count means one century)
    resultStr = re.sub(r'(\b\d+\b)?\s+centur(y|ies)\b', lambda match: f" {str(int(match.group(1) or 1) * 100)}" + ' years', resultStr)
    # The two substitutions above insert a leading space; collapse the doubles.
    resultStr = re.sub(r'  ', ' ', resultStr)
    return resultStr

# print(text2int("There are five-hundred-twenty-three apples and thirty-four oranges 2:54 am."))
# print(text2int("This has been going on for 10 centuries"))
# print(text2int("I went there once twice thrice"))

# Summary of text2int's preprocessing: commas are removed (e.g. inside numbers); clock times become decimal hours (4:54 -> 4.(54/60)); "hundreds" -> "hundred"; "century" / "1 century" -> "100 years"; "decades" -> "10 years"; "twice" -> "2 times".

## Convert numbers to form expected by NumBERT

In [2]:
# Integer digits, an optional dot, then any fractional digits.
number_pattern = re.compile(r"(\d+)\.?(\d*)")


def number_repl(matchobj):
  """Render one ``number_pattern`` match as "<digits> scinotexp <exponent>".

  The digits are the significant digits with leading and trailing zeros
  removed; the exponent is the power of ten of the first significant digit.
  A value with no non-zero digit yields an empty digit string.
  """
  whole = matchobj.group(1).lstrip("0")
  fraction = matchobj.group(2)
  if whole and int(whole):
    # Value is >= 1: the exponent follows from the integer part's length.
    exponent = len(whole) - 1
  else:
    # Value is < 1: every leading zero of the fraction lowers the exponent.
    significant = fraction.lstrip("0")
    exponent = -(len(fraction) - len(significant)) - 1
    fraction = significant
  digits = (whole + fraction).rstrip("0")
  return digits + " scinotexp " + str(exponent)


def apply_scientific_notation(line):
  """Rewrite every number found in ``line`` in the scientific-notation form."""
  return number_pattern.sub(number_repl, line)


## Convert NumBERT checkpoints into PyTorch weights

In [None]:
#  coding=utf-8
# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# """Convert BERT checkpoint."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import torch

from transformers import BertConfig, BertForPreTraining, load_tf_weights_in_bert

import logging
logging.basicConfig(level=logging.INFO)

def convert_tf_checkpoint_to_pytorch(tf_checkpoint_path, bert_config_file, pytorch_dump_path):
    """Load a TensorFlow BERT checkpoint into a PyTorch model and save its weights.

    Args:
        tf_checkpoint_path: path of the TensorFlow checkpoint to read.
        bert_config_file: JSON file describing the BERT configuration.
        pytorch_dump_path: destination passed to ``torch.save`` for the state dict.
    """
    # Build an untrained PyTorch model with the architecture from the config.
    bert_config = BertConfig.from_json_file(bert_config_file)
    print(f"Building PyTorch model from configuration: {bert_config!s}")
    pytorch_model = BertForPreTraining(bert_config)

    # Copy the TensorFlow weights into that model.
    load_tf_weights_in_bert(pytorch_model, bert_config, tf_checkpoint_path)

    # Persist only the weights (state dict), not the whole module.
    print(f"Save PyTorch model to {pytorch_dump_path}")
    weights = pytorch_model.state_dict()
    torch.save(weights, pytorch_dump_path)

# Convert the NumBERT TensorFlow checkpoint found under /content/drive and
# write the PyTorch state dict next to it (third argument).
convert_tf_checkpoint_to_pytorch("/content/drive/MyDrive/NumBERT/model.ckpt-1000000",
                                     "/content/drive/MyDrive/NumBERT/bert_config.json",
                                     "/content/drive/MyDrive/NumBERT/pytorch_model")
# Same call for a layout where the files live under "Colab Notebooks/NumBERT":
# convert_tf_checkpoint_to_pytorch("/content/drive/MyDrive/Colab Notebooks/NumBERT/model.ckpt-1000000",
#                                      "/content/drive/MyDrive/Colab Notebooks/NumBERT/bert_config.json",
#                                      "/content/drive/MyDrive/Colab Notebooks/NumBERT/pytorch_model")

# Experiment Code

In [None]:
from transformers import TFBertForMaskedLM
import tensorflow as tf
import os
import numpy as np
import re
import matplotlib.pyplot as plt

## Load Data from Dataset

In [None]:
!pip install datasets
# pip install pyarrow==11.0.0

In [None]:
from datasets import load_dataset

In [None]:
# Load the 'mc_taco' dataset; its 'test' and 'validation' splits are used below.
dataset = load_dataset('mc_taco')

In [None]:
# Event-ordering and stationarity questions (categories 1 and 4) can't be
# predicted by our model, so those rows are removed; rows whose label is 0
# are dropped as well.

updated_dataset = {
    split: [
        row for row in dataset[split]
        if row['category'] not in [1, 4] and row['label'] != 0
    ]
    for split in ('test', 'validation')
}

# 2239 test and 826 validation
# print(len(updated_dataset['test']))

## Load Tokenizer using vocab file

In [None]:
pip install bert-tensorflow

In [None]:
# Mount Google Drive at /content/drive so the NumBERT files referenced by
# the /content/drive/... paths in this notebook are reachable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from bert import tokenization
# Build the tokenizer from the NumBERT vocabulary file (copied from their
# drive to /content/vocab.txt); input text is lower-cased before tokenizing.
tokenizer = tokenization.FullTokenizer(vocab_file='/content/vocab.txt', do_lower_case=True)

## Preprocess Input

In [None]:
def LabelCategory(num):
  """Return the display name for a category id, or None for unknown ids."""
  known_categories = (
    (2, 'Typical Frequency'),
    (3, 'Typical Time'),
    (0, 'Event Duration'),
  )
  for category_id, category_name in known_categories:
    if num == category_id:
      return category_name
  return None

In [None]:
import sys
from absl import flags
# Replace the notebook's argv with a single entry and let absl parse it.
# NOTE(review): presumably needed so the bert tokenization module finds its
# flags already parsed when run inside a notebook — confirm.
sys.argv=['preserve_unused_tokens=False']
flags.FLAGS(sys.argv)

# One dict of token lists per example, used below for fine-tuning.
# Note that the examples come from the filtered 'test' split.
training_data = list()

for x in updated_dataset['test']:
  temp = dict()
  for i in ['sentence','question','answer']:
    # unicode -> number words to digits -> scientific notation -> tokens
    temp[i] = tokenizer.tokenize(apply_scientific_notation(text2int(tokenization.convert_to_unicode(x[i]))))
  # The category is stored as the tokens of its display name.
  temp['category'] = tokenizer.tokenize(LabelCategory(x['category']))
  training_data.append(temp)


## Converting into form that model expects

In [None]:
def ConvertToModelInput(data, max_sequence_length=125):
    """Turn tokenised examples into padded id tensors for masked-LM training.

    Each example yields two rows that share the same labels:
      * sentence + question + [MASK]*len(answer) + '[' + category + ']'
      * sentence + question + answer + '[' + [MASK]*len(category) + ']'

    Rows are padded with '[PAD]' up to ``max_sequence_length``. Rows that are
    longer are truncated to that length, so every row has the same size and
    the final stacking into one tensor cannot fail on ragged input.

    Args:
        data: iterable of dicts holding token lists under 'sentence',
            'question', 'answer' and 'category'.
        max_sequence_length: fixed length of every output row.

    Returns:
        dict with tensors 'input_ids', 'labels' and 'attention_mask'
        (attention_mask is 1 on real tokens and 0 on padding).
    """
    input_rows = []
    label_rows = []
    mask_rows = []

    for example in data:
        context = example['sentence'] + example['question']
        answer = example['answer']
        category = example['category']

        full_tokens = context + answer + ['['] + category + [']']
        answer_masked = context + ['[MASK]'] * len(answer) + ['['] + category + [']']
        category_masked = context + answer + ['['] + ['[MASK]'] * len(category) + [']']

        for variant in (answer_masked, category_masked):
            # Inputs and labels are position-aligned, so both are cut at the
            # same point and padded by the same amount.
            tokens = variant[:max_sequence_length]
            targets = full_tokens[:max_sequence_length]
            real_length = len(tokens)
            pad_count = max_sequence_length - real_length
            tokens = tokens + ['[PAD]'] * pad_count
            targets = targets + ['[PAD]'] * pad_count
            attention = [1] * real_length + [0] * pad_count

            input_rows.append(tf.convert_to_tensor(tokenizer.convert_tokens_to_ids(tokens)))
            label_rows.append(tf.convert_to_tensor(tokenizer.convert_tokens_to_ids(targets)))
            mask_rows.append(tf.convert_to_tensor(attention))

    model_inputs = dict()
    model_inputs['input_ids'] = tf.convert_to_tensor(input_rows)
    model_inputs['labels'] = tf.convert_to_tensor(label_rows)
    model_inputs['attention_mask'] = tf.convert_to_tensor(mask_rows)
    return model_inputs

## Model Finetuning

In [None]:
import os
from transformers import TFBertForMaskedLM

# Define and compile your model
class MyBertModel(tf.keras.Model):
    """Keras wrapper around the converted NumBERT masked-language model."""

    def __init__(self):
        super().__init__()

        # Load the pre-trained BERT model from the converted PyTorch weights.
        # NOTE(review): the weights path and the config path point to
        # different Drive folders ("NumBERT" vs "Colab Notebooks/NumBERT") —
        # confirm both exist.
        self.bert = TFBertForMaskedLM.from_pretrained("/content/drive/MyDrive/NumBERT/pytorch_model",from_pt=True, config = "/content/drive/MyDrive/Colab Notebooks/NumBERT/bert_config.json")

        # Define additional layers or modifications to the BERT model if needed
        # Example: self.dense_layer = tf.keras.layers.Dense(256, activation='relu')

    def call(self, inputs, training=False, attention_mask = None):
        """Run one forward pass through the wrapped BERT model.

        Args:
            inputs: forwarded unchanged as the first argument of the model.
            training: forwarded as the model's ``training`` flag.
            attention_mask: optional mask; forwarded only when provided.

        Returns:
            Whatever the wrapped model returns.
        """
        # Compare against None: taking the truth value of a tensor is not a
        # presence check. Only one forward pass is run in either case.
        if attention_mask is None:
            return self.bert(inputs, training=training)
        return self.bert(inputs, training=training, attention_mask = attention_mask)

# Build the padded training tensors and the model to fine-tune.
inputs = ConvertToModelInput(training_data)
model = MyBertModel()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
# from_logits=True: the loss is given raw scores, not probabilities.
model.compile(optimizer=optimizer, loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
# Fine-tune on [input_ids, attention_mask] against the unmasked labels;
# validation_split=0.2 asks Keras to hold out 20% of the rows for validation.
# No callbacks are passed to this call.
history = model.fit(
    [inputs['input_ids'], inputs['attention_mask']],
    inputs['labels'],
    verbose=1,
    batch_size=8,
    epochs=10,
    validation_split=0.2
)


## Epochs vs loss

In [None]:
# Plot the training loss recorded by model.fit against the epoch index.

losses = history.history['loss']  # one value per epoch
fig = plt.figure(figsize=(5,5))
ax1 = fig.add_subplot(111)
# x axis is the zero-based epoch index
ax1.plot(range(len(losses)),losses)
ax1.set_xlabel("Epochs")
ax1.set_ylabel("Loss")
ax1.set_title("Epoch vs Loss")
plt.grid()
plt.show()

# Testing

In [None]:
def ConvertToModelInputTest(data, max_sequence_length=125):
    """Turn tokenised examples into padded id tensors for evaluation.

    Each example yields exactly one row, in input order:
      sentence + question + [MASK]*len(answer) + '[' + category + ']'
    with labels holding the same sequence with the real answer tokens.

    Rows are padded with '[PAD]' up to ``max_sequence_length``. Rows that are
    longer are truncated to that length, so every row has the same size and
    the final stacking into one tensor cannot fail on ragged input.

    Args:
        data: iterable of dicts holding token lists under 'sentence',
            'question', 'answer' and 'category'.
        max_sequence_length: fixed length of every output row.

    Returns:
        dict with tensors 'input_ids', 'labels' and 'attention_mask'
        (attention_mask is 1 on real tokens and 0 on padding).
    """
    input_rows = []
    label_rows = []
    mask_rows = []

    for example in data:
        context = example['sentence'] + example['question']
        answer = example['answer']
        tail = ['['] + example['category'] + [']']

        # Inputs and labels are position-aligned, so both are cut at the same
        # point and padded by the same amount.
        tokens = (context + ['[MASK]'] * len(answer) + tail)[:max_sequence_length]
        targets = (context + answer + tail)[:max_sequence_length]
        real_length = len(tokens)
        pad_count = max_sequence_length - real_length
        tokens = tokens + ['[PAD]'] * pad_count
        targets = targets + ['[PAD]'] * pad_count
        attention = [1] * real_length + [0] * pad_count

        input_rows.append(tf.convert_to_tensor(tokenizer.convert_tokens_to_ids(tokens)))
        label_rows.append(tf.convert_to_tensor(tokenizer.convert_tokens_to_ids(targets)))
        mask_rows.append(tf.convert_to_tensor(attention))

    model_inputs = dict()
    model_inputs['input_ids'] = tf.convert_to_tensor(input_rows)
    model_inputs['labels'] = tf.convert_to_tensor(label_rows)
    model_inputs['attention_mask'] = tf.convert_to_tensor(mask_rows)
    return model_inputs
# Tokenise the filtered 'validation' split with the same preprocessing chain
# used for the training examples, then build the evaluation tensors.
test_data = list()
for x in updated_dataset['validation']:
  temp = dict()
  for i in ['sentence','question','answer']:
    # unicode -> number words to digits -> scientific notation -> tokens
    temp[i] = tokenizer.tokenize(apply_scientific_notation(text2int(tokenization.convert_to_unicode(x[i]))))
  temp['category'] = tokenizer.tokenize(LabelCategory(x['category']))
  # print(temp)
  test_data.append(temp)
# One row per validation example, in the same order as updated_dataset['validation'].
input_test = ConvertToModelInputTest(test_data)

In [None]:
import pandas as pd

# Collected per evaluated question: the decoded model prediction for the whole
# sequence and the reference answer text.
outputs = dict()
# outputs['predicted answer'] = []
outputs['Experiment'] = []
outputs['expected answer'] = []

# Look the mask id up in the vocabulary instead of assuming a fixed id.
mask_token_id = tokenizer.convert_tokens_to_ids(['[MASK]'])[0]

validation_rows = updated_dataset['validation']
prev_q = ''
for i in range(len(input_test['input_ids'])):
    # Consecutive rows with the same question are evaluated only once.
    question = validation_rows[i]['question']
    if question == prev_q:
        continue
    prev_q = question

    # Positions of the masked answer tokens in THIS evaluation row (the
    # training tensor must not be indexed here: its rows do not line up with
    # the validation examples).
    mask_loc = np.where(input_test['input_ids'][i].numpy() == mask_token_id)[0].tolist()

    # NOTE(review): model.fit was given [input_ids, attention_mask], while this
    # call passes input ids only — confirm that is intended.
    out = model(tf.convert_to_tensor([input_test['input_ids'][i]])).logits[0].numpy()
    predicted_token_ids = np.argmax(out, axis=1).tolist()
    predicted_tokens = tokenizer.convert_ids_to_tokens(predicted_token_ids)
    non_pad_tokens = [token for token in predicted_tokens if token != '[PAD]']
    outputs['Experiment'].append(' '.join(non_pad_tokens))
    outputs['expected answer'].append(validation_rows[i]['answer'])
    # filtered_tokens = [predicted_tokens[token] for token in range(len(predicted_tokens)) if (predicted_tokens[token] != "[PAD]" and token in mask_loc)]
    # outputs['predicted answer'].append(' '.join(filtered_tokens))

# Create DataFrame
df = pd.DataFrame(outputs)
# Define the file name
csv_file = "outputs_NoRepeat.csv"
# Save DataFrame to CSV
df.to_csv(csv_file, index=False)
print(f"Dataset saved to {csv_file}")