Skip to content

Commit

Permalink
Refactor parallel linting
Browse files Browse the repository at this point in the history
The previous implementation created new PyLinter objects in the worker
(child) process causing failure when running under Prospector because
Prospector uses a custom PyLinter class (a class inherited from PyLinter)
and PyLint naturally just creates a plain PyLinter object. This caused linting to
fail because there are options for Prospector's IndentChecker that were not
created in the worker process.

The new implementation passes the original PyLinter object into workers
when the workers are created. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

Note that as Windows uses the spawn method by default, the PyLinter object
(and its members) need to be pickleable from now on, with the exception of
PyLinter.reporter, which is not passed to child processes.

The performance has remained about the same based on quick tests done with
Django project containing about 30 000 lines of code; with the old
implementation linting took 26-28 seconds with 8 jobs on quad core i7 and
24-27 seconds with the new implementation.
  • Loading branch information
janneronkko committed Jul 23, 2019
1 parent ad16357 commit 141888f
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 181 deletions.
4 changes: 2 additions & 2 deletions ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Release date: TBA

* Refactor file checking

Remove code duplication and prepare for supporting parallel linting
under Prospector.
Remove code duplication and allow parallel linting using custom PyLinter
classes.

* Added a new check, ``invalid-overridden-method``

Expand Down
6 changes: 5 additions & 1 deletion pylint/checkers/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,11 +640,15 @@ def _has_same_layout_slots(slots, assigned_value):
}


def _scope_default():
    """Factory for the per-scope accessed-variables mapping.

    Defined at module level instead of as a lambda so that objects holding a
    reference to it (``ScopeAccessMap._scopes``) can be pickled — lambdas are
    not pickleable, and pickling is needed for parallel linting.
    """
    return collections.defaultdict(list)


class ScopeAccessMap:
"""Store the accessed variables per scope."""

def __init__(self):
self._scopes = collections.defaultdict(lambda: collections.defaultdict(list))
self._scopes = collections.defaultdict(_scope_default)

def set_accessed(self, node):
"""Set the given node as accessed."""
Expand Down
11 changes: 7 additions & 4 deletions pylint/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import configparser
import contextlib
import copy
import functools
import io
import optparse
import os
Expand Down Expand Up @@ -693,10 +694,8 @@ def read_config_file(self, config_file=None, verbose=None):
opt = "-".join(["long"] * helplevel) + "-help"
if opt in self._all_options:
break # already processed
# pylint: disable=unused-argument
def helpfunc(option, opt, val, p, level=helplevel):
print(self.help(level))
sys.exit(0)

helpfunc = functools.partial(self.helpfunc, level=helplevel)

helpmsg = "%s verbose help." % " ".join(["more"] * helplevel)
optdict = {"action": "callback", "callback": helpfunc, "help": helpmsg}
Expand Down Expand Up @@ -790,6 +789,10 @@ def help(self, level=0):
with _patch_optparse():
return self.cmdline_parser.format_help()

    def helpfunc(self, option, opt, val, p, level):  # pylint: disable=unused-argument
        """Optparse callback: print verbose help up to *level* and exit.

        Defined as a method (bound with ``functools.partial``) rather than a
        local closure, presumably so the options manager stays pickleable for
        parallel linting — closures cannot be pickled.
        """
        print(self.help(level))
        sys.exit(0)


class OptionsProviderMixIn:
"""Mixin to provide options to an OptionsManager"""
Expand Down
262 changes: 102 additions & 160 deletions pylint/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,70 +243,8 @@ def _cpu_count() -> int:
return 1


if multiprocessing is not None:

class ChildLinter(multiprocessing.Process):
def run(self):
# pylint: disable=no-member, unbalanced-tuple-unpacking
tasks_queue, results_queue, self._config = self._args

self._config["jobs"] = 1 # Child does not parallelize any further.
self._python3_porting_mode = self._config.pop("python3_porting_mode", None)
self._plugins = self._config.pop("plugins", None)

# Run linter for received files/modules.
for file_or_module in iter(tasks_queue.get, "STOP"):
try:
result = self._run_linter(file_or_module[0])
results_queue.put(result)
except Exception as ex:
print(
"internal error with sending report for module %s"
% file_or_module,
file=sys.stderr,
)
print(ex, file=sys.stderr)
results_queue.put({})

def _run_linter(self, file_or_module):
linter = PyLinter()

# Register standard checkers.
linter.load_default_plugins()
# Load command line plugins.
if self._plugins:
linter.load_plugin_modules(self._plugins)

linter.load_configuration_from_config(self._config)

