Skip to content

Commit

Permalink
Reduce redundancy between metergroup.combine_chunks_from_generators
Browse files Browse the repository at this point in the history
and MeterGroup.dataframe_of_meters. Closes #291. Also:

* `load` and `dataframe_of_meters` both now take a `resample`
  parameter to optionally resample before reindexing.  Defaults to
  False.
* Replaced all `has_key` with `in` in `metergroup.py`. #286
* renamed `_check_kwargs` to the more descriptive name
  `_check_kwargs_for_full_results_and_sections`
* made some docstrings conform better to Numpy docstring standard.
  • Loading branch information
JackKelly committed Dec 12, 2014
1 parent 5cce664 commit 68d56b3
Showing 1 changed file with 68 additions and 67 deletions.
135 changes: 68 additions & 67 deletions nilmtk/metergroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def draw_wiring_graph(self):
labels[meter] = meter_instances
nx.draw(graph, labels=labels)

def load(self, sample_period=None, **kwargs):
def load(self, **kwargs):
"""Returns a generator of DataFrames loaded from the DataStore.
By default, `load` will load all available columns from the DataStore.
Expand All @@ -538,16 +538,18 @@ def load(self, sample_period=None, **kwargs):
Parameters
----------
sample_period : number, seconds, optional
The sample_period to reindex all meters to. If not specified then
will use the max of all meters' sample_periods.
sample_period : int or float, optional
Number of seconds to use as sample period when reindexing meters.
If not specified then will use the max of all meters' sample_periods.
resample : bool, defaults to False
If True then resample to `sample_period` before reindexing.
**kwargs :
any other key word arguments to pass to `self.store.load()` including:
physical_quantity : string or list of strings
e.g. 'power' or 'voltage' or 'energy' or ['power', 'energy'].
If a single string then load columns only for that physical quantity.
If a list of strings then load columns for all those physical
quantities.
ac_type : string or list of strings, defaults to None
Where 'ac_type' is short for 'alternating current type'. e.g.
'reactive' or 'active' or 'apparent'.
Expand All @@ -558,32 +560,23 @@ def load(self, sample_period=None, **kwargs):
physical quantity, else raise an Exception.
If set to a list of AC type strings then will load all those
AC types and will raise an Exception if any cannot be found.
cols : list of tuples, using NILMTK's vocabulary for measurements.
e.g. [('power', 'active'), ('voltage', ''), ('energy', 'reactive')]
`cols` can't be used if `ac_type` and/or `physical_quantity` are set.
preprocessing : list of Node subclass instances
e.g. [Clip()]
**kwargs : any other key word arguments to pass to `self.store.load()`
Returns
---------
Always return a generator of DataFrames (even if it only has a single
column).
.. note:: Different AC types will be treated separately.
"""
if kwargs.has_key('preprocessing'):
warn("If you are using `preprocessing` to resample then please"
" do not! Instead, please use the `sample_period` parameter.")

if sample_period is None:
sample_period = self.sample_period()
sample_period, kwargs = self._prep_kwargs_for_sample_period_and_resample(**kwargs)

