In [1]:
import glob
import json
import os
import shutil
import subprocess
import sys
import tarfile

import ray
import tqdm

In [2]:
# Start Ray for this notebook session, requesting 64 CPUs for task scheduling.
ray.init(num_cpus=64)

2020-02-20 20:41:13,398	INFO resource_spec.py:216 -- Starting Ray with 34.18 GiB memory available for workers and up to 17.09 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '169.237.10.101',
 'redis_address': '169.237.10.101:59498',
 'object_store_address': '/tmp/ray/session_2020-02-20_20-41-13_396332_8589/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-02-20_20-41-13_396332_8589/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2020-02-20_20-41-13_396332_8589'}

In [3]:
@ray.remote
def extract(filename, output_path):
    """Unpack one tar archive into ``output_path`` as a Ray task.

    Args:
        filename: Path of the archive, opened with ``tarfile.open``.
        output_path: Directory that receives the archive's members.

    Returns:
        0 once ``extractall`` has completed.

    Raises:
        Whatever ``tarfile`` raises while opening or extracting the archive;
        nothing is caught here.
    """
    # The context manager closes the archive even when extraction raises;
    # an explicit close() after extractall() would be skipped on error.
    with tarfile.open(filename) as tar:
        # NOTE(review): extractall() trusts the member paths stored in the
        # archive, so a crafted archive can write outside output_path. Only
        # run this on archives you trust, or pass an extraction filter on
        # Python versions that support one.
        tar.extractall(path=output_path)
    return 0

In [4]:
# Directory that is globbed for "*.xz" archives to extract.
DATA_PATH = "../download/scraped/"
# Root directory under which each archive is unpacked into its own numbered subfolder.
OUTPUT_PATH = "../temp/openwebtext_raw"

In [None]:
# Create the extraction root; exist_ok=True means an already existing directory is not an error.
os.makedirs(OUTPUT_PATH, exist_ok=True)

In [5]:
# glob returns matches in arbitrary order; sorting keeps the mapping from
# archive to numbered output folder (assigned by enumeration) stable across runs.
files = sorted(glob.glob(os.path.join(DATA_PATH, "*.xz")))

In [6]:
# Fan out: submit one extraction task per archive, each unpacking into its
# own numbered subfolder of OUTPUT_PATH.
objects = [
    extract.remote(archive, os.path.join(OUTPUT_PATH, str(position)))
    for position, archive in enumerate(tqdm.tqdm(files))
]

# Fan in: block on each task in submission order and check its status code.
for pending in tqdm.tqdm(objects):
    assert ray.get(pending) == 0

100%|██████████| 213/213 [00:00<00:00, 3679.29it/s]
100%|██████████| 213/213 [02:24<00:00,  1.47it/s]


In [7]:
def extract_url(name):
    """Pull the second dash-separated field out of a file path's base name.

    The base name is whatever follows the last "/" in ``name``. Its final four
    characters are dropped (presumably a ".txt" extension -- confirm against
    the extracted files) and the remainder is split on "-".

    Args:
        name: Path string using "/" as the separator.

    Returns:
        The field at index 1 of the dash-split base name.

    Raises:
        IndexError: if the trimmed base name contains no "-".
    """
    basename = name.rsplit("/", 1)[-1]
    stem = basename[:-4]
    fields = stem.split("-")
    return fields[1]

In [8]:
# Sorted (lexicographically) so that the index later assigned to each folder by
# enumeration does not depend on the arbitrary order glob returns entries in.
processed_folders = sorted(glob.glob(os.path.join(OUTPUT_PATH, "*")))

In [9]:
@ray.remote
def write_to_json(txt_files, output_filename, rank):
    """Convert a batch of text files into one JSON-lines file (Ray task).

    Each input file becomes one output line holding a JSON object with the
    keys ``"text"`` (the file's full contents) and ``"url"`` (the value
    returned by ``extract_url`` for the file's path).

    Args:
        txt_files: Iterable of paths of the text files to convert.
        output_filename: Path of the JSON-lines file to write; opened in "w"
            mode, so an existing file is overwritten.
        rank: Index of this task; only rank 0 wraps its input in a tqdm
            progress bar.

    Returns:
        0 after every input file has been written.
    """
    # Presumably only one task reports progress so that bars from concurrent
    # tasks do not interleave.
    if rank == 0:
        txt_files = tqdm.tqdm(txt_files)

    # `with` guarantees the output file is flushed and closed even if reading
    # or serialising one of the inputs raises part-way through.
    with open(output_filename, "w") as f_write:
        for txt_file in txt_files:
            # get url hash
            url = extract_url(txt_file)
            with open(txt_file, "r") as f_read:
                record = {
                    "text": f_read.read(),
                    "url": url
                }
            f_write.write(json.dumps(record))
            f_write.write("\n")

    return 0

