# Tokenizer Training

We will begin by training a tokenizer on our desired dataset, in this case the Children's Book Test (CBT). HuggingFace provides the tokenizer and, here, the dataset as well: we will take HuggingFace's base RoBERTa tokenizer and retrain it on CBT.

In [1]:
import os
import time
import pickle

import pyarrow as pa
from pyarrow import fs

from datasets import load_dataset
from transformers import AutoTokenizer

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

Large-scale datasets are often split into multiple individual files, or "shards". While CBT is not truly large enough to merit this treatment, we will shard the dataset manually for illustrative purposes.

Shards reside in `./cbt_shards/`.
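As a quick check of the slice arithmetic used in the sharding cell below (`(i + 0) * len(dataset) // num_shards` to `(i + 1) * len(dataset) // num_shards`), this pure-Python sketch computes the shard boundaries for the 120,769 examples in the CBT `CN` training split. Because consecutive shards share a boundary, they never overlap and together cover every example, and integer division keeps their sizes within one of each other. `shard_bounds` is a hypothetical helper written for illustration only.

```python
def shard_bounds(num_examples, num_shards):
    """Return (start, stop) slice bounds for each shard, using the same
    integer arithmetic as the sharding cell: i * n // k up to (i + 1) * n // k."""
    return [(i * num_examples // num_shards, (i + 1) * num_examples // num_shards)
            for i in range(num_shards)]

bounds = shard_bounds(120769, 10)
sizes = [stop - start for start, stop in bounds]

print(sizes)       # [12076, 12077, 12077, 12077, 12077, 12077, 12077, 12077, 12077, 12077]
print(sum(sizes))  # 120769
```

The first shard's 12,076 examples match the `len(data)` check on `cbt_shard_0.pkl` further down.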

In [3]:
# Fetch the CBT training split ("CN" configuration: common nouns); sharding happens in the next cell
dataset = load_dataset("cbt", name = "CN", split = "train")

Downloading and preparing dataset cbt/CN to /home/metalcycling/Temporary/Scratch/.cache/huggingface/cbt/CN/1.1.0/73e4c9316b0d86a7addd7f80183fb971a6161fa2f8b746da034e205b7e16f78d...


Dataset cbt downloaded and prepared to /home/metalcycling/Temporary/Scratch/.cache/huggingface/cbt/CN/1.1.0/73e4c9316b0d86a7addd7f80183fb971a6161fa2f8b746da034e205b7e16f78d. Subsequent calls will reuse this data.


In [6]:
if not os.path.exists("cbt_shards/"):
    os.mkdir("cbt_shards")
    
num_shards = 10

for i in range(num_shards):
    shard = dataset[(i + 0) * len(dataset) // num_shards : (i + 1) * len(dataset) // num_shards]["sentences"]
    
    with open("cbt_shards/cbt_shard_%d.pkl" % (i), "wb") as fileptr:
        pickle.dump(shard, fileptr)

In [7]:
# Define an iterator over our dataset that returns batches of 64 paragraphs at a time
shards = sorted(os.listdir("./cbt_shards"))
min_paragraph_length = 10 # minimum number of characters for a paragraph to be kept

def batch_iterator(batch_size = 64):
    for shard in shards:
        with open("./cbt_shards/%s" % (shard), "rb") as fileptr:
            data = pickle.load(fileptr)
            
        batch = []
        
        for sentences in data:
            # Each CBT example is a list of sentence strings; join them into one paragraph
            paragraph = " ".join(sentences)
            
            if len(paragraph) > min_paragraph_length: # remove trivially short paragraphs
                batch.append(paragraph)
                
            if len(batch) == batch_size: # If the batch is full, return it
                yield batch
                batch = []
                
        if batch: # Return the final, partially filled batch
            yield batch
        
        print("Shard '%s' completed" % (shard))

In [13]:
# Sanity check: load the first shard and inspect one example
with open("./cbt_shards/cbt_shard_0.pkl", "rb") as fileptr:
    data = pickle.load(fileptr)

print(len(data)) # number of CBT examples in the shard
print(len((" ".join(data[4])).split())) # word count of one example once its sentences are joined

12076
419

In [8]:
# Create and train our tokenizer
vocab = 50000 + 256 + 5 # 50K learned tokens, 256 base byte symbols, 5 special tokens (<s>, <pad>, </s>, <unk>, <mask>)

# Get a BPE tokenizer with the right preprocessors, which we'll then retrain
tokenizer = AutoTokenizer.from_pretrained("roberta-base") 
tokenizer = tokenizer.train_new_from_iterator(batch_iterator(), vocab_size = vocab)
tokenizer.save_pretrained("roberta-tokenizer")

print("Training complete!")

Now that training is done, let's take a look at how the tokenizer handles a real sentence, and how it splits apart words it does not recognize:
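Pieces such as ` HU` / `GE` and ` over` / `state` appear because a BPE tokenizer encodes a word by repeatedly merging adjacent symbols according to the ranked merge list it learned during training, and stops when no learned merge applies. The toy sketch below illustrates that idea on plain characters. It ignores the byte-level alphabet and the leading-space marker that RoBERTa's tokenizer actually uses, and its merge table is made up for illustration.

```python
def apply_bpe(word, merges):
    """Apply ranked BPE merges to a single word (a toy version of what a
    trained BPE tokenizer does at encode time). `merges` is an ordered list of
    symbol pairs; earlier pairs have higher priority."""
    ranks = {pair: i for i, pair in enumerate(merges)}
    symbols = list(word)  # start from individual characters
    while len(symbols) > 1:
        # Find the adjacent pair with the best (lowest) merge rank
        pairs = [(ranks.get((a, b), float("inf")), i)
                 for i, (a, b) in enumerate(zip(symbols, symbols[1:]))]
        best_rank, i = min(pairs)
        if best_rank == float("inf"):
            break  # no learned merge applies; keep the remaining pieces
        symbols[i:i + 2] = [symbols[i] + symbols[i + 1]]
    return symbols

# Hypothetical merge table in which "HUGE" itself was never learned as a unit
merges = [("H", "U"), ("G", "E")]
print(apply_bpe("HUGE", merges))  # ['HU', 'GE']
```

Adding `("HU", "GE")` to the end of the table would let the same word come out as a single token, which is what happens for frequent words such as ` triumph`.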

In [5]:
# Test run for our new tokenizer
tokenizer = AutoTokenizer.from_pretrained("roberta-tokenizer")

sample = "This was a triumph. I'm making a note here: HUGE SUCCESS! It's hard to overstate my satisfaction."
print(sample)

tokens = tokenizer(sample)["input_ids"]
print(tokens)

print("\n".join([tokenizer.decode(token) for token in tokens]))

This was a triumph. I'm making a note here: HUGE SUCCESS! It's hard to overstate my satisfaction.
[0, 56, 21915, 325, 263, 12076, 18, 339, 11, 81, 1896, 263, 6100, 4714, 30, 27994, 33808, 27821, 8572, 7637, 55, 5, 625, 11, 87, 2647, 294, 605, 6077, 1656, 23118, 18, 2]
<s>
T
his
 was
 a
 triumph
.
 I
'
m
 making
 a
 note
 here
:
 HU
GE
 SU
CC
ES
S
!
 It
'
s
 hard
 to
 over
state
 my
 satisfaction
.
</s>


# Pre-Tokenizing the Dataset

Now that we have a trained tokenizer, let's convert our dataset into token indices. At the same time, we'll pack tokenized lines together into sequences of at most 512 tokens, the maximum RoBERTa sequence length. When the next line would push a sequence past 512 tokens, we pad out the current sequence, write it to our output shard, and start a new sequence with that line; lines longer than 512 tokens are truncated. By tokenizing, packing, and padding sequences all in advance, we can shrink our dataset on disk, accelerate downstream data loading, and streamline our training procedure.
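The packing rule described above can be sketched in plain Python, independent of Ray and Arrow. `pack_sequences` is a hypothetical helper that mirrors the buffer logic of the pre-tokenizer below (truncate over-long lines so they end in `</s>`, append lines while they fit, otherwise pad and emit), but it omits the short-line filter and the file writing. The token ids in the example are made up.

```python
def pack_sequences(lines, seq_len, pad, eos):
    """Greedily pack token lists into fixed-length sequences.

    Lines are appended to a buffer while they fit; when the next line would
    overflow, the buffer is padded to seq_len and emitted, and that line starts
    a new buffer. Lines longer than seq_len are truncated to end in eos.
    """
    packed, buffer = [], []
    for line in lines:
        if len(line) > seq_len:  # truncate, keeping an end-of-sequence marker
            line = line[:seq_len - 1] + [eos]
        if len(buffer) + len(line) <= seq_len:
            buffer += line
        else:
            packed.append(buffer + [pad] * (seq_len - len(buffer)))
            buffer = list(line)
    if buffer:  # flush the final, partially filled buffer
        packed.append(buffer + [pad] * (seq_len - len(buffer)))
    return packed

# Toy example: seq_len = 8, pad id 1, eos id 2 (ids chosen for illustration)
lines = [[0, 5, 6, 2], [0, 7, 2], [0, 8, 9, 10, 2]]
for seq in pack_sequences(lines, seq_len = 8, pad = 1, eos = 2):
    print(seq)
# [0, 5, 6, 2, 0, 7, 2, 1]
# [0, 8, 9, 10, 2, 1, 1, 1]
```

The first two lines share one sequence with a single pad token; the third would overflow it, so it starts the second sequence.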

This time we will use Ray for parallelism: two workers each iterate over their own subset of the input shards, and each writes a single output shard that we'll use for training. Currently we only support scaling on a single node, so do not attempt to run this notebook on OpenShift!

*(Parallelism on OpenShift for both tokenizer training and preprocessing will be provided in a future release)*

Output shards reside in `./wiki_processed_shards/`

In [19]:
# Define the workload that each parallel Ray worker will run
def process(config):
    shards = config["shards"]
    directory = config["directory"]
    
    seq_len = 512
    rank = session.get_world_rank()
    world_size = session.get_world_size()
    
    # Distribute input shards over workers, each of which produces a single output shard
    subsets = shards[rank * len(shards) // world_size : (rank + 1) * len(shards) // world_size]
    
    # Load our retrained tokenizer
    tokenizer = AutoTokenizer.from_pretrained("%s/roberta-tokenizer" % (directory))
    schema = pa.schema([pa.field("nums", pa.int16())])
    pad = tokenizer.pad_token_id
    eos = tokenizer.eos_token_id
    
    # Iterate over input files, writing sequences of 512 tokens at a time to our output shard
    buffer = [] # write buffer
    counter = 0 # number of sequences written
    ntrunc = 0 # number of truncated lines
    npad = [] # number of pad tokens in each written sequence
    
    with pa.ipc.new_file("%s/wiki_processed_shards/shard_%03d.arrow" % (directory, rank), schema) as writer:
        for j, filename in enumerate(subsets): # for each input shard
            with open("%s/wiki_shards/%s" % (directory, filename), "rb") as fileptr:
                dataset = pickle.load(fileptr)
                
            for entry in dataset:
                line = tokenizer(entry)["input_ids"] # tokenize!
                
                if len(line) > 5: # Ignore short lines
                    if len(line) > seq_len: # Truncate long lines, keeping the final </s>
                        line = line[:seq_len - 1] + [eos]
                        ntrunc += 1
                        
                    if len(buffer) + len(line) <= seq_len: # If the line fits into the buffer, add it
                        buffer += line
                    else:
                        # Otherwise, pad out the buffer
                        npad.append(seq_len - len(buffer))
                        buffer += [pad] * (seq_len - len(buffer))
                        
                        # Write the buffer. We subtract 25K to prevent overflow:
                        # int16 only goes up to 32767, but the vocab size is >50K
                        batch = pa.record_batch([pa.array([x - 25000 for x in buffer], pa.int16())], schema = schema)
                        writer.write(batch)
                        counter += 1
                        
                        # Start a new buffer with the line that did not fit
                        buffer = line
                        
            print("Shard %d: %d of %d complete, length = %d, avg pad = %f" % (rank, j + 1, len(subsets), counter, sum(npad) / max(len(npad), 1)))
            
            session.report({"training_iteration": j + 1})
            
        # Pad out and write the final buffer (record the pad count before padding)
        npad.append(seq_len - len(buffer))
        buffer += [pad] * (seq_len - len(buffer))
        batch = pa.record_batch([pa.array([x - 25000 for x in buffer], pa.int16())], schema = schema)
        writer.write(batch)
        counter += 1
    
    print("Shard %d complete, final length = %d lines, with %f pads per %d sequence tokens and %d truncations" % (session.get_world_rank(), counter, sum(npad) / len(npad), seq_len, ntrunc))
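The 25,000 offset in `process` works because the retrained vocabulary is capped at 50,000 + 256 + 5 = 50,261 entries, so token ids lie in [0, 50260]; after subtracting 25,000 they lie in [-25000, 25260], inside int16's range of [-32768, 32767]. Anything reading these shards has to add the offset back. The sketch below checks that round trip with the standard-library `array` module (typecode `"h"`, a signed short, which is 16 bits on common platforms) as a stand-in for the pyarrow int16 column; `encode_ids` and `decode_ids` are hypothetical helpers.

```python
from array import array

OFFSET = 25000      # same shift as the pre-tokenizer
VOCAB_SIZE = 50261  # 50K learned tokens + 256 base byte symbols + 5 special tokens

def encode_ids(ids):
    """Shift token ids into the signed 16-bit range and pack them ("h" = signed short)."""
    return array("h", [x - OFFSET for x in ids])

def decode_ids(packed):
    """Undo the shift to recover the original token ids."""
    return [x + OFFSET for x in packed]

ids = [0, 2, 25000, VOCAB_SIZE - 1]
packed = encode_ids(ids)
print(list(packed))               # [-25000, -24998, 0, 25260]
print(decode_ids(packed) == ids)  # True

# Without the shift, the largest id does not fit in a signed 16-bit integer
try:
    array("h", [VOCAB_SIZE - 1])
except OverflowError as err:
    print("overflow:", err)
```

An unsigned 16-bit type (range 0 to 65535) would also hold these ids without any shift; the offset is what lets the pipeline keep a signed int16 column.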

In [20]:
# Run our Ray-based pre-tokenizer!
if not os.path.exists("./wiki_processed_shards"):
    os.mkdir("wiki_processed_shards")
    
# For illustrative purposes, we'll condense our 10 input shards into 2 output shards
trainer = TorchTrainer(
    train_loop_per_worker = process,
    train_loop_config = {"shards": os.listdir("./wiki_shards"), "directory": os.getcwd()},
    scaling_config = ScalingConfig(num_workers = 2),
)
trainer.fit()

print('Preprocessing complete!')

2023-03-31 16:17:16,439	INFO data_parallel_trainer.py:286 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.


Current time: 2023-03-31 16:21:04
Running for: 00:03:48.24
Memory: 17.8/62.5 GiB

Trial name                status      loc                  iter  total time (s)  _timestamp  _time_this_iter_s  _training_iteration
TorchTrainer_07e6f_00000  TERMINATED  192.168.0.106:48720  5     223.719         1680294061  45.7014            5

(TorchTrainer pid=48720) 2023-03-31 16:17:18,662	INFO data_parallel_trainer.py:286 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
(RayTrainWorker pid=48768) 2023-03-31 16:17:20,630	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=2]
(RayTrainWorker pid=48768) Token indices sequence length is longer than the specified maximum sequence length for this model (600 > 512). Running this sequence through the model will result in indexing errors
(RayTrainWorker pid=48769) Token indices sequence length is longer than the specified maximum sequence length for this model (588 > 512). Running this sequence through the model will result in indexing errors
(RayTrainWorker pid=48768) Shard 0: 1 of 5 complete, length = 26348, avg pad = 88.277061

Trial name:               TorchTrainer_07e6f_00000
_time_this_iter_s:        45.7014
_timestamp:               1680294061
_training_iteration:      5
date:                     2023-03-31_16-21-02
done:                     True
episodes_total:
experiment_id:            8ba8dac5eb9c462ba86c774329609c2d
experiment_tag:           0
hostname:                 li-e847394c-35a2-11b2-a85c-c5cec5aeb50b.ibm.com
iterations_since_restore: 5
node_ip:                  192.168.0.106
pid:                      48720
time_since_restore:       223.719
time_this_iter_s:         45.1526
time_total_s:             223.719
timestamp:                1680294062
timesteps_since_restore:  0
timesteps_total:
training_iteration:       5
trial_id:                 07e6f_00000
warmup_time:              0.0903034

(RayTrainWorker pid=48769) Shard 1: 1 of 5 complete, length = 26445, avg pad = 88.567215
(RayTrainWorker pid=48768) Shard 0: 2 of 5 complete, length = 52918, avg pad = 88.239446
(RayTrainWorker pid=48769) Shard 1: 2 of 5 complete, length = 53214, avg pad = 88.476735
(RayTrainWorker pid=48768) Shard 0: 3 of 5 complete, length = 79738, avg pad = 88.675274
(RayTrainWorker pid=48769) Shard 1: 3 of 5 complete, length = 79889, avg pad = 88.533090
(RayTrainWorker pid=48768) Shard 0: 4 of 5 complete, length = 106106, avg pad = 88.391467
(RayTrainWorker pid=48769) Shard 1: 4 of 5 complete, length = 106429, avg pad = 88.696568
(RayTrainWorker pid=48768) Shard 0: 5 of 5 complete, length = 133232, avg pad = 88.723182
(RayTrainWorker pid=48768) Shard 0 complete, final length = 133233 lines, with 88.722516 pads per 512 sequence tokens and 478 truncations

2023-03-31 16:21:04,802	INFO tune.py:762 -- Total run time: 228.36 seconds (228.24 seconds for the tuning loop).


Preprocessing complete!


In [21]:
# Original dataset size:
orig_size = sum([os.path.getsize("./wiki_shards/%s" % (filename)) for filename in os.listdir("./wiki_shards")])
print("Original dataset size (pickle-compressed): %s MB" % (orig_size >> 20))

Original dataset size (pickle-compressed): 520 MB


In [22]:
# New dataset size:
new_size = sum([os.path.getsize("./wiki_processed_shards/%s" % (filename)) for filename in os.listdir("./wiki_processed_shards")])
print("Tokenized dataset size: %s MB" % (new_size >> 20))

Tokenized dataset size: 308 MB


*For large datasets, we have observed a roughly 3-4x reduction in tokenized dataset size compared to raw text. 
For comparison, we've found this to be slightly better than pickle, and slightly worse than gzip.*
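Because each packed sequence is stored as 512 int16 values, every sequence costs 1 KiB of token payload regardless of how much of it is padding. Using the figures from the shard 0 log line, its 133,233 sequences amount to roughly 130 MiB of payload, and about 88.7 pads per 512 tokens means roughly 17% of the stored tokens are `<pad>`. The hypothetical helpers below only redo this arithmetic; they do not inspect the files, which are somewhat larger because each record batch also carries Arrow IPC metadata.

```python
SEQ_LEN = 512
BYTES_PER_TOKEN = 2  # int16

def payload_mib(num_sequences, seq_len = SEQ_LEN, bytes_per_token = BYTES_PER_TOKEN):
    """Raw token payload in MiB, ignoring Arrow IPC metadata and alignment padding."""
    return num_sequences * seq_len * bytes_per_token / 2**20

def pad_fraction(avg_pads, seq_len = SEQ_LEN):
    """Fraction of stored tokens that are <pad>."""
    return avg_pads / seq_len

# Figures taken from the shard 0 log line of the preprocessing run
print("%.1f MiB" % payload_mib(133233))            # 130.1 MiB
print("%.1f%%" % (100 * pad_fraction(88.722516)))  # 17.3%
```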