# T5 TFRecord Preprocessing

This notebook converts the DROP dataset from its JSON format into TFRecord files, which can then be batched and fed to the model for training.

In [None]:
# Colab-only environment setup: authenticate, mount Google Drive and install
# packages Colab does not ship with.
# NOTE: the `!pip` lines are IPython shell magics, so this cell only runs
# inside a notebook, not as a plain Python script.
# Colab is detected from the string form of the active IPython shell object.
if 'colab' in str(get_ipython()):
    import google.colab as colab
    colab.auth.authenticate_user()
    colab.drive.mount('/content/gdrive') # mount google drive

    # install libraries not native to colab
    # transformers/datasets are pinned; presumably to the versions whose API
    # this notebook was written against — confirm before bumping.
    !pip install tensorflow-text
    !pip install transformers==3.3.1
    !pip install datasets==1.1.2
    !pip install tqdm

# remove pip install outputs
from IPython.display import clear_output
clear_output()

In [None]:
# ml libraries
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.backend as K
import transformers
import datasets # https://huggingface.co/docs/datasets/

# data processing libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

# other libraries
import os
import json
import functools
import time
import warnings
from datetime import datetime
from typing import Callable, Generator

print(f'TensorFlow {tf.__version__}')  # record which TensorFlow version produced the outputs below

TensorFlow 2.2.0


## Configurations

In [None]:
#@title Constants
# --- Model / tokenizer settings -------------------------------------------
# Inputs are padded/truncated to ENCODER_MAX_LEN tokens and targets to
# DECODER_MAX_LEN tokens (see encode()).
T5_MODEL                = 't5-small'  #@param { type: "string" }
ENCODER_MAX_LEN         = 512 #@param { type: "integer" }
DECODER_MAX_LEN         = 54 #@param { type: "integer" } 
TOKENIZER               = transformers.AutoTokenizer.from_pretrained(T5_MODEL)
# Digits plus the answer separators: '<ss>' joins the spans of one answer
# (parse_answer) and '<sv>' joins validated answers (parse_validated_answers).
# NOTE(review): the recorded output below shows only 2 tokens were added,
# presumably because the digits already exist in the T5 vocabulary — confirm.
# NOTE(review): the model's embedding matrix presumably has to be resized to
# the enlarged tokenizer in the training code — not done in this notebook.
special_tokens_dict     = {'additional_special_tokens': ['0','1','2','3', '4', '5', '6', '7', '8', '9', '<ss>', '<sv>']}
num_added_toks          = TOKENIZER.add_special_tokens(special_tokens_dict)
print(f'New tokens added: {num_added_toks}')

# tim's default paths
# NOTE: paths below are built with '+', so with LOCAL_PATH = '' they become
# the absolute paths '/data/drop_dataset' and '/temp'. Set LOCAL_PATH (without
# a trailing slash) to the folder that contains data/drop_dataset.
LOCAL_PATH              = '' # local folder containing DROP json
TRAIN_JSON              = 'drop_dataset_train.json' 
DEV_JSON                = 'drop_dataset_dev.json' 
DROP_JSON_DIR_LOCAL     = LOCAL_PATH + '/data/drop_dataset' #@param { type: "string" }
DROP_JSON_DIR_GCP       = '' # gcp storage path for saving tfrecords #@param { type: "string" }
EXPORT_DIR              = LOCAL_PATH + '/temp' #@param { type: "string" }

# With DROP_JSON_DIR_GCP = '' these reduce to the bare file names.
# NOTE(review): these two paths are not used in the cells shown here.
TRAIN_DROP_JSON_GCP     = os.path.join(DROP_JSON_DIR_GCP, TRAIN_JSON)
DEV_DROP_JSON_GCP       = os.path.join(DROP_JSON_DIR_GCP, DEV_JSON) 

New tokens added: 2