In [11]:
# Create the directory that receives the per-folder "<i>.jsonl" files.
os.makedirs("../temp/openwebtext_json", exist_ok=True)

In [12]:
# Submit one conversion task per extracted folder; the task for folder number
# i receives every entry inside that folder and writes "<i>.jsonl".
ray_objs = [
    write_to_json.remote(
        glob.glob(os.path.join(folder, "*")),
        f"../temp/openwebtext_json/{i}.jsonl",
        rank=i,
    )
    for i, folder in enumerate(processed_folders)
]

# Wait for every task in submission order; the return values are not used.
for pending in ray_objs:
    ray.get(pending)

  0%|          | 0/49406 [00:00<?, ?it/s]
  4%|▍         | 1895/49406 [00:00<00:02, 18948.12it/s]
  8%|▊         | 3772/49406 [00:00<00:02, 18891.21it/s]
 11%|█         | 5475/49406 [00:00<00:02, 18290.32it/s]
 15%|█▍        | 7317/49406 [00:00<00:02, 18328.06it/s]
 19%|█▊        | 9202/49406 [00:00<00:02, 18479.58it/s]
 23%|██▎       | 11117/49406 [00:00<00:02, 18675.59it/s]
 26%|██▋       | 13017/49406 [00:00<00:01, 18769.58it/s]
 30%|███       | 14977/49406 [00:00<00:01, 19009.77it/s]
 34%|███▍      | 16771/49406 [00:00<00:01, 18408.52it/s]
 38%|███▊      | 18554/49406 [00:01<00:01, 18226.56it/s]
 41%|████      | 20328/49406 [00:01<00:01, 17807.91it/s]
 45%|████▍     | 22077/49406 [00:01<00:01, 17521.76it/s]
 48%|████▊     | 23808/49406 [00:01<00:01, 17380.74it/s]
 52%|█████▏    | 25564/49406 [00:01<00:01, 17433.38it/s]
 55%|█████▌    | 27307/49406 [00:01<00:01, 17428.83it/s]
 59%|█████▉    | 29043/49406 [00:01<00:01, 16683.07it/s]
 62%|██████▏   | 30731/49406 [00:01<00:01, 16739.17

In [41]:
# f_write = open("train_data.jsonl", "w")

# for file in tqdm.tqdm(glob.glob("openwebtext_json/*")):
#     with open(file, "r") as f_read:
#         for line in f_read:
#             f_write.write(line)
        
# f_write.close()

100%|██████████| 213/213 [01:17<00:00,  2.75it/s]


## Perform data cleaning

In [None]:
# Create the directory that the cleanup step writes its per-chunk output into.
os.makedirs("../data/cleaned", exist_ok=True)

In [None]:
# Number of "<i>.jsonl" chunks to clean (indices 0..212). Hard-coded; 213 matches
# the item count shown by the extraction progress bars earlier in this notebook.
TOTAL_NUM_CHUNKS = 213

@ray.remote
def call_command(i):
    """Run ``cleanup_dataset.py`` on chunk ``i`` as a Ray task.

    Args:
        i: Chunk index; selects ``../temp/openwebtext_json/{i}.jsonl`` as the
            script's input and ``../data/cleaned/{i}.jsonl`` as its output.

    Returns:
        0 when the cleanup script exits with status 0.

    Raises:
        subprocess.CalledProcessError: if the script exits with a non-zero
            status.
    """
    # An argv list avoids going through a shell just to split the arguments.
    command = [
        "python",
        "cleanup_dataset.py",
        f"../temp/openwebtext_json/{i}.jsonl",
        f"../data/cleaned/{i}.jsonl",
    ]
    # check=True turns a failed cleanup into an exception; without it this
    # task would return 0 even when the script crashed, hiding missing chunks.
    subprocess.run(command, check=True)
    return 0

# Launch one cleanup task per chunk index.
ray_objs = [call_command.remote(chunk) for chunk in range(TOTAL_NUM_CHUNKS)]

# Block on each task in submission order, showing overall progress.
for pending in tqdm.tqdm(ray_objs):
    ray.get(pending)

In [43]:
# remove folders
# shutil.rmtree("openwebtext_json/")
# shutil.rmtree(OUTPUT_PATH)

In [None]:
# Concatenate every cleaned chunk into a single JSON-lines file.
# Paths are rooted at "../data" to match the directory the cleanup step
# writes its "<i>.jsonl" outputs into ("../data/cleaned").
# `with` closes the merged file even if reading one of the chunks fails.
with open("../data/cleaned_data.jsonl", "w") as f_write:
    # sorted() makes the order of chunks in the merged file reproducible,
    # since glob itself returns matches in arbitrary order.
    for file in tqdm.tqdm(sorted(glob.glob("../data/cleaned/*"))):
        with open(file, "r") as f_read:
            for line in f_read:
                f_write.write(line)