Parallelize greedy algorithm #155

Merged
merged 31 commits into from Oct 5, 2015

Changes from all commits (31 commits)
244cbef
[parallel] add first draft of WorkerPoolInterface
sdrave Mar 24, 2015
8ba3a0e
[parallel] add IPythonPool
sdrave Mar 24, 2015
5ccb975
[parallel] refactor ipython.py
sdrave Apr 14, 2015
4ba1163
[tools] add 'Counter' class
sdrave Apr 14, 2015
baf7c0b
[parallel] add new_ipcluster_pool
sdrave Apr 14, 2015
761f0a4
[parallel] improve ipcluster startup
sdrave Apr 15, 2015
0d4d619
[parallel.ipython] do not fail on import if IPython is missing
sdrave Apr 15, 2015
7c76b8b
[parallel.ipython] ensure that pool size cannot change over time
sdrave Apr 16, 2015
7a143e1
[parallel.ipython] improve _pickle_function
sdrave Apr 16, 2015
f9e8938
[parallel.ipython] fix distribute in case only one argument is given
sdrave Apr 16, 2015
be82803
[parallel] add distribute_array and distribute_list to WorkerPoolInte…
sdrave Apr 16, 2015
05ba619
[parallel] fix default impl of distribute_list
sdrave Apr 16, 2015
d94cce6
[parallel] introduce base class for remote objects
sdrave Apr 16, 2015
2c83dd0
[core.pickle] add FunctionPicklingWrapper
sdrave Apr 17, 2015
7cc6580
[parallel.ipython] use FunctionPicklingWrapper
sdrave Apr 17, 2015
316f399
[parallel] support newer IPython versions
sdrave Sep 28, 2015
7aa5e22
[tests] be more permissive when testing 'with_'
sdrave Sep 28, 2015
639d019
Merge branch 'master' into parallel_merge
sdrave Sep 28, 2015
2c08184
[travis] fix IPython errors
sdrave Sep 28, 2015
d75d9a9
[parallel] rename interface methods
sdrave Sep 29, 2015
c0463d0
[parallel] allow push, scatter_list, scatter_array to be used without…
sdrave Sep 30, 2015
9deb76c
[parallel] add DummyPool
sdrave Sep 30, 2015
8d7dbe3
[parallel] add RemoteObjectManager
sdrave Oct 1, 2015
7011f75
[algorithms] parallelize greedy algorithm
sdrave Apr 15, 2015
7697267
[demos] add parallelization support to thermalblock demo.
sdrave Apr 15, 2015
7723196
[tools] add 'no_context' dummy context manager
sdrave Apr 15, 2015
12ee092
[demos] add option to thermalblock.py for selecting IPython profile
sdrave Apr 15, 2015
615eb0f
[algorithms.greedy] reduce communication by distributing sample set
sdrave Apr 16, 2015
83151f5
[algorithms.greedy] add log output
sdrave Apr 17, 2015
7f289b1
[algorithms] simplify greedy implementation
sdrave Jun 15, 2015
5cc43a6
[greedy] update parallelization to interface change
sdrave Oct 1, 2015
2 changes: 2 additions & 0 deletions .travis.yml
@@ -20,6 +20,8 @@ before_install:
- pip install --upgrade numpy
- pip install --upgrade pytest
- pip install --upgrade pytest pytest-cov
- pip install --upgrade ipython
- pip install ipyparallel

# command to install dependencies
install:
175 changes: 106 additions & 69 deletions src/pymor/algorithms/greedy.py
@@ -5,15 +5,19 @@
from __future__ import absolute_import, division, print_function

import time
from itertools import izip

import numpy as np

from pymor.algorithms.basisextension import gram_schmidt_basis_extension
from pymor.core.exceptions import ExtensionError
from pymor.core.logger import getLogger
from pymor.parallel.dummy import dummy_pool
from pymor.parallel.manager import RemoteObjectManager


def greedy(discretization, reductor, samples, initial_basis=None, use_estimator=True, error_norm=None,
extension_algorithm=gram_schmidt_basis_extension, target_error=None, max_extensions=None):
extension_algorithm=gram_schmidt_basis_extension, target_error=None, max_extensions=None,
pool=None):
"""Greedy basis generation algorithm.

This algorithm generates a reduced basis by iteratively adding the
@@ -66,6 +70,8 @@ def greedy(discretization, reductor, samples, initial_basis=None, use_estimator=
max_extensions
If not `None`, stop the algorithm after `max_extensions` extension
steps.
pool
If not `None`, the |WorkerPool| to use for parallelization.

Returns
-------
@@ -81,73 +87,104 @@ def greedy(discretization, reductor, samples, initial_basis=None, use_estimator=

logger = getLogger('pymor.algorithms.greedy.greedy')
samples = list(samples)
logger.info('Started greedy search on {} samples'.format(len(samples)))
basis = initial_basis

tic = time.time()
extensions = 0
max_errs = []
max_err_mus = []
hierarchic = False

rd, rc, reduction_data = None, None, None
while True:
logger.info('Reducing ...')
rd, rc, reduction_data = reductor(discretization, basis) if not hierarchic \
else reductor(discretization, basis, extends=(rd, rc, reduction_data))

if len(samples) == 0:
logger.info('There is nothing else to do for empty samples.')
return {'basis': basis, 'reduced_discretization': rd, 'reconstructor': rc,
'max_errs': [], 'max_err_mus': [], 'extensions': 0,
'time': time.time() - tic, 'reduction_data': reduction_data}

logger.info('Estimating errors ...')
if use_estimator:
errors = [rd.estimate(rd.solve(mu), mu) for mu in samples]
elif error_norm is not None:
errors = [error_norm(discretization.solve(mu) - rc.reconstruct(rd.solve(mu))) for mu in samples]
else:
errors = [(discretization.solve(mu) - rc.reconstruct(rd.solve(mu))).l2_norm() for mu in samples]

# most error_norms will return an array of length 1 instead of a number, so we extract the numbers
# if necessary
errors = map(lambda x: x[0] if hasattr(x, '__len__') else x, errors)

max_err, max_err_mu = max(((err, mu) for err, mu in izip(errors, samples)), key=lambda t: t[0])
max_errs.append(max_err)
max_err_mus.append(max_err_mu)
logger.info('Maximum error after {} extensions: {} (mu = {})'.format(extensions, max_err, max_err_mu))

if target_error is not None and max_err <= target_error:
logger.info('Reached maximal error on snapshots of {} <= {}'.format(max_err, target_error))
break

logger.info('Extending with snapshot for mu = {}'.format(max_err_mu))
U = discretization.solve(max_err_mu)
try:
basis, extension_data = extension_algorithm(basis, U)
except ExtensionError:
logger.info('Extension failed. Stopping now.')
break
extensions += 1
if 'hierarchic' not in extension_data:
logger.warn('Extension algorithm does not report if extension was hierarchic. Assuming it wasn\'t ..')
hierarchic = False
else:
hierarchic = extension_data['hierarchic']

logger.info('')

if max_extensions is not None and extensions >= max_extensions:
logger.info('Maximum number of {} extensions reached.'.format(max_extensions))
logger.info('Reducing once more ...')
sample_count = len(samples)
logger.info('Started greedy search on {} samples'.format(sample_count))
if pool is None or pool is dummy_pool:
pool = dummy_pool
else:
logger.info('Using pool of {} workers for parallel greedy search'.format(len(pool)))

with RemoteObjectManager() as rom:
[Inline review comment by a contributor:] rom may be a misleading name in the context of model reduction (reduced order model), but it is probably unproblematic in this local context...

# Push everything we need during the greedy search to the workers.
# Distribute the training set evenly among the workers.
if not use_estimator:
rom.manage(pool.push(discretization))
if error_norm:
rom.manage(pool.push(error_norm))
samples = rom.manage(pool.scatter_list(samples))

basis = initial_basis

tic = time.time()
extensions = 0
max_errs = []
max_err_mus = []
hierarchic = False

rd, rc, reduction_data = None, None, None
while True:
logger.info('Reducing ...')
rd, rc, reduction_data = reductor(discretization, basis) if not hierarchic \
else reductor(discretization, basis, extends=(rd, rc, reduction_data))
break

tictoc = time.time() - tic
logger.info('Greedy search took {} seconds'.format(tictoc))
return {'basis': basis, 'reduced_discretization': rd, 'reconstructor': rc,
'max_errs': max_errs, 'max_err_mus': max_err_mus, 'extensions': extensions,
'time': tictoc, 'reduction_data': reduction_data}
if sample_count == 0:
logger.info('There is nothing else to do for empty samples.')
return {'basis': basis, 'reduced_discretization': rd, 'reconstructor': rc,
'max_errs': [], 'max_err_mus': [], 'extensions': 0,
'time': time.time() - tic, 'reduction_data': reduction_data}

logger.info('Estimating errors ...')
if use_estimator:
errors, mus = zip(*pool.apply(_estimate, rd=rd, d=None, rc=None, samples=samples, error_norm=None))
else:
# FIXME: Always communicating rc may become a bottleneck in some use cases.
# Add special treatment for GenericRBReconstructor?
errors, mus = zip(*pool.apply(_estimate, rd=rd, d=discretization, rc=rc,
samples=samples, error_norm=error_norm))
max_err_ind = np.argmax(errors)
max_err, max_err_mu = errors[max_err_ind], mus[max_err_ind]

max_errs.append(max_err)
max_err_mus.append(max_err_mu)
logger.info('Maximum error after {} extensions: {} (mu = {})'.format(extensions, max_err, max_err_mu))

if target_error is not None and max_err <= target_error:
logger.info('Reached maximal error on snapshots of {} <= {}'.format(max_err, target_error))
break

logger.info('Extending with snapshot for mu = {}'.format(max_err_mu))
U = discretization.solve(max_err_mu)
try:
basis, extension_data = extension_algorithm(basis, U)
except ExtensionError:
logger.info('Extension failed. Stopping now.')
break
extensions += 1
if 'hierarchic' not in extension_data:
logger.warn('Extension algorithm does not report if extension was hierarchic. Assuming it wasn\'t ..')
hierarchic = False
else:
hierarchic = extension_data['hierarchic']

logger.info('')

if max_extensions is not None and extensions >= max_extensions:
logger.info('Maximum number of {} extensions reached.'.format(max_extensions))
logger.info('Reducing once more ...')
rd, rc, reduction_data = reductor(discretization, basis) if not hierarchic \
else reductor(discretization, basis, extends=(rd, rc, reduction_data))
break

tictoc = time.time() - tic
logger.info('Greedy search took {} seconds'.format(tictoc))
return {'basis': basis, 'reduced_discretization': rd, 'reconstructor': rc,
'max_errs': max_errs, 'max_err_mus': max_err_mus, 'extensions': extensions,
'time': tictoc, 'reduction_data': reduction_data}


def _estimate(rd=None, d=None, rc=None, samples=None, error_norm=None):
if not samples:
return -1., None

if d is None:
errors = [rd.estimate(rd.solve(mu), mu) for mu in samples]
elif error_norm is not None:
errors = [error_norm(d.solve(mu) - rc.reconstruct(rd.solve(mu))) for mu in samples]
else:
errors = [(d.solve(mu) - rc.reconstruct(rd.solve(mu))).l2_norm() for mu in samples]
# most error_norms will return an array of length 1 instead of a number, so we extract the numbers
# if necessary
errors = map(lambda x: x[0] if hasattr(x, '__len__') else x, errors)
max_err_ind = np.argmax(errors)

return errors[max_err_ind], samples[max_err_ind]
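
The `pool.apply(_estimate, ...)` calls above run `_estimate` once per worker on that worker's slice of the scattered sample set; each call returns a local `(max_error, mu)` pair, and the master takes an argmax over the per-worker results. A minimal sketch of this reduction pattern, using a placeholder error indicator instead of the pyMOR machinery:

```python
# Sketch of the parallel argmax pattern used in the greedy loop: each worker
# evaluates its slice of the scattered sample set and returns a local
# (max_error, argmax_mu) pair; the master reduces with np.argmax.
# `toy_estimate` and the hard-coded slices are stand-ins, not pyMOR API.
import numpy as np

def toy_estimate(samples):
    if not samples:                              # mirrors _estimate's empty-slice guard
        return -1., None
    errors = [abs(mu - 0.3) for mu in samples]   # placeholder error indicator
    i = int(np.argmax(errors))
    return errors[i], samples[i]

# One slice per worker, as scatter_list would produce on a 2-worker pool.
worker_slices = [[0.0, 0.1, 0.2], [0.3, 0.4, 0.5]]
errors, mus = zip(*[toy_estimate(s) for s in worker_slices])
i = int(np.argmax(errors))
print(errors[i], mus[i])                         # -> 0.3 0.0 (global worst sample)
```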
23 changes: 23 additions & 0 deletions src/pymor/core/pickle.py
Expand Up @@ -122,3 +122,26 @@ def loads_function(s):
r = FunctionType(code, globals_, name, defaults, closure)
r.func_dict = func_dict
return r


class FunctionPicklingWrapper(object):
"""Serializable function container, using :func:`dumps_function` if necessary."""

def __init__(self, f):
self.function = f

def __getstate__(self):
f = self.function
if f.__module__ != '__main__':
try:
return dumps(f)
except PicklingError:
return (dumps_function(f),)
else:
return (dumps_function(f),)

def __setstate__(self, f):
if type(f) == tuple:
self.function = loads_function(f[0])
else:
self.function = loads(f)
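
A short usage sketch of the wrapper above, assuming it is imported from `pymor.core.pickle` as added in this diff. The nested `scale` function (illustrative, not part of the PR) cannot be pickled by reference, so the wrapper serializes it by value via `dumps_function`:

```python
# Round-trip a closure through standard pickle via FunctionPicklingWrapper.
# `make_scaler` is a hypothetical example function, not part of the PR.
import pickle
from pymor.core.pickle import FunctionPicklingWrapper

def make_scaler(a):
    def scale(x):
        return a * x          # closure over `a`; not picklable by reference
    return scale

wrapped = FunctionPicklingWrapper(make_scaler(3))
restored = pickle.loads(pickle.dumps(wrapped))
assert restored.function(2) == 6
```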
4 changes: 4 additions & 0 deletions src/pymor/parallel/__init__.py
@@ -0,0 +1,4 @@
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright Holders: Rene Milk, Stephan Rave, Felix Schindler
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

41 changes: 41 additions & 0 deletions src/pymor/parallel/defaultimpl.py
@@ -0,0 +1,41 @@
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright Holders: Rene Milk, Stephan Rave, Felix Schindler
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

from __future__ import absolute_import, division, print_function


class WorkerPoolDefaultImplementations(object):

def scatter_array(self, U, copy=True):
slice_len = len(U) // len(self) + (1 if len(U) % len(self) else 0)
if copy:
slices = []
for i in range(len(self)):
slices.append(U.copy(ind=range(i*slice_len, min((i+1)*slice_len, len(U)))))
else:
slices = [U.empty() for _ in range(len(self))]
for s in slices:
s.append(U, o_ind=range(0, min(slice_len, len(U))), remove_from_other=True)
remote_U = self.push(U.empty())
del U
self.map(_append_array_slice, slices, U=remote_U)
return remote_U

def scatter_list(self, l):
slice_len = len(l) // len(self) + (1 if len(l) % len(self) else 0)
slices = []
for i in range(len(self)):
slices.append(l[i*slice_len:(i+1)*slice_len])
del l
remote_l = self.push([])
self.map(_append_list_slice, slices, l=remote_l)
return remote_l


def _append_array_slice(s, U=None):
U.append(s, remove_from_other=True)


def _append_list_slice(s, l=None):
l.extend(s)
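
To make the slicing arithmetic above concrete, a worked example in plain Python: with 7 items and 3 workers, `slice_len` is the ceiling of 7/3, so the last worker receives a shorter slice:

```python
# Worked example of the ceiling-division slicing used by scatter_list above:
# slice_len = 7 // 3 + 1 = 3, so the slices are l[0:3], l[3:6], l[6:9]
# with sizes 3, 3 and 1.
l = list(range(7))
n_workers = 3
slice_len = len(l) // n_workers + (1 if len(l) % n_workers else 0)
slices = [l[i * slice_len:(i + 1) * slice_len] for i in range(n_workers)]
print(slices)  # -> [[0, 1, 2], [3, 4, 5], [6]]
```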
63 changes: 63 additions & 0 deletions src/pymor/parallel/dummy.py
@@ -0,0 +1,63 @@
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright Holders: Rene Milk, Stephan Rave, Felix Schindler
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)

from __future__ import absolute_import, division, print_function

from itertools import izip

from pymor.core.interfaces import ImmutableInterface
from pymor.core.pickle import dumps, loads
from pymor.parallel.interfaces import WorkerPoolInterface, RemoteObjectInterface


class DummyPool(WorkerPoolInterface):

def __len__(self):
return 1

def push(self, obj):
if isinstance(obj, ImmutableInterface):
return DummyRemoteObject(obj)
else:
return DummyRemoteObject(loads(dumps(obj))) # ensure we make a deep copy of the data

def scatter_array(self, U, copy=True):
if copy:
U = U.copy()
return DummyRemoteObject(U)

def scatter_list(self, l):
l = list(l)
return DummyRemoteObject(l)

def _map_kwargs(self, kwargs):
return {k: (v.obj if isinstance(v, DummyRemoteObject) else v) for k, v in kwargs.iteritems()}

def apply(self, function, *args, **kwargs):
kwargs = self._map_kwargs(kwargs)
return [function(*args, **kwargs)]

def apply_only(self, function, worker, *args, **kwargs):
kwargs = self._map_kwargs(kwargs)
return function(*args, **kwargs)

def map(self, function, *args, **kwargs):
kwargs = self._map_kwargs(kwargs)
result = [function(*a, **kwargs) for a in izip(*args)]
if isinstance(result[0], tuple):
return zip(*result)
else:
return result


dummy_pool = DummyPool()


class DummyRemoteObject(RemoteObjectInterface):

def __init__(self, obj):
self.obj = obj

def _remove(self):
del self.obj
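
A usage sketch of the serial fallback defined above, assuming `dummy_pool` is imported from `pymor.parallel.dummy`: `push` deep-copies mutable data into a `DummyRemoteObject`, `_map_kwargs` unwraps it again, and `apply` returns a one-element list so results have the same shape as with a real pool (`work` is a hypothetical task function):

```python
# Serial drop-in usage of DummyPool: no cluster needed, same call pattern.
from pymor.parallel.dummy import dummy_pool

def work(x, data=None):
    return x + sum(data)

remote_data = dummy_pool.push([1, 2, 3])              # wrapped deep copy
print(dummy_pool.apply(work, 10, data=remote_data))   # -> [16]
print(len(dummy_pool))                                # -> 1 (single worker)
```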