diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b6e4761
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,129 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..4c0e4fd
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2020 Malcolm van Raalte
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6b1f1af
--- /dev/null
+++ b/README.md
@@ -0,0 +1,89 @@
+# multiprocess_chunks
+
+Chunk-based, multiprocess processing of iterables.
+Uses the `pathos` package to perform the multiprocessing.
+Uses the `cloudpickle` package to pickle hard-to-pickle objects.
+
+#### Why is this useful?
+
+When using the built-in Python `multiprocessing.Pool.map` method, the items being iterated are individually pickled. This can lead to a lot of pickling, which can negatively affect performance. This is particularly true, and not necessarily obvious, when extra data is passed into the `f` function via a lambda. For example:
+
+```
+from multiprocessing import Pool
+
+d = {...}  # a large dict of some sort
+p = Pool()
+p.map(lambda x: x + d[x], [1, 2, 3, ...])
+```
+
+In this case both `x` and `d` are pickled, individually, for every item in `[1, 2, 3, ...]`.
+
+The methods in this package divide the `[1, 2, 3, ...]` list into chunks and pickle each chunk and `d` a small number of times, as sketched below.
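+
+For instance, the same computation can be written with `map_list_in_chunks` (documented below) so that `d` is pickled once per chunk instead of once per item. A minimal sketch, with a small stand-in dict:
+
+```
+from multiprocess_chunks import map_list_in_chunks
+
+d = {1: 10, 2: 20, 3: 30}  # stands in for the large dict
+
+# `d` travels as extra_data, so it is pickled once per chunk,
+# not once per item as with `multiprocessing.Pool.map`.
+result = list(map_list_in_chunks([1, 2, 3], lambda x, ed: x + ed[x], d))
+# result = [11, 22, 33]
+```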
+
+## Installation
+
+```
+pip install multiprocess-chunks
+```
+
+## Usage
+
+There are two methods to choose from: `map_list_as_chunks` and `map_list_in_chunks`.
+
+#### map_list_as_chunks
+
+This method divides the iterable that is passed to it into chunks.
+The chunks are then processed in parallel, across multiple processes.
+It returns the mapped chunks.
+
+Parameters:
+`def map_list_as_chunks(l, f, extra_data, cpus=None, max_chunk_size=None)`
+
+- `l`: The iterable to process.
+- `f`: The function that processes each chunk. It takes two parameters: `chunk, extra_data`.
+- `extra_data`: Data that is passed into `f` for each chunk.
+- `cpus`: The number of CPUs to use. If `None`, the number of cores on the system is used. This value decides how many chunks to create.
+- `max_chunk_size`: Caps the number of items in each chunk (see the sketch below the example).
+
+Example:
+
+```
+from multiprocess_chunks import map_list_as_chunks
+
+l = range(0, 10)
+f = lambda chunk, ed: [c * ed for c in chunk]
+result = map_list_as_chunks(l, f, 5, 2)
+# result = [ [0, 5, 10, 15, 20], [25, 30, 35, 40, 45] ]
+```
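+
+`max_chunk_size` caps the chunk size before the chunks are handed to the pool, so it can produce more chunks than `cpus`. A sketch of the effect, continuing the example above:
+
+```
+from multiprocess_chunks import map_list_as_chunks
+
+l = range(0, 10)
+f = lambda chunk, ed: [c * ed for c in chunk]
+
+# 10 items across 2 CPUs would normally give two chunks of 5;
+# max_chunk_size=3 caps them, producing four chunks instead.
+result = map_list_as_chunks(l, f, 5, 2, max_chunk_size=3)
+# result = [ [0, 5, 10], [15, 20, 25], [30, 35, 40], [45] ]
+```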
+
+#### map_list_in_chunks
+
+This method divides the iterable that is passed to it into chunks.
+The chunks are then processed in parallel, across multiple processes.
+It unwinds the processed chunks to return the processed items.
+
+Parameters:
+`def map_list_in_chunks(l, f, extra_data)`
+
+- `l`: The iterable to process.
+- `f`: The function that processes each item. It takes two parameters: `item, extra_data`.
+- `extra_data`: Data that is passed into `f` for each item.
+
+Example:
+
+```
+from multiprocess_chunks import map_list_in_chunks
+
+l = range(0, 10)
+f = lambda item, ed: item * ed
+result = map_list_in_chunks(l, f, 5)
+# result = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
+```
+
+Essentially, `map_list_in_chunks` gives the same output as `multiprocessing.Pool.map` but, behind the scenes, it is chunking and being efficient about pickling.
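+
+One difference: the return value is a generator (the chunks themselves are processed eagerly by the pool), so wrap it in `list()` when a list is needed:
+
+```
+from multiprocess_chunks import map_list_in_chunks
+
+result = map_list_in_chunks(range(0, 10), lambda item, ed: item * ed, 5)
+print(list(result))  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
+```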
+
+#### A note on pickling
+
+This package uses the `pathos` package to perform the multiprocessing and the `cloudpickle` package to perform pickling. This allows it to pickle objects that Python's built-in `multiprocessing` cannot.
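+
+For example, closures over local state, which the standard `pickle` module rejects, work here because the mapped function is serialized with `cloudpickle`. A minimal sketch, with a hypothetical `make_scaler` helper:
+
+```
+from multiprocess_chunks import map_list_in_chunks
+
+def make_scaler(factor):
+    # A closure over `factor`: the built-in `multiprocessing.Pool.map`
+    # cannot pickle this, but cloudpickle can.
+    return lambda item, ed: item * factor + ed
+
+result = list(map_list_in_chunks([1, 2, 3], make_scaler(10), 100))
+# result = [110, 120, 130]
+```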
+
+#### Performance
+
+How much better will your code perform? There are many factors at play here. The only way to know is to do your own timings.
diff --git a/multiprocess_chunks/__init__.py b/multiprocess_chunks/__init__.py
new file mode 100644
index 0000000..0b2fb2d
--- /dev/null
+++ b/multiprocess_chunks/__init__.py
@@ -0,0 +1,2 @@
+from multiprocess_chunks.methods import map_list_as_chunks
+from multiprocess_chunks.methods import map_list_in_chunks
\ No newline at end of file
diff --git a/multiprocess_chunks/methods.py b/multiprocess_chunks/methods.py
new file mode 100644
index 0000000..234eff6
--- /dev/null
+++ b/multiprocess_chunks/methods.py
@@ -0,0 +1,87 @@
+'''
+Chunk-based, multiprocess processing of iterables.
+Uses the `pathos` package to perform the multiprocessing.
+Uses the `cloudpickle` package to pickle hard-to-pickle objects.
+'''
+
+from math import ceil
+import cloudpickle
+from pathos.multiprocessing import cpu_count
+from pathos.multiprocessing import ProcessPool as Pool
+
+
+def map_list_as_chunks(l, f, extra_data, cpus=None, max_chunk_size=None):
+    '''
+    A wrapper around `pathos.multiprocessing.ProcessPool.map` that processes a list in chunks.
+    Differs from `map_list_in_chunks` in that this method calls `f` once for each chunk.
+
+    `ProcessPool.map` already chunks, but if you have extra data to pass in,
+    it will be pickled for every item. This function passes the extra data in
+    once per chunk, which significantly saves on pickling.
+    https://stackoverflow.com/questions/53604048/iterating-the-results-of-a-multiprocessing-list-is-consuming-large-amounts-of-me
+
+    Parameters
+    ----------
+    l : list
+      the list
+    f : function
+      the function to process each chunk
+      takes two parameters: chunk, extra_data
+    extra_data : object
+      the extra data to pass to each f
+    cpus : int
+      the number of cores to use to split the chunks across
+    max_chunk_size : int
+      the maximum size for each chunk
+    '''
+    cpus = cpu_count() if cpus is None else cpus
+    max_chunk_size = float('inf') if max_chunk_size is None else max_chunk_size
+    chunk_length = min(max_chunk_size, max(1, ceil(len(l) / cpus)))
+    chunks = [l[x:x + chunk_length] for x in range(0, len(l), chunk_length)]
+    pool = Pool(nodes=cpus)
+    # Pickle the function once with cloudpickle; each worker unpickles it.
+    f_dumps = cloudpickle.dumps(f)
+    tuples = [(chunk, f_dumps, extra_data) for chunk in chunks]
+    return pool.map(_process_whole_chunk, tuples)
+
+
+def map_list_in_chunks(l, f, extra_data):
+    '''
+    A wrapper around `pathos.multiprocessing.ProcessPool.map` that processes a list in chunks.
+    Differs from `map_list_as_chunks` in that this method calls `f` once for each item in `l`.
+
+    `ProcessPool.map` already chunks, but if you have extra data to pass in,
+    it will be pickled for every item. This function passes the extra data in
+    once per chunk, which significantly saves on pickling.
+    https://stackoverflow.com/questions/53604048/iterating-the-results-of-a-multiprocessing-list-is-consuming-large-amounts-of-me
+
+    Parameters
+    ----------
+    l : list
+      the list
+    f : function
+      the function to process each item
+      takes two parameters: item, extra_data
+    extra_data : object
+      the extra data to pass to each f
+    '''
+    cpus = cpu_count()
+    chunk_length = max(1, int(len(l) / cpus))
+    chunks = [l[x:x + chunk_length] for x in range(0, len(l), chunk_length)]
+    pool = Pool(nodes=cpus)
+    f_dumps = cloudpickle.dumps(f)
+    tuples = [(chunk, f_dumps, extra_data) for chunk in chunks]
+    mapped_chunks = pool.map(_process_chunk, tuples)
+    # Flatten the mapped chunks back into a single stream of items.
+    return (item for chunk in mapped_chunks for item in chunk)
+
+
+def _process_chunk(tup):
+    '''Processes a chunk by invoking `f` for each item in the chunk.'''
+    chunk, f_dumps, extra_data = tup
+    f_loads = cloudpickle.loads(f_dumps)
+    return [f_loads(i, extra_data) for i in chunk]
+
+
+def _process_whole_chunk(tup):
+    '''Processes a chunk by invoking `f` once on the whole chunk.'''
+    chunk, f_dumps, extra_data = tup
+    f_loads = cloudpickle.loads(f_dumps)
+    return f_loads(chunk, extra_data)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..68cf949
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+cloudpickle
+pathos
\ No newline at end of file
diff --git a/run_tests.sh b/run_tests.sh
new file mode 100644
index 0000000..c5ed257
--- /dev/null
+++ b/run_tests.sh
@@ -0,0 +1 @@
+python -m unittest test.py
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..8502e53
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,4 @@
+[metadata]
+description-file = README.md
+long-description-content-type = text/markdown
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..dc40182
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,35 @@
+from setuptools import setup
+
+with open("README.md", "r") as fh:
+    long_description = fh.read()
+
+setup(
+    name='multiprocess_chunks',
+    packages=['multiprocess_chunks'],
+    version='1.0.0',
+    license='MIT',
+    description='Chunk-based, multiprocess processing of iterables.',
+    long_description=long_description,
+    long_description_content_type='text/markdown',
+    author='Malcolm van Raalte',
+    author_email='malcolm@van.raalte.ca',
+    url='https://github.com/malcolmvr/multiprocess-chunks',
+    download_url='https://github.com/malcolmvr/multiprocess-chunks/archive/v1.0.0.tar.gz',
+    keywords=['multiprocess', 'multiprocessing', 'pickle', 'pickling', 'chunks', 'map'],
+    install_requires=[
+        'cloudpickle',
+        'pathos',
+    ],
+    python_requires='>=3.3',
+    classifiers=[
+        'Development Status :: 5 - Production/Stable',
+        'Intended Audience :: Developers',
+        'Topic :: Software Development :: Build Tools',
+        'License :: OSI Approved :: MIT License',
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.3',
+        'Programming Language :: Python :: 3.4',
+        'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
+    ],
+)
\ No newline at end of file
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..f95cb50
--- /dev/null
+++ b/test.py
@@ -0,0 +1,56 @@
+import unittest
+from multiprocess_chunks import map_list_as_chunks, map_list_in_chunks
+
+
+class TestCase(unittest.TestCase):
+
+
+    def test_map_list_as_chunks_1(self):
+        l = range(0, 10)
+        f = lambda chunk, ed: [c * ed for c in chunk]
+        result = map_list_as_chunks(l, f, 5, 2)
+        self.assertCountEqual(result[0], [0, 5, 10, 15, 20])
+        self.assertCountEqual(result[1], [25, 30, 35, 40, 45])
+
+
+    def test_map_list_as_chunks_2(self):
+        l = range(0, 9)
+        f = lambda chunk, ed: [c * ed for c in chunk]
+        result = map_list_as_chunks(l, f, 5, 2)
+        self.assertCountEqual(result[0], [0, 5, 10, 15, 20])
+        self.assertCountEqual(result[1], [25, 30, 35, 40])
+
+
+    def test_map_list_as_chunks_3(self):
+        l = []
+        f = lambda chunk, ed: [c * ed for c in chunk]
+        result = map_list_as_chunks(l, f, 5, 2)
+        self.assertEqual(result, [])
+
+
+    def test_map_list_as_chunks_4(self):
+        l = [1]
+        f = lambda chunk, ed: [c * ed for c in chunk]
+        result = map_list_as_chunks(l, f, 5, 2)
+        self.assertEqual(result, [[5]])
+
+
+    def test_map_list_in_chunks_1(self):
+        l = range(0, 10)
+        f = lambda item, ed: item * ed
+        result = map_list_in_chunks(l, f, 5)
+        self.assertCountEqual(result, [0, 5, 10, 15, 20, 25, 30, 35, 40, 45])
+
+
+    def test_map_list_in_chunks_2(self):
+        l = []
+        f = lambda item, ed: item * ed
+        result = list(map_list_in_chunks(l, f, 5))
+        self.assertEqual(result, [])
+
+
+    def test_map_list_in_chunks_3(self):
+        l = [1]
+        f = lambda item, ed: item * ed
+        result = list(map_list_in_chunks(l, f, 5))
+        self.assertEqual(result, [5])