# Preprocess the SmolLM Dataset


## Load the Dataset


In [1]:
import os


# Name of the project folder on the shared filesystem.
local_dir = "train-gpt2-data"

# Every download, cache file and processed shard lives under this directory.
DATA_CACHE_DIR = os.path.join("/lambda/nfs", local_dir)

# exist_ok keeps the cell idempotent when it is re-run.
os.makedirs(DATA_CACHE_DIR, exist_ok=True)



In [None]:
import boto3
import gzip
from datasets import load_dataset
from botocore.exceptions import ClientError

# Never paste AWS keys into the notebook: boto3 resolves credentials from its
# default provider chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
# variables, ~/.aws/credentials, or an attached instance role).
session = boto3.Session()
s3 = session.client("s3")

# Number of worker processes used for loading and downloading the dataset.
num_proc = 80

# S3 bucket that download_contents reads blobs from.
bucket_name = "softwareheritage"

def download_contents(blob_id):
    """Fetch one gzip-compressed blob from S3 and return its decoded text.

    Args:
        blob_id: identifier of the blob; the object key is ``content/<blob_id>``.

    Returns:
        ``{"text": <decoded content>, "download_success": True}`` on success, or
        ``{"text": "", "download_success": False}`` when the key does not exist.

    Raises:
        ClientError: re-raised for any S3 error other than ``NoSuchKey``.
    """
    object_key = f"content/{blob_id}"
    try:
        response = s3.get_object(Bucket=bucket_name, Key=object_key)
        with gzip.GzipFile(fileobj=response['Body']) as stream:
            raw_bytes = stream.read()
        # Bytes that are not valid UTF-8 are dropped rather than failing the row.
        text = raw_bytes.decode("utf-8", errors="ignore")
        return {"text": text, "download_success": True}
    except ClientError as err:
        # Only a missing object is tolerated; everything else propagates.
        if err.response['Error']['Code'] != 'NoSuchKey':
            raise
        print(f"File not found: {object_key}")
        return {"text": "", "download_success": False}

# Load the python-edu metadata (blob ids), then fetch each file's contents from S3.
ds = load_dataset("HuggingFaceTB/smollm-corpus", "python-edu", split="train", num_proc=num_proc, cache_dir=DATA_CACHE_DIR)
ds = ds.map(download_contents, input_columns="blob_id", num_proc=num_proc, batched=False)

# Filter out failed downloads. Only the boolean flag column is handed to the
# predicate, so the (large) text column is not materialised for every row, and
# the work is spread over the same worker pool as the download.
ds = ds.filter(lambda ok: ok, input_columns="download_success", num_proc=num_proc)

# Optionally, print the first example to verify the data
# (guarded so an empty result does not raise IndexError before the save).
if len(ds) > 0:
    print(ds[0])

# Derive the target from DATA_CACHE_DIR instead of repeating the absolute path.
ds.save_to_disk(os.path.join(DATA_CACHE_DIR, "smollm-corpus", "python-edu"))

In [3]:
from datasets import load_from_disk
from datasets import load_dataset

# Location of the python-edu split that was downloaded and saved to disk,
# derived from DATA_CACHE_DIR rather than repeating the absolute path.
python_edu_dir = os.path.join(DATA_CACHE_DIR, "smollm-corpus", "python-edu")

fineweb_edu = load_dataset("HuggingFaceTB/smollm-corpus", "fineweb-edu-dedup", split="train", cache_dir=DATA_CACHE_DIR)
python_edu = load_from_disk(python_edu_dir)
cosmopedia_v2 = load_dataset("HuggingFaceTB/smollm-corpus", "cosmopedia-v2", split="train", cache_dir=DATA_CACHE_DIR)

# Order matters: later cells address the corpora by position in this list
# (0 = fineweb-edu-dedup, 1 = python-edu, 2 = cosmopedia-v2).
dataset_objs = [fineweb_edu, python_edu, cosmopedia_v2]