# Load plugin specific configuration
linter.load_plugin_configuration()

linter.set_reporter(reporters.CollectingReporter())

# Enable the Python 3 checker mode. This option is
# passed down from the parent linter up to here, since
# the Python 3 porting flag belongs to the Run class,
# instead of the Linter class.
if self._python3_porting_mode:
linter.python3_porting_mode()

# Run the checks.
linter.check(file_or_module)

msgs = [_get_new_args(m) for m in linter.reporter.messages]
return (
file_or_module,
linter.file_state.base_name,
linter.current_name,
msgs,
linter.stats,
linter.msg_status,
)


# pylint: disable=too-many-instance-attributes
class PyLinter(
class PyLinter( # pylint: disable=too-many-public-methods
config.OptionsManagerMixIn,
MessagesHandlerMixIn,
reporters.ReportsHandlerMixIn,
Expand All @@ -323,6 +261,9 @@ class PyLinter(
IDE plugin developers: you may have to call
`astroid.builder.MANAGER.astroid_cache.clear()` across runs if you want
to ensure the latest code version is actually checked.
This class needs to support pickling for parallel linting to work. The exception
is reporter member; see check_parallel function for more details.
"""

__implements__ = (interfaces.ITokenChecker,)
Expand Down Expand Up @@ -971,16 +912,20 @@ def should_analyze_file(modname, path, is_argument=False):

# pylint: enable=unused-argument

def check(self, files_or_modules):
"""main checking entry: check a list of files or modules from their
name.
"""
def initialize(self):
# initialize msgs_state now that all messages have been registered into
# the store
for msg in self.msgs_store.messages:
if not msg.may_be_emitted():
self._msgs_state[msg.msgid] = False

def check(self, files_or_modules):
"""main checking entry: check a list of files or modules from their
name.
"""

self.initialize()

if not isinstance(files_or_modules, (list, tuple)):
files_or_modules = (files_or_modules,)

Expand All @@ -998,100 +943,21 @@ def check(self, files_or_modules):
elif self.config.jobs == 1:
self._check_files(self.get_ast, self._iterate_file_descrs(files_or_modules))
else:
self._parallel_check(files_or_modules)

def _get_jobs_config(self):
child_config = collections.OrderedDict()
filter_options = {"long-help"}
filter_options.update((opt_name for opt_name, _ in self._external_opts))
for opt_providers in self._all_options.values():
for optname, optdict, val in opt_providers.options_and_values():
if optdict.get("deprecated"):
continue

if optname not in filter_options:
child_config[optname] = utils._format_option_value(optdict, val)
child_config["python3_porting_mode"] = self._python3_porting_mode
child_config["plugins"] = self._dynamic_plugins
return child_config

def _parallel_task(self, files_or_modules):
# Prepare configuration for child linters.
child_config = self._get_jobs_config()

children = []
manager = multiprocessing.Manager()
tasks_queue = manager.Queue()
results_queue = manager.Queue()

# Send files to child linters.
expanded_files = []
for descr in self._expand_files(files_or_modules):
modname, filepath, is_arg = descr["name"], descr["path"], descr["isarg"]
if self.should_analyze_file(modname, filepath, is_argument=is_arg):
expanded_files.append(descr)

# do not start more jobs than needed
for _ in range(min(self.config.jobs, len(expanded_files))):
child_linter = ChildLinter(args=(tasks_queue, results_queue, child_config))
child_linter.start()
children.append(child_linter)

for files_or_module in expanded_files:
path = files_or_module["path"]
tasks_queue.put([path])

# collect results from child linters
failed = False
for _ in expanded_files:
try:
result = results_queue.get()
except Exception as ex:
print(
"internal error while receiving results from child linter",
file=sys.stderr,
)
print(ex, file=sys.stderr)
failed = True
break
yield result

# Stop child linters and wait for their completion.
for _ in range(self.config.jobs):
tasks_queue.put("STOP")
for child in children:
child.join()

if failed:
print("Error occurred, stopping the linter.", file=sys.stderr)
sys.exit(32)

def _parallel_check(self, files_or_modules):
# Reset stats.
self.open()

all_stats = []
module = None
for result in self._parallel_task(files_or_modules):
if not result:
continue
(_, self.file_state.base_name, module, messages, stats, msg_status) = result

for msg in messages:
msg = Message(*msg)
self.set_current_module(module)
self.reporter.handle_message(msg)
check_parallel(
self, self.config.jobs, self._iterate_file_descrs(files_or_modules)
)

all_stats.append(stats)
self.msg_status |= msg_status
def check_single_file(self, name, filepath, modname):
"""Check single file
self.stats = _merge_stats(all_stats)
self.current_name = module
The arguments are the same that are documented in _check_files
# Insert stats data to local checkers.
for checker in self.get_checkers():
if checker is not self:
checker.stats = self.stats
The initialize() method should be called before calling this method
"""
with self._astroid_module_checker() as check_astroid_module:
self._check_file(
self.get_ast, check_astroid_module, name, filepath, modname
)

def _check_files(self, get_ast, file_descrs):
"""Check all files from file_descrs
Expand Down Expand Up @@ -1313,6 +1179,78 @@ def _report_evaluation(self):
self.reporter.display_reports(sect)


def check_parallel(linter, jobs, files):
    """Use the given linter to lint *files* with the given number of workers (*jobs*).

    The *linter* object itself is shipped to the worker processes (see
    _worker_initialize), so it — and its members — must be pickleable, with
    the exception of the reporter, which is detached below.
    """
    # The reporter does not need to be passed to worker processes, i.e. the reporter does
    # not need to be pickleable
    original_reporter = linter.reporter
    linter.reporter = None

    # The linter is inherited by all the pool's workers, i.e. the linter
    # is identical to the linter object here. This is required so that
    # a custom PyLinter object (inherited from PyLinter) can be used.
    # See https://github.com/PyCQA/prospector/issues/320
    with multiprocessing.Pool(
        jobs, initializer=_worker_initialize, initargs=[linter]
    ) as pool:
        # ..and now when the workers have inherited the linter, the actual reporter
        # can be set back here on the parent process so that results get stored into
        # the correct reporter
        linter.set_reporter(original_reporter)
        linter.open()

        all_stats = []

        # Results arrive in completion order, not submission order.
        for module, messages, stats, msg_status in pool.imap_unordered(
            _worker_check_single_file, files
        ):
            linter.set_current_module(module)
            for msg in messages:
                msg = Message(*msg)
                linter.reporter.handle_message(msg)

            all_stats.append(stats)
            linter.msg_status |= msg_status

        linter.stats = _merge_stats(all_stats)

        # Insert stats data to local checkers.
        for checker in linter.get_checkers():
            if checker is not linter:
                checker.stats = linter.stats


# PyLinter instance used by worker processes when checking files in parallel
# (multiprocessing). It is set by _worker_initialize and must only be accessed
# from within worker processes.
_worker_linter = None


def _worker_initialize(linter):
    """Pool initializer: stash the parent's PyLinter for this worker process."""
    global _worker_linter  # pylint: disable=global-statement
    _worker_linter = linter

    # In the worker, messages are only collected; _worker_check_single_file
    # hands them back to the parent process as its return value.
    linter.set_reporter(reporters.CollectingReporter())
    linter.open()


def _worker_check_single_file(file_item):
    """Lint a single file inside a worker process.

    *file_item* is a (name, filepath, modname) tuple, as produced by the
    linter's file-descriptor iteration. Returns a pickleable tuple of
    (current module name, collected messages, stats, message status) for the
    parent process to merge.
    """
    name, filepath, modname = file_item

    linter = _worker_linter
    linter.open()
    linter.check_single_file(name, filepath, modname)

    collected_messages = [
        _get_new_args(message) for message in linter.reporter.messages
    ]
    return (
        linter.current_name,
        collected_messages,
        linter.stats,
        linter.msg_status,
    )


# some reporting functions ####################################################


Expand Down Expand Up @@ -1466,6 +1404,10 @@ class Run:
),
)

    @staticmethod
    def _return_one(*args):  # pylint: disable=unused-argument
        """No-op optparse callback: ignore all arguments and return 1.

        A named method is used instead of a lambda, presumably so the option
        definitions stay pickleable for parallel linting — confirm against
        the pickling requirements noted on PyLinter.
        """
        return 1

def __init__(self, args, reporter=None, do_exit=True):
self._rcfile = None
self._plugins = []
Expand All @@ -1491,7 +1433,7 @@ def __init__(self, args, reporter=None, do_exit=True):
"rcfile",
{
"action": "callback",
"callback": lambda *args: 1,
"callback": Run._return_one,
"type": "string",
"metavar": "<file>",
"help": "Specify a configuration file.",
Expand All @@ -1501,7 +1443,7 @@ def __init__(self, args, reporter=None, do_exit=True):
"init-hook",
{
"action": "callback",
"callback": lambda *args: 1,
"callback": Run._return_one,
"type": "string",
"metavar": "<code>",
"level": 1,
Expand Down
Loading

0 comments on commit 141888f

Please sign in to comment.