In [None]:
#@title Test Mode Setup
def sample_data_to_file(
                json_file: str,
                sample_count: int,
                source_dir: str,
                export_dir: str = EXPORT_DIR) -> str:
    """Copy the first ``sample_count`` top-level entries of a DROP json file.

    Used to build a tiny dataset for quick end-to-end runs of this notebook.

    Args:
        json_file: file name of the DROP json inside ``source_dir``.
        sample_count: maximum number of top-level entries to keep; entries are
            taken in the order they appear in the file.
        source_dir: directory (any ``tf.io.gfile``-supported path) that holds
            ``json_file``.
        export_dir: directory the sample is written to; created if missing.

    Returns:
        Path of the written file, ``<export_dir>/<stem><count><ext>``, where
        ``count`` is the number of entries actually written (it can be smaller
        than ``sample_count`` for short files).
    """
    # load source json
    source_file_path = os.path.join(source_dir, json_file)
    with tf.io.gfile.GFile(source_file_path) as f:
        data_json_dict = json.load(f)  # dict of dict: top-level key -> entry

    # json.load keeps document order, so this keeps the first entries.
    sampled_data_json_dict = {
        key: value
        for i, (key, value) in enumerate(data_json_dict.items())
        if i < sample_count
    }
    actual_count = len(sampled_data_json_dict)  # the real sample count

    # Insert the count before the file-name extension only. Replacing '.json'
    # anywhere in the full path could rewrite directory names, and a file name
    # without '.json' would map onto itself and overwrite the source file.
    stem, ext = os.path.splitext(json_file)
    sampled_output_path = os.path.join(export_dir, f'{stem}{actual_count}{ext}')

    # GFile cannot create missing parent directories on write.
    if export_dir:
        tf.io.gfile.makedirs(export_dir)
    with tf.io.gfile.GFile(sampled_output_path, 'w') as fout:
        json.dump(sampled_data_json_dict, fout)

    print(f'Dumped {actual_count} samples to file: {sampled_output_path}')
    return sampled_output_path

SAMPLE_DATA = True  #@param { type: "boolean" }
SAMPLE_COUNT = 3 #@param { type: "integer" }
# Either point at the full DROP files, or write small samples of both splits
# into EXPORT_DIR and use those instead (train first, then dev).
if not SAMPLE_DATA:
    train_json, dev_json = (
        os.path.join(DROP_JSON_DIR_LOCAL, split_file)
        for split_file in (TRAIN_JSON, DEV_JSON))
    print(train_json)
    print(dev_json)
else:
    train_json, dev_json = (
        sample_data_to_file(
            json_file=split_file,
            sample_count=SAMPLE_COUNT,
            source_dir=DROP_JSON_DIR_LOCAL,
            export_dir=EXPORT_DIR)
        for split_file in (TRAIN_JSON, DEV_JSON))

## Encoding to TFRecord

In [None]:
def encode(
        example: dict,
        encoder_max_len: int = ENCODER_MAX_LEN,
        decoder_max_len: int = DECODER_MAX_LEN,
        tokenizer: transformers.PreTrainedTokenizer = TOKENIZER) -> dict:
    """Tokenize one flattened DROP QA record into fixed-length model features.

    Args:
        example (dict): QA record with keys
                            'context'            passage text
                            'question'           question text
                            'answer'             target answer text
                            'query_id'           str
                            'answer_type'        str
                            'validated_answers'  str
        encoder_max_len (int): padded/truncated length of the encoder input.
        decoder_max_len (int): padded/truncated length of the decoder target.
        tokenizer: tokenizer used for both input and target.

    Returns:
        (dict) with
            'query_id', 'answer_type', 'validated_answers':
                one-element lists holding the UTF-8 encoded string
            'input_ids', 'attention_mask':
                first row of the tokenized encoder input
            'labels', 'decoder_attention_mask':
                first row of the tokenized decoder target
    """
    context, question, answer = example['context'], example['question'], example['answer']

    def tokenize(text, max_len):
        # Pad and truncate to max_len; the tokenizer returns a batch of one row.
        return tokenizer(
            text,
            truncation=True,
            return_tensors='tf',
            max_length=max_len,
            padding='max_length')

    source = tokenize(f"answer_me: {str(question)} context: {str(context)}", encoder_max_len)
    target = tokenize(str(answer), decoder_max_len)

    features = {
        key: [str.encode(example[key])]
        for key in ('query_id', 'answer_type', 'validated_answers')
    }
    features['input_ids'] = source['input_ids'][0]
    features['attention_mask'] = source['attention_mask'][0]
    features['labels'] = target['input_ids'][0]
    features['decoder_attention_mask'] = target['attention_mask'][0]
    return features

