In [1]:
import os
project_root = os.path.dirname(os.getcwd())
import sys
# Add the directory to the Python path
sys.path.append(f"{project_root}/src")

In [2]:
from pathlib import Path

import ray
from datasets import load_from_disk,load_dataset

from config import gpt2_cfg as cfg 
from datasource_processor import DatasourceProcessor
from text_split_processor import TextSplitProcessor
from chunk_processor import ChunkProcessor
from token_processor import TokenProcessor


In [3]:
source_path = Path(cfg["dataset"]["source"][0]["path"]+"/shards")
file_paths = list(source_path.glob("*.parquet"))[:20]

In [4]:
file_paths

[PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_74.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_101.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_18.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_136.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_161.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_100.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_22.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_75.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_61.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_63.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_181.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_5.parquet'),
 PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_90.parque

In [5]:
file_path_ds = ray.data.from_items(file_paths)

2024-09-02 05:55:47,326	INFO worker.py:1598 -- Connecting to existing Ray cluster at address: 192.168.2.113:6379...
2024-09-02 05:55:47,330	INFO worker.py:1774 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


In [6]:
file_path_ds.take(1)

2024-09-02 05:55:52,030	INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-09-02 05:55:52,032	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-02_05-52-44_887309_12138/logs/ray-data
2024-09-02 05:55:52,032	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- limit=1 1: 0.00 row [00:00, ? row/s]

[{'item': PosixPath('/workspaces/CaiZi/dataset/openwebtext/shards/shard_74.parquet')}]

In [7]:
datasource_processor = DatasourceProcessor(source_format="parquet")
texts_ds = file_path_ds.flat_map(datasource_processor)

In [8]:
texts_ds.schema()

2024-09-02 05:55:57,809	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-02_05-52-44_887309_12138/logs/ray-data
2024-09-02 05:55:57,809	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(DatasourceProcessor)] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(DatasourceProcessor) 1: 0.00 row [00:00, ? row/s]

- limit=1 2: 0.00 row [00:00, ? row/s]

In [None]:
train_ratio = cfg["ray_data"]["train_ratio"]
text_split_processor = TextSplitProcessor(train_ratio=train_ratio)
texts_split_ds = texts_ds.map(text_split_processor)


In [None]:

tokenizer_class = TokenProcessor.create(cfg['ray_data']['tokenizer_class']['name'])
tokenizer_args =  cfg['ray_data']['tokenizer_class']['args']
tokenizer= tokenizer_class(**tokenizer_args)
tokens_ds = texts_split_ds.map(tokenizer)

In [None]:
tokens_ds.take(1)

In [None]:
block_size = cfg["model"]["block_size"]
stride = cfg["model"]["stride"]
chunk_processor = ChunkProcessor(block_size=block_size, stride=stride)
chunked_tokens_ds = tokens_ds.map(chunk_processor)

In [None]:
chunked_tokens_ds.write_parquet(cfg["dataset"]["chunked_tokens"])

# train_chunked_tokens = []
# validate_chunked_tokens = []

# for item in chunked_tokens.iter_rows():
#     train_chunked_tokens.extend(item["train"])
#     validate_chunked_tokens.extend(item["validate"])

#self.train_chunked_tokens = ray.data.from_items(train_chunked_tokens)
#self.validate_chunked_tokens = ray.data.from_items(validate_chunked_tokens)
