In [1]:
import os
import multiprocessing as mp
import numpy as np
import tiktoken
from datasets import load_dataset # pip install datasets
from tqdm import tqdm # pip install tqdm
from utils import tokenize

# 1. load dataset from huggingface

https://huggingface.co/datasets/HuggingFaceFW/fineweb-edu

In [2]:
local_dir = "edu_fineweb10B"
shard_size = int(1e7)

# create the repo for data
DATA_CACHE_DIR = local_dir
os.makedirs(DATA_CACHE_DIR, exist_ok=True)

# download fw data
fw = load_dataset("HuggingFaceFW/fineweb-edu", name="sample-10BT", split="train")

print("object type", type(fw))
print("# of data points", len(fw))
print("one data point\n", fw[0], sep = '')

Resolving data files:   0%|          | 0/2110 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/98 [00:00<?, ?it/s]

object type <class 'datasets.arrow_dataset.Dataset'>
# of data points 9672101
one data point
{'text': 'The Independent Jane\nFor all the love, romance and scandal in Jane Austen’s books, what they are really about is freedom and independence. Independence of thought and the freedom to choose.\nElizabeth’s refusal of Mr. Collins offer of marriage showed an independence seldom seen in heroines of the day. Her refusal of Mr. Darcy while triggered by anger showed a level of independence that left him shocked and stunned.\nThe freedom she exhibited in finally accepting him in direct defiance of Lady Catherine and knowing her father would disapprove was unusual even for Austen. In her last book Anne Elliot is persuaded to refuse Captain Wentworth at Lady Russel’s insistence.\nAlthough Jane played by the rules of the day, all of her writing is infused with how she wanted life to be. She ‘screams’ her outrage at the limitations for women in Emma.\nWhen accosted by Mrs. Elton, Jane Fairfax says

In [3]:
# sample 300,000 data points, about 3 shards
n_examples = 100000

fw_sample = []
for i in range(n_examples):
    fw_sample.append(fw[i])

len(fw_sample)

100000

# 2.  load encoder

In [4]:
# init the tokenizer
enc = tiktoken.get_encoding("gpt2")
eot = enc._special_tokens['<|endoftext|>'] # end of text token

# learn about tokenizer
print('end of text token is', eot)
print('encoding of hello world is', enc.encode_ordinary('hello world!')) # note: that encode_ordinary's input is text string

max_token_value = enc.max_token_value
print('max token value is', max_token_value ) 

assert  max_token_value  < 2**16, "token dictionary too large for uint16"

end of text token is 50256
encoding of hello world is [31373, 995, 0]
max token value is 50256


In [5]:
## move tokenize to utils.py to ensure it is in __main__
# def tokenize(doc):
#     """
#     doc: string of a single document
#     returns a numpy array of unit 16 tokens
#     """
 
#     tokens = [eot] # the special <|endoftext|> token delimits all documents
#     tokens.extend(enc.encode_ordinary(doc["text"]))
#     tokens_np = np.array(tokens)

#     ## note: remove below token check, checking once is enough 
#     ## assert (0 <= tokens_np).all() and (tokens_np < 2**16).all(), "token dictionary too large for uint16"
   
#     tokens_np_uint16 = tokens_np.astype(np.uint16)
#     return tokens_np_uint16

def write_datafile(filename, tokens_np):
    np.save(filename, tokens_np)

In [6]:
# each example has about 800-1000 tokens
n_tokens = len(enc.encode_ordinary(fw_sample[5]['text']))
print("# of tokens in each data point", n_tokens )
print("# of data points in each shard", shard_size//n_tokens )

# of tokens in each data point 869
# of data points in each shard 11507


## 3. write 1d array to file

In [7]:
nprocs = max(1, os.cpu_count()//2)
print("# of cores for compute", nprocs)

# write tokens to shards using cpu parallel computation
with mp.Pool(nprocs) as pool:

    shard_index = 0

    token_count = 0 # already tokenized - yet to writte count
    all_tokens_np = np.empty((shard_size, ), dtype = np.uint16)

    progress_bar = None

    for tokens in pool.imap(tokenize, fw_sample, chunksize=8):
        
        # not need to create a new shard
        if token_count + len(tokens) < shard_size: 
            
            all_tokens_np[token_count: token_count + len(tokens)] = tokens
            token_count += len(tokens)
            if progress_bar is None:
                progress_bar = tqdm(total = shard_size, unit = 'tokens', desc=f"Shard {shard_index}")
            progress_bar.update(len(tokens))
        
        # need to create a new shard
        else:
            
            split = 'val' if shard_index == 0 else 'train'
            filename = os.path.join(DATA_CACHE_DIR, f'edufineweb_{split}_{shard_index:06d}')

            remainder = shard_size - token_count 
            all_tokens_np[token_count:] = tokens[:remainder]
            
            write_datafile(filename, all_tokens_np)
            progress_bar.update(remainder)
            
            # initiate for the next shard
            all_tokens_np = np.empty((shard_size, ), dtype = np.uint16)
            shard_index += 1

            token_count = len(tokens) - remainder
            all_tokens_np[:token_count] = tokens[remainder: ] 
            
            progress_bar = None 
    

    if token_count > 0:
        split = 'val' if shard_index == 0 else 'train'
        filename = os.path.join(DATA_CACHE_DIR, f'edufineweb_{split}_{shard_index:06d}')
        write_datafile(filename, all_tokens_np)

# of cores for compute 7


Shard 0: 100%|██████████| 10000000/10000000 [00:00<00:00, 21163622.20tokens/s]
Shard 1: 100%|█████████▉| 9999759/10000000 [00:00<00:00, 21666308.08tokens/s]
Shard 2: 100%|█████████▉| 9999953/10000000 [00:00<00:00, 21849361.56tokens/s]
Shard 3: 100%|█████████▉| 9999193/10000000 [00:00<00:00, 21676929.02tokens/s]
Shard 4: 100%|█████████▉| 9999286/10000000 [00:00<00:00, 21540302.13tokens/s]
Shard 5: 100%|█████████▉| 9997276/10000000 [00:00<00:00, 21852983.87tokens/s]
Shard 6: 100%|█████████▉| 9999095/10000000 [00:00<00:00, 21658100.61tokens/s]
Shard 7: 100%|█████████▉| 9999599/10000000 [00:00<00:00, 21713272.98tokens/s]
Shard 8: 100%|█████████▉| 9999578/10000000 [00:00<00:00, 21638808.96tokens/s]
Shard 9: 100%|█████████▉| 9999914/10000000 [00:00<00:00, 21292991.58tokens/s]
Shard 10:  44%|████▍     | 4435500/10000000 [00:00<00:00, 21866065.89tokens/s]

In [8]:
# nprocs = max(1, os.cpu_count()//2)
# print("# of cores for compute", nprocs)

# # write tokens to shards using cpu parallel computation
# progress_bar = None
# shard_index = 0

# token_count = 0 # already tokenized - yet to writte count
# all_tokens_np = np.empty((shard_size, ), dtype = np.uint16)

# for tokens in map(tokenize, fw_sample):
        
#     # not need to create a new shard
#     if token_count + len(tokens) < shard_size: 
        
#         all_tokens_np[token_count: token_count + len(tokens)] = tokens
#         token_count += len(tokens)
#         if progress_bar is None:
#             progress_bar = tqdm(total = shard_size, unit = 'tokens', desc=f"Shard {shard_index}")
#         progress_bar.update(len(tokens))
    
#     # need to create a new shard
#     else:
        
#         split = 'val' if shard_index == 0 else 'train'
#         filename = os.path.join(DATA_CACHE_DIR, f'edufineweb_{split}_{shard_index:06d}')

#         remainder = shard_size - token_count 
#         all_tokens_np[token_count:] = tokens[:remainder]
        
#         print(token_count, len(tokens), progress_bar)
#         write_datafile(filename, all_tokens_np)
#         progress_bar.update(remainder)
        
#         # initiate for the next shard
#         shard_index += 1
#         all_tokens_np = np.empty((shard_size, ), dtype = np.uint16)

#         token_count = len(tokens) - remainder
#         all_tokens_np[:token_count] = tokens[remainder: ] 
#         progress_bar = None 
    

#     if token_count > 0:
#         split = 'val' if shard_index == 0 else 'train'
#         filename = os.path.join(DATA_CACHE_DIR, f'edufineweb_{split}_{shard_index:06d}')
#         write_datafile(filename, all_tokens_np)

Shard 10:  45%|████▌     | 4500787/10000000 [00:15<00:00, 21866065.89tokens/s]