def _bytes_feature(values):
    """Wrap a list of byte strings in a ``tf.train.Feature`` (bytes_list)."""
    bytes_list = tf.train.BytesList(value=values)
    return tf.train.Feature(bytes_list=bytes_list)

def _float_feature(values):
    """Wrap a sequence of floats/doubles in a ``tf.train.Feature`` (float_list)."""
    float_list = tf.train.FloatList(value=values)
    return tf.train.Feature(float_list=float_list)

def _int64_feature(values):
    """Wrap a sequence of bool / enum / int / uint values in a ``tf.train.Feature``.

    If the values cannot be stored in an ``Int64List``, they are printed for
    debugging and the exception is re-raised.
    """
    try:
        int64_list = tf.train.Int64List(value=values)
    except Exception:
        print(f'failed at: {values}')
        # Bare `raise` re-raises the active exception without adding this
        # line as an extra frame, unlike `raise e`.
        raise
    return tf.train.Feature(int64_list=int64_list)

def to_serialized_example(encoded_example):
    """Serialize the output of ``encode`` into ``tf.train.Example`` bytes.

    String metadata fields are stored as bytes features and the token/mask
    fields as int64 features, under the same key names.
    """
    string_fields = ('query_id', 'answer_type', 'validated_answers')
    int_fields = ('input_ids', 'attention_mask', 'labels', 'decoder_attention_mask')

    features = {}
    for field in string_fields:
        features[field] = _bytes_feature(encoded_example[field])
    for field in int_fields:
        features[field] = _int64_feature(encoded_example[field])

    proto = tf.train.Example(features=tf.train.Features(feature=features))
    return proto.SerializeToString()

def parse_validated_answers(validated_answers: list) -> str:
    """Flatten DROP ``validated_answers`` into one ``'<sv>'``-separated string.

    Each entry is parsed with ``parse_answer``. Entries with no usable value
    (``parse_answer`` returns ``(None, None)``) or only whitespace are skipped.

    Args:
        validated_answers: list of answer dicts, same schema as ``parse_answer``.

    Returns:
        The remaining answers, stripped and joined with ``'<sv>'``; ``''`` if
        none remain.
    """
    answers = []
    for answer_dict in validated_answers:
        answer, _ = parse_answer(answer_dict)
        # An empty annotation yields None; calling .strip() on it would raise.
        if answer is None:
            continue
        answer = answer.strip()
        if answer:
            answers.append(answer)
    return '<sv>'.join(answers)


def parse_answer(answer_dict: dict) -> tuple:
    """Extract one answer string and its type tag from a DROP answer dict.

    Example of answer_dict = { "number": "", "date": { "day": "", "month": "", "year": "" }, "spans": [ "Chaz Schilens" ] }

    Fields are checked in priority order: number, spans, date.

    Returns:
        ``(answer, answer_type)`` where ``answer_type`` is
            'n'  - the stripped number string
            's'  - more than one non-blank span, joined with '<ss>'
            'ss' - exactly one non-blank span
            'd'  - the non-blank date parts joined as 'day month year'
        or ``(None, None)`` when no field holds a non-blank value.
    """
    number = answer_dict['number'].strip()
    if number:
        return number, 'n'

    # Drop blank spans before counting, so ['a', ''] counts as a single span.
    spans = [span.strip() for span in answer_dict['spans'] if span.strip()]
    if spans:
        return '<ss>'.join(spans), ('s' if len(spans) > 1 else 'ss')

    date = answer_dict['date']
    # .get tolerates partially filled or empty date dicts.
    date_parts = [date.get(part, '').strip() for part in ('day', 'month', 'year')]
    date_str = ' '.join(part for part in date_parts if part)
    if date_str:
        return date_str, 'd'

    return None, None

