Skip to content

Commit

Permalink
Moved parallel code outside of PyLinter
Browse files Browse the repository at this point in the history
  • Loading branch information
AWhetter committed Jul 2, 2016
1 parent 5c7f259 commit 50d3d91
Showing 1 changed file with 117 additions and 104 deletions.
221 changes: 117 additions & 104 deletions pylint/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def _patch_sysmodules():


if multiprocessing is not None:
class ChildLinter(multiprocessing.Process):
class ChildRunner(multiprocessing.Process):
def run(self):
# pylint: disable=no-member, unbalanced-tuple-unpacking
tasks_queue, results_queue, self._config = self._args
Expand Down Expand Up @@ -712,115 +712,23 @@ def should_analyze_file(self, modname, path): # pylint: disable=unused-argument,
"""
return path.endswith('.py')

def _init_msg_states(self):
    """Mark every message that may not be emitted as disabled.

    Walks ``self.msgs_store.messages`` and records ``False`` in
    ``self._msgs_state`` under the ``msgid`` of each message whose
    ``may_be_emitted()`` returns a false value. Other entries are untouched.
    """
    for message in self.msgs_store.messages:
        if message.may_be_emitted():
            continue
        self._msgs_state[message.msgid] = False

def check(self, files_or_modules):
"""main checking entry: check a list of files or modules from their
name.
"""
# initialize msgs_state now that all messages have been registered into
# the store
for msg in self.msgs_store.messages:
if not msg.may_be_emitted():
self._msgs_state[msg.msgid] = False
self._init_msg_states()

if not isinstance(files_or_modules, (list, tuple)):
files_or_modules = (files_or_modules,)

if self.config.jobs == 1:
self._do_check(files_or_modules)
else:
with _patch_sysmodules():
self._parallel_check(files_or_modules)

def _get_jobs_config(self):
    """Return the ordered option mapping passed to child linters.

    Every non-deprecated option from ``self._all_options`` is included,
    formatted with ``utils._format_option_value``, except ``long-help`` and
    the options named in ``self._external_opts``. The porting-mode flag and
    the dynamic plugin list are appended under ``python3_porting_mode`` and
    ``plugins``.
    """
    skipped = {'long-help'} | {name for name, _ in self._external_opts}
    config = collections.OrderedDict()
    for provider in six.itervalues(self._all_options):
        for optname, optdict, val in provider.options_and_values():
            if optdict.get('deprecated') or optname in skipped:
                continue
            config[optname] = utils._format_option_value(optdict, val)
    config['python3_porting_mode'] = self._python3_porting_mode
    config['plugins'] = self._dynamic_plugins
    return config

def _parallel_task(self, files_or_modules):
    """Lint ``files_or_modules`` in child processes, yielding each result.

    Starts ``self.config.jobs`` ``ChildLinter`` processes sharing a task
    queue and a result queue, queues one ``[path]`` task per expanded file
    and yields every item read from the result queue, unmodified.

    Child shutdown (one ``'STOP'`` per started child, then ``join``) lives
    in a ``finally`` clause so it also runs when the consumer raises or
    stops iterating before all results were read.

    Exits the process with status 32 when reading a result fails.
    """
    # Prepare configuration for child linters.
    child_config = self._get_jobs_config()

    manager = multiprocessing.Manager()
    tasks_queue = manager.Queue()
    results_queue = manager.Queue()

    children = []
    failed = False
    try:
        for _ in range(self.config.jobs):
            child_linter = ChildLinter(args=(tasks_queue, results_queue,
                                             child_config))
            child_linter.start()
            children.append(child_linter)

        # Send files to child linters. The tasks are counted while they are
        # queued so the collection loop below does not need to iterate
        # expanded_files a second time (which would yield nothing if it is
        # a one-shot iterator).
        expanded_files = utils.expand_files(files_or_modules, self,
                                            self.config.black_list,
                                            self.config.black_list_re)
        pending = 0
        for module_desc in expanded_files:
            tasks_queue.put([module_desc.path])
            pending += 1

        # Collect exactly one result per queued task.
        for _ in range(pending):
            try:
                result = results_queue.get()
            except Exception as ex:
                print("internal error while receiving results from child linter",
                      file=sys.stderr)
                print(ex, file=sys.stderr)
                failed = True
                break
            yield result
    finally:
        # Stop child linters and wait for their completion.
        for _ in children:
            tasks_queue.put('STOP')
        for child in children:
            child.join()

    if failed:
        print("Error occured, stopping the linter.", file=sys.stderr)
        sys.exit(32)

def _parallel_check(self, files_or_modules):
    """Consume the parallel task results and merge them into this linter.

    For every result: stores its base name on ``self.file_state``, replays
    its messages through ``self.reporter`` and ORs its status into
    ``self.msg_status``. Afterwards the merged stats are stored on the
    linter and on every other checker, and ``self.current_name`` is set to
    the module of the last result (``None`` when there were no results).
    """
    # Reset stats.
    self.open()

    collected_stats = []
    module = None
    for result in self._parallel_task(files_or_modules):
        _, base_name, module, messages, stats, msg_status = result
        self.file_state.base_name = base_name

        for raw_msg in messages:
            message = utils.Message(*raw_msg)
            self.set_current_module(module)
            self.reporter.handle_message(message)

        collected_stats.append(stats)
        self.msg_status |= msg_status

    self.stats = _merge_stats(collected_stats)
    self.current_name = module

    # Insert stats data to local checkers.
    for checker in self.get_checkers():
        if checker is self:
            continue
        checker.stats = self.stats
self._do_check(files_or_modules)

def _do_check(self, files_or_modules):
walker = utils.PyLintASTWalker(self)
Expand Down Expand Up @@ -1291,18 +1199,123 @@ def __init__(self, args, reporter=None, exit=True):
print("Multiprocessing library is missing, "
"fallback to single process", file=sys.stderr)
linter.set_option("jobs", 1)
else:
if linter.config.jobs == 0:
linter.config.jobs = multiprocessing.cpu_count()
elif linter.config.jobs == 0:
linter.config.jobs = multiprocessing.cpu_count()

# insert current working directory to the python path to have a correct
# behaviour
with fix_import_path(args):
linter.check(args)
if linter.config.jobs == 1:
linter.check(args)
else:
self._parallel_run(args)

linter.generate_reports()

if exit:
sys.exit(self.linter.msg_status)

def _parallel_run(self, files_or_modules):
    """Check ``files_or_modules`` with child processes.

    Inside the ``_patch_sysmodules()`` context, initialises the linter's
    message states and then runs the parallel check.
    """
    sysmodules_patch = _patch_sysmodules()
    with sysmodules_patch:
        init_states = self.linter._init_msg_states
        init_states()
        self._parallel_check(files_or_modules)

def _parallel_task(self, files_or_modules):
    """Lint ``files_or_modules`` in child processes, yielding each result.

    Starts ``self.linter.config.jobs`` ``ChildRunner`` processes sharing a
    task queue and a result queue, queues one ``[path]`` task per expanded
    file and yields every item read from the result queue, unmodified.

    Child shutdown (one ``'STOP'`` per started child, then ``join``) lives
    in a ``finally`` clause so it also runs when the consumer raises or
    stops iterating before all results were read.

    Exits the process with status 32 when reading a result fails.
    """
    # Prepare configuration for child linters.
    child_config = self._get_jobs_config()

    manager = multiprocessing.Manager()
    tasks_queue = manager.Queue()
    results_queue = manager.Queue()

    children = []
    failed = False
    try:
        for _ in range(self.linter.config.jobs):
            child_linter = ChildRunner(args=(tasks_queue, results_queue,
                                             child_config))
            child_linter.start()
            children.append(child_linter)

        # Send files to child linters. The tasks are counted while they are
        # queued so the collection loop below does not need to iterate
        # expanded_files a second time (which would yield nothing if it is
        # a one-shot iterator).
        expanded_files = utils.expand_files(
            files_or_modules,
            self.linter,
            self.linter.config.black_list,
            self.linter.config.black_list_re
        )
        pending = 0
        for module_desc in expanded_files:
            tasks_queue.put([module_desc.path])
            pending += 1

        # Collect exactly one result per queued task.
        for _ in range(pending):
            try:
                result = results_queue.get()
            except Exception as ex:
                print("internal error while receiving results from child linter",
                      file=sys.stderr)
                print(ex, file=sys.stderr)
                failed = True
                break
            yield result
    finally:
        # Stop child linters and wait for their completion.
        for _ in children:
            tasks_queue.put('STOP')
        for child in children:
            child.join()

    if failed:
        print("Error occured, stopping the linter.", file=sys.stderr)
        sys.exit(32)

def _parallel_check(self, files_or_modules):
    """Consume the parallel task results and merge them into the linter.

    For every result: stores its base name on the linter's ``file_state``,
    replays its messages through the linter's reporter and ORs its status
    into ``msg_status``. Afterwards the merged stats are stored on the
    linter and on every other checker, and the linter's ``current_name`` is
    set to the module of the last result (``None`` when there were none).
    """
    linter = self.linter

    # Reset stats.
    linter.open()

    collected_stats = []
    module = None
    for result in self._parallel_task(files_or_modules):
        _, base_name, module, messages, stats, msg_status = result
        linter.file_state.base_name = base_name

        for raw_msg in messages:
            message = utils.Message(*raw_msg)
            linter.set_current_module(module)
            linter.reporter.handle_message(message)

        collected_stats.append(stats)
        linter.msg_status |= msg_status

    linter.stats = _merge_stats(collected_stats)
    linter.current_name = module

    # Insert stats data to local checkers.
    for checker in linter.get_checkers():
        if checker is linter:
            continue
        checker.stats = linter.stats

def _get_jobs_config(self):
    """Return the ordered option mapping passed to child runners.

    Every non-deprecated option from the linter's ``_all_options`` is
    included, formatted with ``utils._format_option_value``, except
    ``long-help`` and the options named in the linter's ``_external_opts``.
    The porting-mode flag and the dynamic plugin list are appended under
    ``python3_porting_mode`` and ``plugins``.
    """
    linter = self.linter
    skipped = {'long-help'} | {name for name, _ in linter._external_opts}
    config = collections.OrderedDict()
    for provider in six.itervalues(linter._all_options):
        for optname, optdict, val in provider.options_and_values():
            if optdict.get('deprecated') or optname in skipped:
                continue
            config[optname] = utils._format_option_value(optdict, val)
    config['python3_porting_mode'] = linter._python3_porting_mode
    config['plugins'] = linter._dynamic_plugins
    return config

def cb_set_rcfile(self, name, value):
"""callback for option preprocessing (i.e. before option parsing)"""
self._rcfile = value
Expand Down

0 comments on commit 50d3d91

Please sign in to comment.