Skip to content

Commit

Permalink
Consistent chunksize handling (markovmodel#848)
Browse files Browse the repository at this point in the history
* respect chunksize in iterator creation
* [coordinates/api] set chunksize in param_stage for input stage
  Only set it in case the user has overridden it; otherwise take the default chunksize from Iterable.
* [Iterable] set default chunk size to None and use self.chunksize as default then.
* [FeatureReader] provide correct setter for chunksize

Fixes markovmodel#846
  • Loading branch information
marscher committed Jun 30, 2016
1 parent dece1c6 commit 582e923
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 13 deletions.
7 changes: 7 additions & 0 deletions doc/source/CHANGELOG.rst
@@ -1,6 +1,13 @@
Changelog
=========

2.2.2 ()
--------

**Fixes**:

- coordinates: set chunksize correctly. #846

2.2.1 (6-21-16)
---------------

Expand Down
2 changes: 1 addition & 1 deletion doc/source/index.rst
Expand Up @@ -33,7 +33,7 @@ Technical features:
* Subscribe to the newsletter `here <https://lists.fu-berlin.de/listinfo/pyemma-newsletter#subscribe>`_. News will be sent only after major releases / fixes.

Citation |DOI for Citing PyEMMA|
--------------------------------
================================

If you use PyEMMA in scientific software, please cite the following paper: ::

Expand Down
8 changes: 3 additions & 5 deletions pyemma/coordinates/api.py
Expand Up @@ -812,7 +812,7 @@ def _get_input_stage(previous_stage):
return inputstage


def _param_stage(previous_stage, this_stage, stride=1, chunk_size=0):
def _param_stage(previous_stage, this_stage, stride=1, chunk_size=None):
r""" Parametrizes the given pipelining stage if a valid source is given.
Parameters
Expand All @@ -829,12 +829,10 @@ def _param_stage(previous_stage, this_stage, stride=1, chunk_size=0):
return this_stage

input_stage = _get_input_stage(previous_stage)
input_stage.chunksize = chunk_size
assert input_stage.default_chunksize == chunk_size
if chunk_size is not None:
input_stage.chunksize = chunk_size
# parametrize transformer
this_stage.data_producer = input_stage
this_stage.chunksize = input_stage.chunksize
assert this_stage.chunksize == chunk_size
this_stage.estimate(X=input_stage, stride=stride)
return this_stage

Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/clustering/kmeans.py
Expand Up @@ -138,7 +138,7 @@ def kmeanspp_center_assigned(self):
def _estimate(self, iterable, **kw):
self._init_estimate()

with iterable.iterator(return_trajindex=True, stride=self.stride) as iter:
with iterable.iterator(return_trajindex=True, stride=self.stride, chunk=self.chunksize) as iter:
# first pass: gather data and run k-means
first_chunk = True
for itraj, X in iter:
Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/clustering/regspace.py
Expand Up @@ -135,7 +135,7 @@ def _estimate(self, iterable, **kwargs):
it = iterable.iterator(return_trajindex=False)
used_frames = 0
try:
with iterable.iterator(return_trajindex=False) as it:
with iterable.iterator(return_trajindex=False, stride=self.stride, chunk=self.chunksize) as it:
for X in it:
used_frames += len(X)
regspatial.cluster(X.astype(np.float32, order='C', copy=False),
Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/clustering/uniform_time.py
Expand Up @@ -84,7 +84,7 @@ def _estimate(self, iterable, **kw):
linspace = self.stride * np.arange(next_t, T - next_t + 1, (T - 2*next_t + 1) // self.n_clusters)[:self.n_clusters]
# random access matrix
ra_stride = np.array([UniformTimeClustering._idx_to_traj_idx(x, cumsum) for x in linspace])
with iterable.iterator(stride=ra_stride, return_trajindex=False) as it:
with iterable.iterator(stride=ra_stride, return_trajindex=False, chunk=self.chunksize) as it:
self.clustercenters = np.concatenate([X for X in it])

assert len(self.clustercenters) == self.n_clusters
Expand Down
5 changes: 4 additions & 1 deletion pyemma/coordinates/data/_base/iterable.py
Expand Up @@ -106,7 +106,7 @@ def iterator(self, stride=1, lag=0, chunk=None, return_trajindex=True, cols=None
return LaggedIterator(it, it_lagged, return_trajindex)
return it

def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=0):
def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=None):
if isinstance(dimensions, int):
ndim = 1
dimensions = slice(dimensions, dimensions + 1)
Expand All @@ -119,6 +119,9 @@ def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=0):

assert ndim > 0, "ndim was zero in %s" % self.__class__.__name__

if chunk is None:
chunk = self.chunksize

# create iterator
if self.in_memory and not self._mapping_to_mem_active:
from pyemma.coordinates.data.data_in_memory import DataInMemory
Expand Down
4 changes: 4 additions & 0 deletions pyemma/coordinates/data/feature_reader.py
Expand Up @@ -305,6 +305,10 @@ def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=Fals
#self._cols = cols
self._create_mditer()

@DataSourceIterator.chunksize.setter
def chunksize(self, val):
    # Override the inherited setter so that a change of chunk size is
    # forwarded to the wrapped trajectory iterator (self._mditer,
    # presumably an mdtraj iterload-style iterator — TODO confirm),
    # rather than only being stored on this object.
    # int() coerces numeric inputs (e.g. numpy ints) and raises for
    # non-numeric values.
    self._mditer._chunksize = int(val)

def close(self):
if hasattr(self, '_mditer') and self._mditer is not None:
self._mditer.close()
Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/transform/pca.py
Expand Up @@ -198,7 +198,7 @@ def _diagonalize(self):
def _estimate(self, iterable, **kw):
partial_fit = 'partial' in kw

with iterable.iterator(return_trajindex=False) as it:
with iterable.iterator(return_trajindex=False, chunk=self.chunksize) as it:
n_chunks = it._n_chunks
self._progress_register(n_chunks, "calc mean+cov", 0)
self._init_covar(partial_fit, n_chunks)
Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/transform/tica.py
Expand Up @@ -261,7 +261,7 @@ def _estimate(self, iterable, **kw):
raise ValueError("None single dataset [longest=%i] is longer than"
" lag time [%i]." % (max(iterable.trajectory_lengths(self.stride)), self.lag))

it = iterable.iterator(lag=self.lag, return_trajindex=False)
it = iterable.iterator(lag=self.lag, return_trajindex=False, chunk=self.chunksize)
with it:
self._progress_register(it._n_chunks, "calculate mean+cov", 0)
self._init_covar(partial_fit, it._n_chunks)
Expand Down
2 changes: 1 addition & 1 deletion pyemma/coordinates/transform/transformer.py
Expand Up @@ -198,7 +198,7 @@ def estimate(self, X, **kwargs):

return self

def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=0):
def get_output(self, dimensions=slice(0, None), stride=1, skip=0, chunk=None):
if not self._estimated:
self.estimate(self.data_producer, stride=stride)

Expand Down

0 comments on commit 582e923

Please sign in to comment.