In [1]:
import torch
import time
import pandas as pd

In [2]:
import json
from collections import OrderedDict
from utils.Vocabulary import Vocabulary

min_freq = 75
eng_vocab_file = "../data-sets/vocab_eng.json"
nld_vocab_file = "../data-sets/vocab_nld.json"

with open(eng_vocab_file, mode="r") as f:
    eng_dict = json.load(f, object_pairs_hook=OrderedDict)

with open(nld_vocab_file, mode="r") as f:
    nld_dict = json.load(f, object_pairs_hook=OrderedDict)

eng_vocab = Vocabulary(eng_dict, min_freq=min_freq)
nld_vocab = Vocabulary(nld_dict, min_freq=min_freq)

In [3]:
file_path = "../data-sets/filtered_and_tokenized_sentences(1).csv"

In [34]:
import concurrent

max_workers=4
chunksize=10000
columns = ["eng_tokens", "nld_tokens"]
dtype = { "eng_tokens": "str", "nld_tokens": "str" }

def init(input_vocabulary, target_vocabulary):
    global input_vocab, target_vocab
    input_vocab = input_vocabulary
    target_vocab = target_vocabulary

def process_chunk(df):
    # return [(torch.zeros(20, dtype=torch.int), torch.zeros(20, dtype=torch.int))]

    result = []

    for _, row in df.iterrows():
        result.append((torch.zeros(20, dtype=torch.int), torch.zeros(20, dtype=torch.int)))

        # eng_idx = [input_vocab.token_to_index_func(token) for token in row["eng_tokens"].split()]
        # nld_idx = [target_vocab.token_to_index_func(token) for token in row["nld_tokens"].split()]

        # eng_idx = pad_or_truncate(eng_idx, input_vocab.token_to_index_func("<PAD>"))
        # nld_idx = pad_or_truncate(nld_idx, target_vocab.token_to_index_func("<PAD>"))

        # result.append((torch.tensor(eng_idx, dtype=torch.int), torch.tensor(nld_idx, dtype=torch.int)))

    return result

def pad_or_truncate(tokens, pad_idx):
    output_size = 20

    if (len(tokens) == output_size):
        return tokens

    if len(tokens) > output_size:
        return tokens[:output_size]

    return tokens + [pad_idx] * (output_size - len(tokens))

t0 = time.time()

df = pd.read_csv(
    file_path,
    encoding="utf-8",
    header=None,
    names=columns,
    dtype=dtype,
    nrows=1000000
)

chunks = [df[i:i+chunksize] for i in range(0, df.shape[0], chunksize)]

results = []
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_workers,
    initializer=init,
    initargs=(eng_vocab, nld_vocab)
) as executor:
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]

    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            results.append(result)
        except Exception as e:
            print(f"Exception occurred: {e}")
        finally:
            del result  # Release shared memory objects
            del future  # Release the future object

t1 = time.time()
print("Total time:", t1-t0)
print()
print('Size:', len(results))
print()
print(results)

Exception occurred: unable to open shared memory object </torch_19855_1496070290_12549> in read-write mode: Too many open files in system (23)


NameError: name 'result' is not defined

In [19]:
data = []
output_size = 20

max_workers=None
chunksize=100000
columns = ["eng_tokens", "nld_tokens"]
dtype = { "eng_tokens": "str", "nld_tokens": "str" }

t0 = time.time()

with pd.read_csv(
        file_path,
        encoding="utf-8",
        chunksize=chunksize,
        header=None,
        names=columns,
        dtype=dtype,
        nrows=1000000
    ) as reader:
    for chunk in reader:
        for _, row in chunk.iterrows():
            eng_idx = [eng_vocab.token_to_index_func(token) for token in row["eng_tokens"].split()]
            nld_idx = [nld_vocab.token_to_index_func(token) for token in row["nld_tokens"].split()]

            eng_idx = pad_or_truncate(eng_idx, eng_vocab.token_to_index_func("<PAD>"))
            nld_idx = pad_or_truncate(nld_idx, nld_vocab.token_to_index_func("<PAD>"))

            data.append((torch.tensor(eng_idx, dtype=torch.int), torch.tensor(nld_idx, dtype=torch.int)))

t1 = time.time()
print("Total time:", t1-t0)

Total time: 37.085352182388306