Resolving data files:   0%|          | 0/104 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/234 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/1906 [00:00<?, ?it/s]

Loading dataset from disk:   0%|          | 0/27 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/104 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/104 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/423 [00:00<?, ?it/s]

In [None]:
from datasets import interleave_datasets

# Weights must follow the order of dataset_objs:
# fineweb-edu-dedup, python-edu, cosmopedia-v2
#probabilities=[0.873, 0.016, 0.111]


#ds = interleave_datasets(
#    dataset_objs, probabilities=probabilities, seed=1337)


## Setup


In [4]:
import time
import math
from functools import partial
import concurrent.futures as cf

import numpy as np
from transformers import AutoTokenizer

#import tiktoken
#enc = tiktoken.get_encoding("gpt2")
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceTB/SmolLM-360M")

# os.cpu_count() returns None when the count cannot be determined; fall back
# to a single worker so the division below and the process pools still work.
num_cpus = os.cpu_count() or 1
print(f"""
sytem statistics:
-----------------
cpu count: {num_cpus}""")

# Number of documents in each corpus, in dataset_objs order.
docs = [len(dataset) for dataset in dataset_objs]
total_docs = sum(docs)

# Even share of documents per CPU, rounded up; later cells derive the
# executor chunk size from it.
docs_per_cpu = math.ceil(total_docs / num_cpus)
print(f"""
dataset statistics
------------------
docs_per_dataset: {" | ".join(f'{x:,}' for x in docs)}
documents: {total_docs:,}
docs_per_cpu: {docs_per_cpu:,}""")



sytem statistics:
-----------------
cpu count: 120

dataset statistics
------------------
docs_per_dataset: 190,168,005 | 7,678,447 | 39,134,000
documents: 236,980,452
docs_per_cpu: 1,974,838


## Token Statistics

In [None]:
def count_tokens(dataset, _tokenizer, idx):
    """Return how many tokens the document at position ``idx`` encodes to.

    Args:
        dataset: indexable collection whose items expose a ``'text'`` field.
        _tokenizer: object with an ``encode(text)`` method returning a sized sequence.
        idx: position of the document inside ``dataset``.
    """
    document_text = dataset[idx]['text']
    return len(_tokenizer.encode(document_text))