def make_data_generator(file_path: str) -> Callable[[], Generator]:
    """Build a zero-argument generator function over a DROP json file.

    Expected layout of each top-level entry:

    json_example: {
        "passage": "foo bar",
        "qa_pairs": [
            {
                "question": "foo bar",
                "answer": { "number": "", "date": { "day": "", "month": "", "year": "" }, "spans": [ "Chaz Schilens" ] },
                "query_id": "f37e81fa-ef7b-4583-b671-762fc433faa9",
                "validated_answers": [ ...answer dicts... ]   (optional)
            }
        ]
    }

    The file is read once, when this function is called. The returned
    function (the callable that ``tf.data.Dataset.from_generator`` expects)
    yields one serialized ``tf.train.Example`` per QA pair whose answer
    ``parse_answer`` can parse. QA pairs without a usable answer are skipped.

    Args:
        file_path: path to the DROP json (any ``tf.io.gfile``-supported path).

    Returns:
        A callable that returns a generator of serialized example bytes.
    """
    with tf.io.gfile.GFile(file_path) as json_file:
        data_json_dict = json.load(json_file)

    def gen():
        for json_example in data_json_dict.values():
            passage = json_example['passage']
            for qa_dict in json_example['qa_pairs']:
                answer, answer_type = parse_answer(qa_dict['answer'])
                # Skip unusable pairs before doing any other per-pair work.
                if answer is None:
                    continue

                # A missing 'validated_answers' key yields '' (join of nothing).
                valid_ans = parse_validated_answers(qa_dict.get('validated_answers', []))

                encoded_example = encode({
                    'context': passage,
                    'question': qa_dict['question'],
                    'answer': answer,
                    'validated_answers': valid_ans,
                    'answer_type': answer_type,
                    'query_id': qa_dict['query_id']
                })
                yield to_serialized_example(encoded_example)

    return gen

def get_num_examples(file_path):
    """Return the number of top-level entries in a DROP json file.

    NOTE(review): each top-level entry holds a list of ``qa_pairs``, and
    ``make_data_generator`` yields one record per parseable QA pair. This count
    is therefore not the number of TFRecord examples — confirm what callers expect.
    """
    with tf.io.gfile.GFile(file_path) as handle:
        parsed = json.load(handle)
    return len(parsed)

def load_dataset(file_path: str) -> tf.data.Dataset:
    """Wrap a DROP json file as a dataset of serialized ``tf.train.Example`` strings."""
    generator_fn = make_data_generator(file_path)
    return tf.data.Dataset.from_generator(generator_fn, output_types=tf.string)

In [None]:
SUFFIX = '_test'
def write_json_to_tf_record_file(json_path: str) -> str:
    dataset = load_dataset(json_path)
    tf_record_file_path = os.path.join(
        os.path.dirname(json_path),
        os.path.basename(json_path).replace('.json', f'{SUFFIX}.tfrecord')
    )

    print(f'Writing TF Record file to: {tf_record_file_path} ...')
    writer = tf.data.experimental.TFRecordWriter(tf_record_file_path)
    writer.write(dataset)

    return tf_record_file_path

# Convert both splits (train first, then dev); each .tfrecord is written next to its json.
train_tfrec_path, dev_tfrec_path = (
    write_json_to_tf_record_file(split_json) for split_json in (train_json, dev_json))