Add ProcessorsScheduler Class #25

Merged: 20 commits, May 8, 2019
problem.py: 58 changes (11 additions, 47 deletions)
@@ -10,14 +10,13 @@
 nltk.download('punkt')
 from utils.BPEEncoder import BPEEncoder
 import codecs
-from multiprocessing import cpu_count
 import os
 import pickle as pkl
 from utils.common_utils import load_from_pkl, dump_to_pkl

 from settings import ProblemTypes
 import multiprocessing
-import math
+from utils.ProcessorsScheduler import ProcessorsScheduler

 from core.EnglishTokenizer import EnglishTokenizer
 from core.EnglishTextPreprocessor import EnglishTextPreprocessor
@@ -162,32 +161,15 @@ def build_training_data_list(self, training_data_list, file_columns, input_types
         return docs, target_docs, cnt_legal, cnt_illegal

     def build_training_multi_processor(self, training_data_list, cpu_num_workers, file_columns, input_types, answer_column_name, bpe_encoder=None):
-        res = []
-        process_num = cpu_count()
-        if cpu_num_workers > 0:
-            process_num = cpu_num_workers
-        # logging.info("multiprocess enabled, process num: %d" % (process_num))
-        process_p = multiprocessing.Pool(process_num)
-        for i in range(process_num):
-            size = math.ceil(len(training_data_list)/ process_num)
-            start = size * i
-            end = (i + 1) * size if (i + 1) * size < len(training_data_list) else len(training_data_list)
-            temp_data_list = training_data_list[start:end]
-            res.append((i, process_p.apply_async(self.build_training_data_list,
-                args=(temp_data_list, file_columns, input_types, answer_column_name, bpe_encoder)
-                )
-            )
-            )
-
-        process_p.close()
-        process_p.join()
+        scheduler = ProcessorsScheduler(cpu_num_workers)
+        func_args = (training_data_list, file_columns, input_types, answer_column_name, bpe_encoder)
+        res = scheduler.run_data_parallel(self.build_training_data_list, func_args)

         docs = dict() # docs of each type of input
         target_docs = []
         cnt_legal = 0
         cnt_illegal = 0
-        sort_res = sorted(res, key=lambda x:x[0])
-        for (index, j) in sort_res:
+        for (index, j) in res:
             #logging.info("collect proccesor %d result" % index)
             tmp_docs, tmp_target_docs, tmp_cnt_legal, tmp_cnt_illegal = j.get()
             if len(docs) == 0:
@@ -323,37 +305,19 @@ def encode_data_multi_processor(self, data_list, cpu_num_workers, file_columns,
                 answer_column_name, min_sentence_len, extra_feature, max_lengths=None, fixed_lengths=None, file_format="tsv", bpe_encoder=None):
         def judge_dict(obj):
             return True if isinstance(obj, dict) else False
-        res = []
-
-        process_num = cpu_count()
-        if cpu_num_workers > 0:
-            process_num = cpu_num_workers
-        #logging.info("multiprocess enabled, process num: %d" % (process_num))
-        process_p = multiprocessing.Pool(process_num)
-        for i in range(process_num):
-            size = math.ceil(len(data_list)/ process_num)
-            start = size * i
-            end = (i + 1) * size if (i + 1) * size < len(data_list) else len(data_list)
-            temp_data_list = data_list[start:end]
-            res.append((i, process_p.apply_async(self.encode_data_list,
-                args=((temp_data_list, file_columns, input_types, object_inputs,
-                    answer_column_name, min_sentence_len, extra_feature, max_lengths, fixed_lengths, file_format, bpe_encoder)
-                )
-            )
-            )
-            )
-
-        process_p.close()
-        process_p.join()
+
+        scheduler = ProcessorsScheduler(cpu_num_workers)
+        func_args = (data_list, file_columns, input_types, object_inputs,
+            answer_column_name, min_sentence_len, extra_feature, max_lengths, fixed_lengths, file_format, bpe_encoder)
+        res = scheduler.run_data_parallel(self.encode_data_list, func_args)

         data = dict()
         lengths = dict()
         target = dict()
         cnt_legal = 0
         cnt_illegal = 0

-        sort_res = sorted(res, key=lambda x:x[0])
-        for (index, j) in sort_res:
+        for (index, j) in res:
             # logging.info("collect proccesor %d result"%index)
             tmp_data, tmp_lengths, tmp_target, tmp_cnt_legal, tmp_cnt_illegal = j.get()

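Both refactored call sites now share the same shape: construct a ProcessorsScheduler, pass the whole data list as the first element of func_args, and merge the per-chunk results, which come back as (chunk_index, AsyncResult) pairs already sorted by index (which is why the old sorted(res, ...) step disappears). Here is a self-contained sketch of that collection pattern; the worker and data are invented for illustration, and only the scheduler API comes from this PR:

# Hypothetical worker standing in for build_training_data_list: it receives
# its data slice as the first argument and returns partial results plus
# legal/illegal counts, like the real methods do.
from utils.ProcessorsScheduler import ProcessorsScheduler

def tokenize_chunk(sentences):
    docs = [s.split() for s in sentences]
    return docs, len(docs), 0  # (docs, cnt_legal, cnt_illegal)

if __name__ == '__main__':
    data = ["the quick brown fox", "jumps over the lazy dog"] * 100
    scheduler = ProcessorsScheduler(cpu_num_workers=4)
    res = scheduler.run_data_parallel(tokenize_chunk, (data,))

    docs, cnt_legal, cnt_illegal = [], 0, 0
    for (index, async_result) in res:  # chunk order is preserved
        tmp_docs, tmp_legal, tmp_illegal = async_result.get()  # re-raises worker errors
        docs.extend(tmp_docs)
        cnt_legal += tmp_legal
        cnt_illegal += tmp_illegal
    print(len(docs), cnt_legal, cnt_illegal)  # 200 200 0

Because run_data_parallel uses multiprocessing.Pool, the worker must be picklable (a top-level function, or a bound method of a picklable object, as with self.build_training_data_list above), and standalone scripts need the __main__ guard on spawn-based platforms.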
utils/ProcessorsScheduler.py: 31 changes (31 additions, 0 deletions)
@@ -0,0 +1,31 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT license.
+
+import multiprocessing
+from multiprocessing import cpu_count
+import math
+
+class ProcessorsScheduler(object):
+    process_num = cpu_count()
+
+    def __init__(self, cpu_num_workers=None):
+        if cpu_num_workers != None and cpu_num_workers > 0:
+            self.process_num = cpu_num_workers
+
+    def run_data_parallel(self, func, func_args):
+        data, rest_args = func_args[0], func_args[1:]
+        res = []
+        # logging.info("multiprocess enabled, process num: %d" % (self.process_num))
+        process_p = multiprocessing.Pool(self.process_num)
+        data_length = len(data)
+        size = math.ceil(data_length/ self.process_num)
+
+        for i in range(self.process_num):
+            start = size * i
+            end = (i + 1) * size if (i + 1) * size < data_length else data_length
+            args = (data[start:end], ) + rest_args
+            res.append((i, process_p.apply_async(func, args=args)))
+        process_p.close()
+        process_p.join()
+        res = sorted(res, key=lambda x:x[0])
+        return res
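One detail of the new class worth noting: size = math.ceil(data_length / self.process_num) and end is clamped to data_length, so every element lands in exactly one chunk, the last chunk may be short, and trailing chunks are empty slices when the list is shorter than the process count. A tiny hedged check of that arithmetic, with an invented worker (assuming the repository root is on the path):

from utils.ProcessorsScheduler import ProcessorsScheduler

def chunk_len(items):
    # Trivial worker: report how many items this process received.
    return len(items)

if __name__ == '__main__':
    scheduler = ProcessorsScheduler(cpu_num_workers=4)
    res = scheduler.run_data_parallel(chunk_len, (list(range(10)),))
    # size = ceil(10 / 4) = 3, so the four chunks hold 3, 3, 3, 1 items.
    print([r.get() for (_, r) in res])  # [3, 3, 3, 1]

Note also that process_num = cpu_count() is a class attribute serving as the default; a positive cpu_num_workers shadows it with an instance attribute in __init__.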