initial commit
gainxglobal committed Mar 5, 2020
0 parents commit 6d92d6e
Showing 10 changed files with 426 additions and 0 deletions.
129 changes: 129 additions & 0 deletions .gitignore
@@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Malcolm van Raalte

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
89 changes: 89 additions & 0 deletions README.md
@@ -0,0 +1,89 @@
# multiprocess_chunks

Chunk-based, multiprocess processing of iterables.
Uses the `pathos` package to perform the multiprocessing and the `cloudpickle` package to pickle hard-to-pickle objects.

#### Why is this useful?

When using the built-in Python `multiprocessing.Pool.map` method, the items being iterated are individually pickled. This can lead to a lot of pickling, which can negatively affect performance. This is particularly true, and not necessarily obvious, if extra data is passed into the mapped function via a lambda. For example:

```
from multiprocessing import Pool
d = {...}  # a large dict of some sort
p = Pool()
p.map(lambda x: x + d[x], [1, 2, 3, ...])
```

In this case both `x` and `d` are pickled, individually, for every item in `[1, 2, 3, ...]`.

The methods in this package divide the `[1, 2, 3, ...]` list into chunks, so each chunk and `d` are pickled only a small number of times.
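
For comparison, here is how that same kind of lookup could be written with this package. This is a minimal sketch, assuming the package is installed; the dict and the sizes are small stand-ins for the large data above:

```
from multiprocess_chunks import map_list_in_chunks

if __name__ == '__main__':
    d = {i: i * 10 for i in range(100)}  # stand-in for the large dict
    # `d` is pickled once per chunk rather than once per item,
    # and cloudpickle lets us pass a lambda:
    result = list(map_list_in_chunks(range(100), lambda x, lookup: x + lookup[x], d))
    print(result[:3])  # [0, 11, 22]
```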

## Installation

```
pip install multiprocess-chunks
```

## Usage

There are two methods to choose from: `map_list_as_chunks` and `map_list_in_chunks`.

#### map_list_as_chunks

This method divides the iterable passed to it into chunks.
The chunks are then processed in parallel, across multiple processes.
It returns the mapped chunks.

Parameters:
`def map_list_as_chunks(l, f, extra_data, cpus=None, max_chunk_size=None)`

- `l`: The iterable to process.
- `f`: The function that processes each chunk. It takes two parameters: `chunk, extra_data`.
- `extra_data`: Data that is passed into `f` for each chunk.
- `cpus`: The number of CPUs to use. If `None`, the number of cores on the system is used. This value determines how many chunks are created.
- `max_chunk_size`: Limits the size of each chunk.

Example:

```
from multiprocess_chunks import map_list_as_chunks
l = range(0, 10)
f = lambda chunk, ed: [c * ed for c in chunk]
result = map_list_as_chunks(l, f, 5, 2)
# result = [ [0, 5, 10, 15, 20], [25, 30, 35, 40, 45] ]
```
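
Under the hood, the chunk length is computed as `min(max_chunk_size, max(1, ceil(len(l) / cpus)))` (see `methods.py`), so `cpus` controls how many chunks are created and `max_chunk_size` caps their size. A standalone sketch of that arithmetic, assuming 10 items, 4 CPUs, and `max_chunk_size=2`:

```
from math import ceil

l = list(range(10))
cpus, max_chunk_size = 4, 2
# One chunk per CPU would need ceil(10 / 4) = 3 items, but max_chunk_size caps it at 2.
chunk_length = min(max_chunk_size, max(1, ceil(len(l) / cpus)))
chunks = [l[x:x + chunk_length] for x in range(0, len(l), chunk_length)]
print(chunks)  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
```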

#### map_list_in_chunks

This method divides the iterable passed to it into chunks.
The chunks are then processed in parallel, across multiple processes.
It then unwinds the processed chunks and returns the individual processed items (as a generator).

Parameters:
`def map_list_in_chunks(l, f, extra_data)`

- `l`: The iterable to process.
- `f`: The function that processes each item. It takes two parameters: `item, extra_data`.
- `extra_data`: Data that is passed into `f` for each item.

Example:

```
from multiprocess_chunks import map_list_in_chunks
l = range(0, 10)
f = lambda item, ed: item * ed
result = list(map_list_in_chunks(l, f, 5))
# result = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
```

Essentially, `map_list_in_chunks` gives the same output as `multiprocessing.Pool.map`, but behind the scenes it chunks the work and is more economical about pickling.
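
As a rough sanity check of that equivalence (a standalone sketch, not from the package's documentation; `add_offset` is just an illustrative helper), both calls below produce `[10, 11, ..., 19]`:

```
from multiprocessing import Pool
from multiprocess_chunks import map_list_in_chunks

def add_offset(item, offset):
    return item + offset

if __name__ == '__main__':
    # Built-in Pool: each (item, offset) tuple is pickled individually.
    with Pool() as p:
        baseline = p.starmap(add_offset, [(i, 10) for i in range(10)])
    # Chunked version: the offset is pickled once per chunk.
    chunked = list(map_list_in_chunks(range(10), add_offset, 10))
    assert baseline == chunked
```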

#### A note on pickling

This package uses the `pathos` package to perform the multiprocessing and the `cloudpickle` package to perform the pickling. This allows it to pickle objects that Python's built-in `multiprocessing` cannot, such as lambdas and locally defined functions.
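
For instance, in this minimal illustration the standard pickler refuses a lambda, while `cloudpickle` serializes it by value:

```
import pickle
import cloudpickle

square = lambda x: x * x

try:
    pickle.dumps(square)  # the built-in pickler cannot serialize lambdas
except pickle.PicklingError as e:
    print('pickle failed:', e)

payload = cloudpickle.dumps(square)  # cloudpickle serializes the function by value
print(pickle.loads(payload)(4))      # 16
```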

#### Performance

How much faster will your code run? That depends on many factors, such as the size of the extra data, the cost of `f`, and the number of cores available. The only way to know is to do your own timings.
2 changes: 2 additions & 0 deletions multiprocess_chunks/__init__.py
@@ -0,0 +1,2 @@
from multiprocess_chunks.methods import map_list_as_chunks
from multiprocess_chunks.methods import map_list_in_chunks
87 changes: 87 additions & 0 deletions multiprocess_chunks/methods.py
@@ -0,0 +1,87 @@
'''
Chunk-based, multiprocess processing of iterables.
Uses the `pathos` package to perform the multiprocessing
and the `cloudpickle` package to pickle hard-to-pickle objects.
'''

from math import ceil
import cloudpickle
from pathos.multiprocessing import cpu_count
from pathos.multiprocessing import ProcessPool as Pool


def map_list_as_chunks(l, f, extra_data, cpus=None, max_chunk_size=None):
    '''
    A wrapper around `pathos.multiprocessing.ProcessPool.map` that processes a list in chunks.
    Differs from `map_list_in_chunks` in that this method calls `f` once for each chunk.
    `Pool.map` already chunks internally, but if you have extra data to pass in it is pickled
    for every item. This function passes the extra data in once per chunk instead,
    which significantly reduces pickling.
    https://stackoverflow.com/questions/53604048/iterating-the-results-of-a-multiprocessing-list-is-consuming-large-amounts-of-me

    Parameters
    ----------
    l : list
        the list to process
    f : function
        the function that processes each chunk;
        takes two parameters: chunk, extra_data
    extra_data : object
        the extra data to pass to each call of f
    cpus : int
        the number of cores to split the chunks across;
        defaults to the number of cores on the system
    max_chunk_size : int
        the maximum size of each chunk

    Returns
    -------
    list
        the result of applying f to each chunk
    '''
    cpus = cpu_count() if cpus is None else cpus
    max_chunk_size = float('inf') if max_chunk_size is None else max_chunk_size
    chunk_length = min(max_chunk_size, max(1, ceil(len(l) / cpus)))
    chunks = [l[x:x + chunk_length] for x in range(0, len(l), chunk_length)]
    pool = Pool(nodes=cpus)
    f_dumps = cloudpickle.dumps(f)
    tuples = [(chunk, f_dumps, extra_data) for chunk in chunks]
    return pool.map(_process_whole_chunk, tuples)


def map_list_in_chunks(l, f, extra_data):
    '''
    A wrapper around `pathos.multiprocessing.ProcessPool.map` that processes a list in chunks.
    Differs from `map_list_as_chunks` in that this method calls `f` once for each item in `l`.
    `Pool.map` already chunks internally, but if you have extra data to pass in it is pickled
    for every item. This function passes the extra data in once per chunk instead,
    which significantly reduces pickling.
    https://stackoverflow.com/questions/53604048/iterating-the-results-of-a-multiprocessing-list-is-consuming-large-amounts-of-me

    Parameters
    ----------
    l : list
        the list to process
    f : function
        the function that processes each item;
        takes two parameters: item, extra_data
    extra_data : object
        the extra data to pass to each call of f

    Returns
    -------
    generator
        the processed items, in order
    '''
    cpus = cpu_count()
    chunk_length = max(1, int(len(l) / cpus))
    chunks = [l[x:x + chunk_length] for x in range(0, len(l), chunk_length)]
    pool = Pool(nodes=cpus)
    f_dumps = cloudpickle.dumps(f)
    tuples = [(chunk, f_dumps, extra_data) for chunk in chunks]
    mapped_chunks = pool.map(_process_chunk, tuples)
    return (item for chunk in mapped_chunks for item in chunk)


def _process_chunk(tup):
    '''Processes a chunk by invoking `f` for each item in the chunk.'''
    chunk, f_dumps, extra_data = tup
    f_loads = cloudpickle.loads(f_dumps)
    return [f_loads(i, extra_data) for i in chunk]


def _process_whole_chunk(tup):
    '''Processes a whole chunk by invoking `f` once on the entire chunk.'''
    chunk, f_dumps, extra_data = tup
    f_loads = cloudpickle.loads(f_dumps)
    return f_loads(chunk, extra_data)
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
cloudpickle
pathos
1 change: 1 addition & 0 deletions run_tests.sh
@@ -0,0 +1 @@
py -m unittest test.py
4 changes: 4 additions & 0 deletions setup.cfg
@@ -0,0 +1,4 @@
# Inside of setup.cfg
[metadata]
description-file = README.md
long-description-content-type = text/markdown
35 changes: 35 additions & 0 deletions setup.py
@@ -0,0 +1,35 @@
from setuptools import setup

with open("README.md", "r") as fh:
long_description = fh.read()

setup(
    name='multiprocess_chunks',
    packages=['multiprocess_chunks'],
    version='1.0.0',
    license='MIT',
    description='Chunk-based, multiprocess processing of iterables.',
    long_description=long_description,
    long_description_content_type='text/markdown',
    author='Malcolm van Raalte',
    author_email='malcolm@van.raalte.ca',
    url='https://github.com/malcolmvr/multiprocess-chunks',
    download_url='https://github.com/malcolmvr/multiprocess-chunks/archive/v1.0.0.tar.gz',
    keywords=['multiprocess', 'multiprocessing', 'pickle', 'pickling', 'chunks', 'map'],
    install_requires=[
        'cloudpickle',
        'pathos',
    ],
    python_requires='>=3.3',
    classifiers=[
        'Development Status :: 5 - Production/Stable',
        'Intended Audience :: Developers',
        'Topic :: Software Development :: Build Tools',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.3',
        'Programming Language :: Python :: 3.4',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: 3.7',
        'Programming Language :: Python :: 3.8',
    ],
)
