# Preprocess WebInstructSub

https://huggingface.co/datasets/TIGER-Lab/WebInstructSub

In [1]:
import os
from datasets import load_dataset

DATASET_DIR = "."
local_dir =  "data" 
DATA_CACHE_DIR = os.path.join(DATASET_DIR, local_dir)
os.makedirs(DATA_CACHE_DIR, exist_ok=True)

# download the dataset
dataset = load_dataset("TIGER-Lab/WebInstructSub", split="train", cache_dir=DATA_CACHE_DIR)


## Let's look at a few samples

In [2]:
import textwrap

it = iter(dataset)
for i in range(5):
    item = next(it)
    print(f"User: ", item['question'])
    ans = item['answer']
    print(f"Response length: {len(ans)}")
    print("Assistant:")
    print(textwrap.fill(ans, width=100))
    print(100*"-")
 

User:  What is an activated complex?
Response length: 1366
Assistant:
 The activated complex is formed when the reactants collide with each other and begin to rearrange
their atoms and bonds to form the products. This process requires energy, which is why the activated
complex has a higher energy than the reactants. The energy required to reach the activated complex
is called the activation energy.  Once the activated complex is formed, it can either decompose back
into the reactants or proceed to form the products. The probability of the activated complex
decomposing back into the reactants is determined by the activation energy. If the activation energy
is high, then the activated complex is more likely to decompose back into the reactants. If the
activation energy is low, then the activated complex is more likely to proceed to form the products.
The activated complex is a key concept in understanding chemical reactions. It helps to explain why
some reactions occur quickly and others

## System Spec


In [3]:
import math


num_cpus = os.cpu_count()
print(f"""
sytem statistics:
-----------------
cpu count: {num_cpus}""")
total_docs = len(dataset)
docs_per_cpu = int(math.ceil(total_docs/num_cpus))
print(f"""
dataset statistics
------------------
documents: {total_docs:,}
docs_per_cpu: {docs_per_cpu:,}""")



sytem statistics:
-----------------
cpu count: 12

dataset statistics
------------------
documents: 2,335,220
docs_per_cpu: 194,602


## Dummy Preprocessing Operation


In [None]:
import time
from functools import partial
from multiprocessing import get_context

import concurrent.futures as cf

from transformers import AutoTokenizer

enc = AutoTokenizer.from_pretrained("HuggingFaceTB/SmolLM-135M")

def count_tokens(dataset, enc, idx):
    token_count = len(enc.encode(dataset[idx]['question']))
    token_count += len(enc.encode(dataset[idx]['answer']))
    return token_count
 
ctx = get_context("fork")
with cf.ProcessPoolExecutor(max_workers = num_cpus - 2, mp_context=ctx) as ex:
    start = time.time()
    documents = 0
    tokens = 0
   
    f = partial(count_tokens, dataset, enc)


    for result in ex.map(f, range(len(dataset)), chunksize=docs_per_cpu//10):
        documents += 1
        tokens += result
        documents % 1e4 == 0 and print(f"processed {documents:,}", end="\r")
        
    print(f"processed documents in {time.time()-start:0.2f} seconds")
    print(f"total tokens: {tokens:,}")
    print(f"total documents: {documents:,}")   
    assert(documents == total_docs)

processed 1,160,000

## Setup Tokenizer

In [5]:
import numpy as np

BLOCK_SIZE=2048

def format_and_tokenize(dataset, tokenizer, idx,
                        block_size=BLOCK_SIZE,
                        system_prompt="You are a helpful assistant."):
    question = dataset[idx]["question"].strip()
    answer = dataset[idx]["answer"].strip()
    
    x = [
        "<|im_start|>system\n" + system_prompt + "<|im_end|>\n",
        "<|im_start|>user\n" + question + "<|im_end|>\n",
        "<|im_start|>assistant\n" 
        ] 
    x = "".join(x)
    y = answer + "<|im_end|>"
    tok_x = np.array(tokenizer.encode(x), dtype=np.uint16)
    tok_y = np.array(tokenizer.encode(y), dtype=np.uint16)

    # skip oversized examples
    if len(tok_x) + len(tok_y) > BLOCK_SIZE:
        return None

    tokens = np.concatenate([
        tok_x,
        tok_y,
        np.zeros(block_size - (len(tok_x)+len(tok_y)), dtype=np.uint16)
    ])
    attn_mask = np.concatenate([
        np.ones(len(tok_x)+len(tok_y), dtype=np.uint16),
        np.zeros(block_size - (len(tok_x)+len(tok_y)), dtype=np.uint16)
    ])
    loss_mask = np.concatenate([
        np.zeros_like(tok_x, dtype=np.uint16),
        np.ones_like(tok_y, dtype=np.uint16),
        np.zeros(block_size - (len(tok_x)+len(tok_y)), dtype=np.uint16)
    ])
    result = np.stack([
        tokens,
        attn_mask,
        loss_mask
    ], axis=0)

    return result
 

In [6]:
result = format_and_tokenize(dataset, enc, 5)
result

array([[   1, 9690,  198, ...,    0,    0,    0],
       [   1,    1,    1, ...,    0,    0,    0],
       [   0,    0,    0, ...,    0,    0,    0]], dtype=uint16)

## Actual Preprocessing Operation

In [8]:
TOTAL_SHARDS = 48
SHARD_SIZE_DOCS = documents // TOTAL_SHARDS
TOKENS_PER_DOC = 3 * BLOCK_SIZE
SHARD_SIZE_TOKENS = SHARD_SIZE_DOCS * TOKENS_PER_DOC
print(f"shard size: {SHARD_SIZE_DOCS:,} documents, {SHARD_SIZE_TOKENS:,} tokens")

shard size: 48,650 documents, 298,905,600 tokens


In [None]:
output_dir = "processed"
os.makedirs(os.path.join(DATASET_DIR, output_dir), exist_ok=True)

def write_shard(shard, shard_idx, split="train"):
    f_path = os.path.join(DATA_CACHE_DIR, output_dir, f"fineweb_edu_{split}_{shard_idx}")
    np.savez(f_path, shard)

f = partial(format_and_tokenize, dataset, enc)

ctx = get_context("fork")
with cf.ProcessPoolExecutor(max_workers = num_cpus, mp_context=ctx) as ex:
    start = time.time()
    
    docs_processed = 0
    shards_written = 0
    tokens_generated = 0
    shard_docs = 0

    shard = np.empty((SHARD_SIZE_DOCS, 3, BLOCK_SIZE), dtype=np.uint16)
    
    for tokens in ex.map(f, range(len(dataset)), chunksize=docs_per_cpu//100):
        if tokens is None:
            continue

        docs_processed += 1
        tokens_generated += len(tokens)

        if docs_processed % 1e3 == 0:
            print(f"processed {docs_processed:,} documents | generated {tokens_generated:,} tokens | wrote {shards_written} shards", end="\r")

        if shard_docs < SHARD_SIZE_DOCS:
            shard[shard_docs, :, :] = tokens 
            shard_docs += 1
        else:
            write_shard(shard, shards_written)
            shards_written += 1
            shard_docs = 0
    
    write_shard(shard, shards_written) #write the final shard
    shards_written += 1
    print(f"processed {docs_processed:,} documents | generated {tokens_generated:,} tokens | wrote {shards_written} shards", end="\r")        
    print(f"finished in {time.time()-start:.2f} seconds")
    assert(docs_processed == total_docs)
    print(f"total shards written: {shards_written:,}")
    print(f"total tokens: {tokens_generated:,}")

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

processed 443,000 documents | generated 1,329,000 tokens | wrote 9 shards