Skip to content

Commit

Permalink
Dependencies and hanging mpire (#33)
Browse files Browse the repository at this point in the history
* Platform specific dependencies are now handled using environment markers as defined in PEP-508 (fixes #30)
* Fixes hanging WorkerPool when using worker_lifespan and returning results that exceed the pipe capacity (fixes #32)
* Add sleep in insight tests so execution time is greater than zero

Co-authored-by: sybrenjansen <sybren.jansen@gmail.com>
Co-authored-by: Michiel van de Steeg <m.vd.steeg90@gmail.com>
  • Loading branch information
3 people committed Mar 29, 2022
1 parent 4541c4f commit 91e35d5
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ build
_build
dist
*.egg-info
.eggs

12 changes: 12 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
Changelog
=========

Dev
---

* Platform specific dependencies are now handled using environment markers as defined in PEP-508_ (`#30`_)
* Fixes hanging ``WorkerPool`` when using ``worker_lifespan`` and returning results that exceed the pipe capacity
(`#32`_)
* Fixes insights unit tests that could sometimes fail because they were too fast

.. _PEP-508: https://www.python.org/dev/peps/pep-0508/#environment-markers
.. _#30: https://github.com/Slimmer-AI/mpire/issues/30
.. _#32: https://github.com/Slimmer-AI/mpire/issues/32

2.3.3
-----

Expand Down
32 changes: 26 additions & 6 deletions mpire/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,38 @@ def _start_workers(self, progress_bar: bool) -> None:
self._workers.append(self._start_worker(worker_id))
logger.debug("Workers created")

def _restart_workers(self) -> None:
def _restart_workers(self) -> List[Any]:
"""
Restarts workers that need to be restarted.
:return: List of unordered results produced by workers
"""
obtained_results = []
for worker_id in self._worker_comms.get_worker_restarts():
# Obtain results from exit results queue (should be done before joining the worker)
if self.map_params.worker_exit:
self._exit_results.append(self._worker_comms.get_exit_results(worker_id))

# Join worker
self._worker_comms.reset_worker_restart(worker_id)
self._workers[worker_id].join()
self._workers[worker_id].join(timeout=0.01)

# If we time-out, it means the worker still has data that needs to be sent over. Note that on Windows, this
# is not necessarily the case. Windows is just a bit slow and thinks it cannot join yet, while in fact
# nothing is stopping it from joining. So, we just try it a few times ...
while self._workers[worker_id].exitcode is None:
try:
obtained_results.append(self._worker_comms.get_results(block=True, timeout=0.01))
except queue.Empty:
pass

self._workers[worker_id].join(timeout=0.01)

# Start new worker
self._workers[worker_id] = self._start_worker(worker_id)

return obtained_results

def _start_worker(self, worker_id: int) -> mp.Process:
"""
Creates and starts a single worker
Expand Down Expand Up @@ -531,8 +547,10 @@ def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable
except queue.Empty:
pass

# Restart workers if necessary
self._restart_workers()
# Restart workers if necessary. This can yield intermediate results
for results in self._restart_workers():
yield from results
n_active -= 1

# Obtain the results not yet obtained
while not self._worker_comms.exception_thrown() and n_active != 0:
Expand All @@ -542,8 +560,10 @@ def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable
except queue.Empty:
pass

# Restart workers if necessary
self._restart_workers()
# Restart workers if necessary. This can yield intermediate results
for results in self._restart_workers():
yield from results
n_active -= 1

# Terminate if exception has been thrown at this point
if self._worker_comms.exception_thrown():
Expand Down
75 changes: 36 additions & 39 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,59 @@
import platform
import sys
from setuptools import find_packages, setup

# For Python < 3.7 we need dataclasses. On Windows, we need pywin32 for CPU pinning
additional_dependencies = []
if sys.version_info[0] == 3 and sys.version_info[1] < 7:
additional_dependencies.append("dataclasses")
if platform.system() == "Windows":
additional_dependencies.append("pywin32==225")


def read_description():
with open("README.rst") as file:
with open('README.rst') as file:
return file.read()


if __name__ == "__main__":
if __name__ == '__main__':
# For Python < 3.7 we need dataclasses. On Windows, we need pywin32 for CPU pinning
setup(
name="mpire",
version="2.3.3",
author="Slimmer AI",
description="A Python package for easy multiprocessing, but faster than multiprocessing",
name='mpire',
version='2.3.3',
author='Slimmer AI',
description='A Python package for easy multiprocessing, but faster than multiprocessing',
long_description=read_description(),
url="https://github.com/Slimmer-AI/mpire",
license="MIT",
url='https://github.com/Slimmer-AI/mpire',
license='MIT',
packages=find_packages(),
scripts=["bin/mpire-dashboard"],
install_requires=["tqdm"] + additional_dependencies,
scripts=['bin/mpire-dashboard'],
install_requires=['dataclasses; python_version<"3.7"',
'pywin32==225; platform_system=="Windows"',
'tqdm'],
include_package_data=True,
extras_require={
"dashboard": ["flask"],
"dill": ["multiprocess"],
"docs": ["docutils==0.17.1",
"sphinx==3.2.1",
"sphinx-rtd-theme==0.5.0",
"sphinx-autodoc-typehints==1.11.0",
"sphinxcontrib-images==0.9.2",
"sphinx-versions==1.0.1"],
"testing": ["multiprocess", "numpy"] + additional_dependencies
'dashboard': ['flask'],
'dill': ['multiprocess'],
'docs': ['docutils==0.17.1',
'sphinx==3.2.1',
'sphinx-rtd-theme==0.5.0',
'sphinx-autodoc-typehints==1.11.0',
'sphinxcontrib-images==0.9.2',
'sphinx-versions==1.0.1'],
'testing': ['dataclasses; python_version<"3.7"',
'multiprocess',
'numpy',
'pywin32==225; platform_system=="Windows"']
},
test_suite="tests",
tests_require=["multiprocess", "numpy"],
test_suite='tests',
tests_require=['multiprocess', 'numpy'],
classifiers=[
# Development status
"Development Status :: 5 - Production/Stable",
'Development Status :: 5 - Production/Stable',

# Supported Python versions
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',

# License
"License :: OSI Approved :: MIT License",
'License :: OSI Approved :: MIT License',

# Topic
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules"
'Topic :: Software Development',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)
22 changes: 15 additions & 7 deletions tests/test_insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import unittest
from datetime import datetime
from multiprocessing import managers
from time import sleep
from unittest.mock import patch

from mpire import WorkerPool
Expand All @@ -13,8 +14,7 @@

def square(x):
# time.sleep is added for Windows compatibility, otherwise it says 0.0 time has passed
import time
time.sleep(0.1)
sleep(0.001)
return x * x


Expand Down Expand Up @@ -126,7 +126,7 @@ def test_enable_insights(self):
for idx in range(3):
with self.subTest('enabled', idx=idx):

pool.map(square, range(10), worker_init=self._init, worker_exit=self._exit)
pool.map(square, self._get_tasks(10), worker_init=self._init, worker_exit=self._exit)

# Basic sanity checks for the values. Some max task args can be empty, in that case the duration
# should be 0 (= no data)
Expand Down Expand Up @@ -375,12 +375,20 @@ def test_get_insights(self):
'exit_time_mean': '0:00:00.385', 'exit_time_std': '0:00:00.055'
})

@staticmethod
def _get_tasks(n):
""" Simulate that getting tasks takes some time """
for i in range(n):
# sleep is added for Windows compatibility, otherwise it says 0.0 time has passed
sleep(0.001)
yield i

@staticmethod
def _init():
# It's just here so we have something to time
_ = [x ** x for x in range(1000)]
# sleep is added for Windows compatibility, otherwise it says 0.0 time has passed
sleep(0.001)

@staticmethod
def _exit():
# It's just here so we have something to time
return [x ** x for x in range(1000)]
# sleep is added for Windows compatibility, otherwise it says 0.0 time has passed
sleep(0.001)
19 changes: 17 additions & 2 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
import types
import unittest
import warnings
Expand All @@ -24,6 +25,10 @@ def square(idx, x):
return idx, x * x


def extremely_large_output(idx, x):
return idx, os.urandom(1024 * 1024)


def square_numpy(x):
return x * x

Expand Down Expand Up @@ -55,7 +60,6 @@ def get_generator(iterable):
yield from iterable

# Test results for different parameter settings
print()
for n_jobs, n_tasks_max_active, worker_lifespan, chunk_size, n_splits in tqdm([
(None, None, None, None, None),
(1, None, None, None, None),
Expand All @@ -78,7 +82,8 @@ def get_generator(iterable):
chunk_size=chunk_size, n_splits=n_splits):

# Test if parallel map results in the same as ordinary map function. Should work both for
# generators and iterators. Also check if an empty list works as desired.
# generators and iterators. Also check if an empty list and extremely large output (exceeding
# os.pipe limits) works as desired.
results_list = map_func(square, self.test_data, max_tasks_active=n_tasks_max_active,
worker_lifespan=worker_lifespan)
self.assertIsInstance(results_list, result_type)
Expand All @@ -104,6 +109,16 @@ def get_generator(iterable):
self.assertIsInstance(results_list, result_type)
self.assertEqual([], list(results_list))

# When the OS pipe capacity is exceeded, a worker restart based on worker lifespan would hang if we do
# not fetch all the results from a worker. We only verify the amount of data returned here.
with self.subTest(map_func=map_func, output='data exceeding pipe limits', n_jobs=n_jobs,
n_tasks_max_active=n_tasks_max_active, worker_lifespan=worker_lifespan,
chunk_size=chunk_size, n_splits=n_splits):
results_list = map_func(extremely_large_output, self.test_data,
max_tasks_active=n_tasks_max_active, worker_lifespan=worker_lifespan)
self.assertIsInstance(results_list, result_type)
self.assertEqual(len(self.test_desired_output), len(list(results_list)))

def test_numpy_input(self):
"""
Test map with numpy input
Expand Down

0 comments on commit 91e35d5

Please sign in to comment.