# Load each generator and yield the sum or the mean
_, generators = self._meter_generators(**kwargs)
identifiers, generators = self._meter_generators(**kwargs)
while True:
chunk = combine_chunks_from_generators(generators, sample_period)
if chunk.empty:
Expand All @@ -595,8 +588,9 @@ def _meter_generators(self, **kwargs):
generators = []
identifiers = []
for meter in self.meters:
kwargs_copy = deepcopy(kwargs)
try:
generator = meter.load(**deepcopy(kwargs))
generator = meter.load(**kwargs_copy)
except MeasurementError as e:
warn("Ignoring meter '{}' because it does not have the correct"
" measurements. The MeasurementError was: '{}'"
Expand Down Expand Up @@ -732,7 +726,7 @@ def total_energy(self, **load_kwargs):
if `full_results` is True then return TotalEnergyResults object
else return a pd.Series with a row for each AC type.
"""
self._check_kwargs(load_kwargs)
self._check_kwargs_for_full_results_and_sections(load_kwargs)
full_results = load_kwargs.pop('full_results', False)

meter_energies = self._collect_stats_on_all_meters(
Expand Down Expand Up @@ -776,7 +770,7 @@ def dropout_rate(self, **load_kwargs):
else return either a single number of, if there are multiple
AC types, then return a pd.Series with a row for each AC type.
"""
self._check_kwargs(load_kwargs)
self._check_kwargs_for_full_results_and_sections(load_kwargs)
full_results = load_kwargs.pop('full_results', False)

dropout_rates = self._collect_stats_on_all_meters(
Expand All @@ -790,9 +784,9 @@ def dropout_rate(self, **load_kwargs):
else:
return np.mean(dropout_rates)

def _check_kwargs(self, load_kwargs):
if (load_kwargs.get('full_results ')
and not load_kwargs.has_key('sections')
def _check_kwargs_for_full_results_and_sections(self, load_kwargs):
if (load_kwargs.get('full_results')
and not 'sections' in load_kwargs
and len(self.meters) > 1):
raise RuntimeError("MeterGroup stats can only return full results"
" objects if you specify 'sections' to load. If"
Expand All @@ -816,15 +810,36 @@ def good_sections(self, **kwargs):
else:
return []

def dataframe_of_meters(self, sample_period=None, **kwargs):
def _prep_kwargs_for_sample_period_and_resample(self, sample_period=None,
resample=False, **kwargs):
if 'preprocessing' in kwargs:
warn("If you are using `preprocessing` to resample then please"
" do not! Instead, please use the `sample_period` parameter"
" and set `resample=True`.")

if sample_period is None:
sample_period = self.sample_period()

if resample:
resample_func = lambda df: df.resample(rule='{}S'.format(sample_period))
kwargs.setdefault('preprocessing', []).append(Apply(func=resample_func))
kwargs.setdefault('ac_type', 'best')
kwargs.setdefault('physical_quantity', 'power')

return sample_period, kwargs

def dataframe_of_meters(self, **kwargs):
"""
Parameters
----------
sample_period : number
Number of seconds to reindex meters to.
sample_period : int or float, optional
Number of seconds to use as sample period when reindexing meters.
If not specified then will use the max of all meters' sample_periods.
resample : bool, defaults to False
If True then resample to `sample_period` before reindexing.
**kwargs :
any other key word arguments to pass to `self.store.load()` including:
ac_type : string, defaults to 'best'
physical_quantity: string, defaults to 'power'
Returns
Expand All @@ -835,19 +850,8 @@ def dataframe_of_meters(self, sample_period=None, **kwargs):
representations of ElecMeterIDs (because 'pd.concat' tries to use
tuples to construct a multiindex)
"""
if kwargs.has_key('preprocessing'):
warn("If you are using `preprocessing` to resample then please"
" do not! Instead, please use the `sample_period` parameter.")

if sample_period is None:
sample_period = self.sample_period()

resample = lambda df: df.resample(rule='{}S'.format(sample_period))
kwargs.setdefault('preprocessing', []).append(Apply(func=resample))
kwargs.setdefault('ac_type', 'best')
kwargs.setdefault('physical_quantity', 'power')
sample_period, kwargs = self._prep_kwargs_for_sample_period_and_resample(**kwargs)
identifiers, generators = self._meter_generators(**kwargs)

segments = []
while True:
chunks = []
Expand All @@ -856,22 +860,11 @@ def dataframe_of_meters(self, sample_period=None, **kwargs):
timeframe = None
for meter_id, generator in zip(identifiers, generators):
try:
chunk_from_next_meter = next(generator)
chunk_from_next_meter, timeframe, index = _load_and_reindex_chunk(
generator, timeframe, index)
except StopIteration:
continue

if timeframe is None:
timeframe = chunk_from_next_meter.timeframe
else:
timeframe = timeframe.union(chunk_from_next_meter.timeframe)

# Extend 'index' if necessary
index = extend_index(index, timeframe, sample_period)

# Reindex chunk_from_next_meter
chunk_from_next_meter = chunk_from_next_meter.reindex(
index, method='ffill', limit=1, fill_value=0)

ids.append(meter_id)
chunks.append(chunk_from_next_meter.icol(0))

Expand Down Expand Up @@ -1304,7 +1297,8 @@ def combine_chunks_from_generators(generators, sample_period):
Parameters
----------
generators : list of generators of nilmtk DataFrames
sample_period : number, seconds
sample_period : int or float
Number of seconds to use as sample period when reindexing meters.
Returns
-------
Expand All @@ -1324,22 +1318,11 @@ def combine_chunks_from_generators(generators, sample_period):
index = None
for generator in generators:
try:
chunk_from_next_meter = next(generator)
chunk_from_next_meter, timeframe, index = _load_and_reindex_chunk(
generator, timeframe, index)
except StopIteration:
continue

if timeframe is None:
timeframe = chunk_from_next_meter.timeframe
else:
timeframe = timeframe.union(chunk_from_next_meter.timeframe)

# Extend 'index' if necessary
index = extend_index(index, timeframe, sample_period)

# Reindex chunk_from_next_meter
chunk_from_next_meter = chunk_from_next_meter.reindex(
index, method='ffill', limit=1, fill_value=0)

# Add
try:
chunk = chunk.add(chunk_from_next_meter, fill_value=0)
Expand Down Expand Up @@ -1367,6 +1350,24 @@ def combine_chunks_from_generators(generators, sample_period):
return chunk


def _load_and_reindex_chunk(generator, timeframe, index):
chunk_from_next_meter = next(generator)

if timeframe is None:
timeframe = chunk_from_next_meter.timeframe
else:
timeframe = timeframe.union(chunk_from_next_meter.timeframe)

# Extend 'index' if necessary
index = extend_index(index, timeframe, sample_period)

# Reindex chunk_from_next_meter
chunk_from_next_meter = chunk_from_next_meter.reindex(
index, method='ffill', limit=1, fill_value=0)

return chunk_from_next_meter, timeframe, index


def extend_index(index, timeframe, sample_period):
"""Extends `index` to size of timeframe, ensuring that we maintain
a regular interval between each index element.
Expand Down

0 comments on commit 68d56b3

Please sign in to comment.