In [1]:
import mp
import os
import pyarrow as pa
import numpy as np
from streaming import MDSWriter
from tqdm import tqdm

In [2]:
from streaming.base.format.mds.encodings import Encoding, _encodings

class Int32(Encoding):
    def encode(self, obj) -> bytes:
        return obj.tobytes()

    def decode(self, data: bytes):
        return np.frombuffer(data, np.int32)

_encodings['int32'] = Int32

In [11]:
columns = {
    'input_ids': 'int32',
}
compression = 'zstd'
hashes = 'sha1', 'xxh64'

In [4]:
from glob import glob

files = glob('combine-lm_*_of_00030.jsonl-tokenized')
files

['combine-lm_00021_of_00030.jsonl-tokenized',
 'combine-lm_00025_of_00030.jsonl-tokenized',
 'combine-lm_00008_of_00030.jsonl-tokenized',
 'combine-lm_00026_of_00030.jsonl-tokenized',
 'combine-lm_00002_of_00030.jsonl-tokenized',
 'combine-lm_00012_of_00030.jsonl-tokenized',
 'combine-lm_00024_of_00030.jsonl-tokenized',
 'combine-lm_00011_of_00030.jsonl-tokenized',
 'combine-lm_00010_of_00030.jsonl-tokenized',
 'combine-lm_00003_of_00030.jsonl-tokenized',
 'combine-lm_00009_of_00030.jsonl-tokenized',
 'combine-lm_00000_of_00030.jsonl-tokenized',
 'combine-lm_00023_of_00030.jsonl-tokenized',
 'combine-lm_00022_of_00030.jsonl-tokenized',
 'combine-lm_00027_of_00030.jsonl-tokenized',
 'combine-lm_00004_of_00030.jsonl-tokenized',
 'combine-lm_00001_of_00030.jsonl-tokenized',
 'combine-lm_00020_of_00030.jsonl-tokenized',
 'combine-lm_00006_of_00030.jsonl-tokenized',
 'combine-lm_00014_of_00030.jsonl-tokenized',
 'combine-lm_00018_of_00030.jsonl-tokenized',
 'combine-lm_00015_of_00030.jsonl-

In [12]:
def loop(files):
    files, index = files
    out_root = f'nanot5-{index}'
    os.system(f'rm -rf {out_root}')
    with MDSWriter(out=out_root, columns=columns, compression=compression, hashes=hashes, 
                   size_limit = 67108864 * 2) as out:
        for f in files:
            memory_mapped_stream = pa.memory_map(f)
            opened_stream = pa.ipc.open_stream(memory_mapped_stream)
            for a in tqdm(opened_stream):
                s = a.to_struct_array()
                for i in range(len(s)):
                    keys = list(s[i])
                    a_ = {}
                    for k in keys:
                        a_[k] = np.array(s[i][k].as_py()).astype(np.int32)
                    out.write(a_)

In [13]:
mp.multiprocessing(files, loop, cores = 30, returned = False)

4976it [00:48, 101.67it/s]
4976it [00:49, 101.38it/s]
4976it [01:01, 81.39it/s]
4976it [01:02, 80.11it/s]
4976it [01:02, 79.96it/s]
4976it [01:02, 79.71it/s]
4976it [01:02, 79.28it/s]
4976it [01:03, 78.18it/s]
4976it [01:04, 77.67it/s]
4976it [01:20, 61.91it/s]
4976it [02:13, 37.35it/s]
4976it [02:14, 36.92it/s]
4976it [02:15, 36.62it/s]
4976it [02:22, 34.97it/s]
4976it [02:23, 34.73it/s]
4976it [02:23, 34.57it/s]
4976it [02:41, 30.78it/s] 
4976it [02:44, 30.27it/s]
4976it [02:44, 30.23it/s]
4379it [02:44, 21.65it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

4976it [04:49, 17.20it/s] 
4976it [05:31, 15.00it/s]
5194it [06:38, 13.03it/s]
4976it [11:48,  7.03it/s] 
4977it [12:24,  6.68it/s]
5025it [14:02,  5.97i