# Count the tokens of every document in every corpus using one process per CPU.
with cf.ProcessPoolExecutor(max_workers=num_cpus) as ex:
    start = time.time()
    documents = 0
    tokens = 0

    # Executor.map rejects chunksize < 1, which the integer division yields
    # for small corpora, so clamp it.
    chunksize = max(1, docs_per_cpu // 10)

    for dataset in dataset_objs:
        f = partial(count_tokens, dataset, tokenizer)
        for result in ex.map(f, range(len(dataset)), chunksize=chunksize):
            documents += 1
            tokens += result
            # Only read the clock when a progress line is actually printed.
            if documents % 1000 == 0:
                elapsed = time.time() - start
                print(f"processed {documents:,} | docs/s {documents/elapsed:0.4f}", end="\r")

    print(f"processed documents in {time.time()-start:0.2f} seconds")
    print(f"total tokens: {tokens:,}")
    print(f"total documents: {documents:,}")
    # Every document of every corpus must have been visited exactly once.
    assert documents == total_docs

processed documents in 22565.92 seconds

total tokens: 227,557,089,774

total documents: 236,980,452

## Actual Preprocessing Operation

In [5]:
# Token id that tokenize() places in front of every document.
# NOTE(review): presumably the tokenizer's end-of-text id — confirm.
EOT_TOKEN = 0

# Capacity of one shard, in tokens.
SHARD_SIZE = 100_000_000

# Sub-folder of DATA_CACHE_DIR that receives the shards.
output_dir = "processed"
os.makedirs(os.path.join(DATA_CACHE_DIR, output_dir), exist_ok=True)

def write_shard(d_idx, shard, shard_idx):
    """Store one shard of tokens as an ``.npz`` archive in the output folder.

    Shards whose index is a multiple of 100 (0, 100, 200, ...) are labelled
    ``valid``; all other shards are labelled ``train``.

    Args:
        d_idx: index of the source dataset, embedded in the file name.
        shard: array of tokens to save.
        shard_idx: running index of the shard, embedded in the file name.
    """
    split = "train" if shard_idx % 100 else "valid"
    file_name = f"smol_lm_corpus_{d_idx}_{split}_{shard_idx}"
    # np.savez appends the ".npz" suffix to the path.
    np.savez(os.path.join(DATA_CACHE_DIR, output_dir, file_name), shard)


def tokenize(dataset, encoder, idx):
    """Encode the document at position ``idx`` and prefix it with ``EOT_TOKEN``.

    Args:
        dataset: indexable collection whose items expose a ``'text'`` field.
        encoder: object with an ``encode(text)`` method returning a list of ids.
        idx: position of the document inside ``dataset``.

    Returns:
        A list starting with ``EOT_TOKEN`` followed by the encoded ids.
    """
    document_text = dataset[idx]['text']
    encoded_ids = encoder.encode(document_text)
    return [EOT_TOKEN] + encoded_ids


#for d_idx, dataset in enumerate(dataset_objs[2:]):
# Tokenize one corpus (selected by position in dataset_objs) and pack the token
# stream into fixed-size shards on disk.
d_idx = 2
dataset = dataset_objs[2]
f = partial(tokenize, dataset, tokenizer)

with cf.ProcessPoolExecutor(max_workers=num_cpus) as ex:
    start = time.time()

    docs_processed = 0
    shards_written = 0
    tokens_generated = 0
    shard_token_count = 0  # number of valid tokens currently held in `shard`

    # Reusable buffer for the shard being filled.
    # NOTE(review): uint16 only holds ids below 65,536 — assumes the
    # tokenizer's vocabulary fits; confirm before switching tokenizers.
    shard = np.empty((SHARD_SIZE,), dtype=np.uint16)

    # Executor.map rejects chunksize < 1, which the integer division yields
    # for small corpora, so clamp it.
    chunksize = max(1, docs_per_cpu // 200)

    for tokens in ex.map(f, range(len(dataset)), chunksize=chunksize):
        docs_processed += 1
        tokens_generated += len(tokens)

        if docs_processed % 10_000 == 0:
            print(f"processed {docs_processed:,} documents | generated {tokens_generated:,} tokens | wrote {shards_written} shards", end="\r")

        # Copy the document into the buffer piece by piece, flushing the
        # buffer each time it fills. The loop also copes with a document that
        # spans more than one shard.
        offset = 0
        while offset < len(tokens):
            take = min(SHARD_SIZE - shard_token_count, len(tokens) - offset)
            shard[shard_token_count:shard_token_count + take] = tokens[offset:offset + take]
            shard_token_count += take
            offset += take

            if shard_token_count == SHARD_SIZE:
                write_shard(d_idx, shard, shards_written)
                shards_written += 1
                shard_token_count = 0

    # Write the final, partially filled shard. Only the valid prefix is saved:
    # the rest of the buffer still holds tokens from the previous shard (or
    # uninitialised memory) and must not end up in the data.
    if shard_token_count > 0:
        write_shard(d_idx, shard[:shard_token_count], shards_written)
        shards_written += 1

    # End this last progress line with a newline so the summary below does not
    # overwrite it in place.
    print(f"processed {docs_processed:,} documents | generated {tokens_generated:,} tokens | wrote {shards_written} shards")
    print(f"finished in {time.time()-start:.2f} seconds")
    assert docs_processed == len(dataset)
    print(f"total shards written: {shards_written:,}")
    print(f"total tokens: {tokens_generated:,}")

finished in 4378.44 secondsnts | generated 28,167,242,374 tokens | wrote 282 shards
total shards written: 282
total tokens: 28,167,242,374
