/
convert_data.py
91 lines (78 loc) · 3.6 KB
/
convert_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import math
import lzma
import neetbox
from neetbox import logger
from neetbox.utils import ResourceLoader
from neetbox.extension.machine import hardware
from concurrent.futures import ThreadPoolExecutor, wait as wait_futures
vocabulary_set = set() # vocabulary set
def xzs2txt(xz_file_list, txt_file_name):
global vocabulary_set
with open(txt_file_name, "w", encoding="utf-8") as outfile:
for file_path in neetbox.progress(xz_file_list, name=txt_file_name):
with lzma.open(file_path, "rt", encoding="utf-8") as current_xz:
text = current_xz.read()
outfile.write(text)
characters = set(text)
vocabulary_set.update(characters)
def multi_thread_convert(xz_files, out_file_pattern):
num_xz_files = len(xz_files)
max_files_in_memory = int(
hardware.memory.available * 0.8
) # assert each xz file will take 1MB to be loaded in to ram, and we would use 80% of availiable memory to convert xz files
num_workers = math.ceil(num_xz_files / max_files_in_memory)
num_workers = 1 if num_workers == 0 else num_workers
num_workers = max(num_workers, len(hardware.cpus))
num_files_in_memory = num_xz_files // num_workers
logger.log(
f"max files in memory = {max_files_in_memory}, available memory {hardware.memory.available}MB"
)
logger.log(f"Using {num_workers} workers to convert")
futures = []
with ThreadPoolExecutor(num_workers) as executor:
for i in range(num_workers): # using num_workers workers
start_index = i * num_files_in_memory
end_index = (
num_xz_files
if (i + 1) * num_files_in_memory > num_xz_files
else (i + 1) * num_files_in_memory
)
future = executor.submit(
xzs2txt, xz_files[start_index:end_index], out_file_pattern.format(i)
) # submit convert task to excutor
futures.append(future)
logger.log(
f"submitted convert task for files in range [{start_index}, {end_index})"
)
logger.info("waiting for converters...")
wait_futures(futures)
for future in futures:
try:
logger.info(f"returned from converter. {future.result()}")
except Exception as e:
logger.err(RuntimeError(f"converter encountered {e}, check file status."))
if __name__ == "__main__":
xz_file_folder = "./openWebTextCorpus"
output_file_train = "./data/output{}.txt"
output_file_test = "./data/test{}.txt"
vocabulary_file = "./data/vocab.voc"
xz_files = ResourceLoader(
folder=xz_file_folder, file_types=["xz"], sub_dirs=False
).get_file_list()
num_xz_files = len(xz_files)
test_ratio = 0.1 # k fold ratio
num_xz_files_test = max(int(num_xz_files * test_ratio), 1)
test_xz_files = xz_files[-num_xz_files_test:] # xz files for test
train_xz_files = xz_files[:-num_xz_files_test] # xz files for train
# do convert
logger.info("-> convert train set")
multi_thread_convert(train_xz_files, output_file_train)
logger.info("-> convert test set")
multi_thread_convert(test_xz_files, output_file_test)
# write vocabulary set into file
logger.log(f"saving vocabulary set into {vocabulary_file}...")
with open(vocabulary_file, "w", encoding="utf-8") as voc_file:
for char in vocabulary_set:
voc_file.write(char + "\n")
logger.log("All tasks completed.")
# open localhost:20202 in browser to see the progress