Skip to content

Commit

Permalink
making good progress in removing Pipeline for a more elegant design.
Browse files Browse the repository at this point in the history
Still some work to do: the disaggregation!
  • Loading branch information
JackKelly committed Jul 3, 2014
1 parent bb69147 commit 462237e
Show file tree
Hide file tree
Showing 15 changed files with 355 additions and 202 deletions.
4 changes: 2 additions & 2 deletions nilmtk/datastore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import print_function, division
import pandas as pd
from nilmtk.timeframe import TimeFrame
from .timeframe import TimeFrame
from .node import Node

MAX_MEM_ALLOWANCE_IN_BYTES = 1E9

Expand Down Expand Up @@ -348,7 +349,6 @@ def _check_key(self, key):
raise KeyError(key + ' not in store')



def join_key(*args):
"""
Examples
Expand Down
61 changes: 36 additions & 25 deletions nilmtk/elecmeter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import print_function, division
from .pipeline import Pipeline, Clip, TotalEnergy, GoodSections
from .preprocessing import Clip
from .stats import TotalEnergy, GoodSections
from .hashable import Hashable
from .appliance import Appliance
from .datastore import Key
from .measurement import select_best_ac_type
from .node import Node
from warnings import warn
from collections import namedtuple
from copy import deepcopy
Expand All @@ -25,7 +27,7 @@ class ElecMeter(Hashable):
key into nilmtk.DataStore to access data.
metadata : dict.
See http://nilm-metadata.readthedocs.org/en/latest/dataset_metadata.html#elecmeter
See http://nilm-metadata.readthedocs.org/en/latest/dataset_metadata.html#elecmeter
STATIC ATTRIBUTES
-----------------
Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(self, store=None, metadata=None, meter_id=None):
assert isinstance(meter_id, ElecMeterID)
ElecMeter.meters[self.identifier] = self
self.appliances = []
self.loader_kwargs = {}

@property
def key(self):
Expand Down Expand Up @@ -154,7 +157,7 @@ def __repr__(self):
string = super(ElecMeter, self).__repr__()
# Now add list of appliances...
string = string[:-1] # remove last bracket
string += ', appliances={}'.format(self.appliances)
string += '(appliances={}'.format(self.appliances)

# METER ROOM
room = self.metadata.get('room')
Expand Down Expand Up @@ -222,30 +225,43 @@ def power_series(self, measurement_ac_type_prefs=None, **load_kwargs):
def voltage_series(self):
    """Returns a generator of pd.Series of voltage, if available.

    Not yet implemented.
    """
    raise NotImplementedError

def dry_run_metadata(self):
    """Return metadata used to validate a pipeline without loading data.

    For a raw meter (a pipeline source) this is simply its real metadata.
    """
    return self.metadata

def get_metadata(self):
    """Return this meter's metadata dict."""
    return self.metadata

def get_source_node(self, **loader_kwargs):
    """Return a source Node wrapping a generator over this meter's data.

    Parameters
    ----------
    **loader_kwargs : passed through to `self.store.load`.  Merged over
        `self.loader_kwargs`; per-call kwargs take precedence.
        (`total_energy` and `good_sections` call this method with
        `**loader_kwargs`, so the signature must accept them.)

    Returns
    -------
    nilmtk.node.Node

    Raises
    ------
    RuntimeError
        If `self.store` is not set.
    """
    if self.store is None:
        raise RuntimeError("'meter.store' is not set!"
                           " Cannot process data without a DataStore!")
    # Per-call kwargs override the meter-level defaults.
    kwargs = dict(self.loader_kwargs)
    kwargs.update(loader_kwargs)
    generator = self.store.load(key=self.key, **kwargs)
    return Node(self, generator=generator)

def total_energy(self, **loader_kwargs):
    """Compute the total energy recorded by this meter.

    Parameters
    ----------
    **loader_kwargs : passed through to the data loader
        (e.g. `timeframes`).

    Returns
    -------
    nilmtk.stats.TotalEnergyResults object
    """
    # Clip physically-implausible values before summing energy.
    source_node = self.get_source_node(**loader_kwargs)
    clipped = Clip(source_node)
    total_energy = TotalEnergy(clipped)
    total_energy.run()
    return total_energy.results

def dropout_rate(self):
    """Returns a DropoutRateResults object.  Not yet implemented."""
    raise NotImplementedError

def good_sections(self, **loader_kwargs):
    """Locate sections of data without large gaps.

    Parameters
    ----------
    **loader_kwargs : passed through to the data loader.

    Returns
    -------
    sections : list of nilmtk.TimeFrame objects
    """
    source_node = self.get_source_node(**loader_kwargs)
    good_sections = GoodSections(source_node)
    good_sections.run()
    # `.combined` collapses per-chunk results into one list of sections.
    return good_sections.results.combined

def total_on_duration(self):
"""Return timedelta"""
Expand Down Expand Up @@ -273,10 +289,14 @@ def discrete_appliance_activations(self):
raise NotImplementedError

def proportion_of_energy(self, mains):
    """Return the proportion of mains energy measured by this meter.

    Parameters
    ----------
    mains : nilmtk.ElecMeter or MeterGroup

    Returns
    -------
    Ratio of this meter's total energy to mains total energy, both
    computed over the mains meter's good sections.
    """
    # Restrict both energy totals to sections where the mains data is
    # good, so numerator and denominator cover the same timeframes.
    mains_good_sects = mains.good_sections()
    proportion_of_energy = (self.total_energy(timeframes=mains_good_sects) /
                            mains.total_energy(timeframes=mains_good_sects))
    return proportion_of_energy

def contiguous_sections(self):
Expand All @@ -289,12 +309,3 @@ def clean_and_export(self, destination_datastore):
cleaning steps have been executed and some summary results (e.g. the number of
implausible values removed)"""
raise NotImplementedError

def _run_pipeline(self, nodes, **load_kwargs):
    """Run `nodes` through a Pipeline over this meter's data.

    Parameters
    ----------
    nodes : list of Node instances to execute, in order.
    **load_kwargs : passed through to `pipeline.run`.

    Returns
    -------
    pipeline.results

    Raises
    ------
    RuntimeError
        If `self.store` is not set (no data to process).
    """
    if self.store is None:
        msg = ("'meter.store' is not set!"
               " Cannot process data without a DataStore!")
        raise RuntimeError(msg)
    pipeline = Pipeline(nodes)
    pipeline.run(meter=self, **load_kwargs)
    return pipeline.results
194 changes: 86 additions & 108 deletions nilmtk/node.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,93 @@
import abc
from copy import deepcopy
from nilm_metadata import recursively_update_dict

class Node(object):
    """Abstract class defining interface for all Node subclasses,
    where a 'node' is a module which runs pre-processing or statistics
    (or, later, maybe NILM training or disaggregation).

    Nodes form a pull-based pipeline: each node wraps an `upstream`
    (an ElecMeter, MeterGroup or another Node) and pulls chunks from
    it via `process()`.
    """

    # Overridden by subclasses: metadata this node requires from
    # upstream (`requirements`) and metadata guarantees it provides
    # downstream (`postconditions`).
    requirements = {}
    postconditions = {}

    def __init__(self, upstream, generator=None):
        """
        Parameters
        ----------
        upstream : an ElecMeter or MeterGroup or a Node subclass
        generator : Python generator. Optional
        """
        self.upstream = upstream
        self.generator = generator
        self.results = None
        self.reset()

    def reset(self):
        pass  # Overridden by each subclass that needs reset logic

    def process(self):
        # Source nodes simply hand back the generator they were given;
        # processing subclasses override this with their own generator.
        return self.generator

    def run(self):
        """Pulls data through the pipeline.  Useful if we just want to
        calculate some stats (the chunks themselves are discarded)."""
        for _ in self.process():
            pass

    def check_requirements(self):
        """Checks that `self.upstream.dry_run_metadata` satisfies
        `self.requirements`.

        Raises
        ------
        UnsatisfiedRequirementsError
        """
        # If a subclass has complex rules for preconditions then
        # override this method.
        unsatisfied = find_unsatisfied_requirements(self.upstream.dry_run_metadata(),
                                                    self.requirements)
        if unsatisfied:
            msg = str(self) + " not satisfied by:\n" + str(unsatisfied)
            raise UnsatisfiedRequirementsError(msg)

    def dry_run_metadata(self):
        """Does a 'dry run' so we can validate the full pipeline before
        loading any data.

        Returns
        -------
        dict : dry run metadata
        """
        # NOTE(review): upstream metadata is merged *over* this node's
        # postconditions, so upstream values win on conflict — confirm
        # that is the intended precedence.
        state = deepcopy(self.__class__.postconditions)
        recursively_update_dict(state, self.upstream.dry_run_metadata())
        return state

    def get_metadata(self):
        """Return upstream metadata, merged with this node's results
        (if any results have been computed)."""
        if self.results:
            metadata = deepcopy(self.upstream.get_metadata())
            results_dict = self.results.to_dict()
            recursively_update_dict(metadata, results_dict)
        else:
            # Don't bother to deepcopy upstream's metadata if
            # we aren't going to modify it.
            metadata = self.upstream.get_metadata()
        return metadata

    def required_measurements(self, state):
        """
        Returns
        -------
        Set of measurements that need to be loaded from disk for this node.
        """
        return set()

    def __repr__(self):
        # NOTE(review): `self.name` is not defined on Node itself —
        # subclasses are expected to provide a `name` attribute; confirm.
        return self.__class__.__name__ + ' ' + self.name


class UnsatisfiedRequirementsError(Exception):
    """Raised when a node's `requirements` are not satisfied by the
    metadata available from its upstream."""
    pass


def find_unsatisfied_requirements(state, requirements):
"""
Parameters
Expand Down Expand Up @@ -38,110 +123,3 @@ def unsatisfied_requirements(st, req):
unsatisfied_requirements(state, requirements)

return unsatisfied


class Node(object):
    """Abstract class defining interface for all Node subclasses,
    where a 'node' is a module which runs pre-processing or statistics
    (or, later, maybe NILM training or disaggregation).
    """

    # Abstract base class: subclasses must implement `process`.
    __metaclass__ = abc.ABCMeta

    # Overridden by subclasses: metadata this node requires from
    # upstream (`requirements`) and metadata guarantees it provides
    # downstream (`postconditions`).
    requirements = {}
    postconditions = {}

    def __init__(self):
        self.reset()

    def reset(self):
        pass  # to be overridden by subclasses

    def update_state(self, state):
        """Recursively updates `state` dict with `postconditions`.

        This function is required because Python's `dict.update()` function
        does not descend into dicts within dicts.

        Parameters
        ----------
        state : dict

        Returns
        -------
        state : dict
        """
        def _update_state(state, postconditions):
            # Recursively update dict.
            # NOTE(review): `dict.iteritems` is Python 2 only.
            for key, value in postconditions.iteritems():
                try:
                    state_value = state[key]
                except KeyError:
                    # Key absent from state: take the postcondition value.
                    state[key] = value
                else:
                    if isinstance(value, dict):
                        assert isinstance(state_value, dict)
                        _update_state(state_value, value)
                    elif isinstance(value, list):
                        assert isinstance(state_value, list)
                        state_value.extend(value)
                    else:
                        state[key] = value
            return state
        return _update_state(state, self.postconditions)

    def check_requirements(self, state):
        """
        Parameters
        ----------
        state : dict

        Raises
        ------
        UnsatisfiedRequirementsError

        Description
        -----------
        Requirements can be of the form:

        "node X needs (power.apparent or power.active) (but not
        power.reactive) and voltage is useful but not essential"

        or

        "node Y needs everything available from disk (to save to a copy to
        disk)"

        or

        "ComputeEnergy node needs good sections to be located" (if
        none of the previous nodes provide this service then check
        source.metadata to see if zeros have already been inserted; if the
        haven't then raise an error to tell the user to add a
        LocateGoodSectionsNode.)
        """
        # If a subclass has complex rules for preconditions then
        # override this default method definition.
        unsatisfied = find_unsatisfied_requirements(state, self.requirements)
        if unsatisfied:
            msg = str(self) + " not satisfied by:\n" + str(unsatisfied)
            raise UnsatisfiedRequirementsError(msg)

    def required_measurements(self, state):
        """
        Returns
        -------
        Set of measurements that need to be loaded from disk for this node.
        """
        return set()

    def __repr__(self):
        # NOTE(review): `self.name` must be provided by subclasses — confirm.
        return self.__class__.__name__ + ' ' + self.name

    @abc.abstractmethod
    def process(self, df, metadata):
        # check_preconditions again??? (in case this node is not run in
        # the context of a Pipeline?)
        # do stuff to df
        return df
1 change: 1 addition & 0 deletions nilmtk/preprocessing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .clip import Clip
21 changes: 12 additions & 9 deletions nilmtk/preprocessing/clip.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import print_function, division
from node import Node
from warnings import warn
from nilmtk.utils import index_of_column_name
from ..node import Node
from ..utils import index_of_column_name

class Clip(Node):

Expand All @@ -12,15 +12,18 @@ class Clip(Node):
postconditions = {'preprocessing_applied': {'clip': {}}}
name = 'clip'

def process(self):
    """Generator which clips each chunk pulled from upstream to the
    plausible limits declared in the device metadata.

    Yields
    ------
    chunk : pandas.DataFrame
        Same chunk as received from upstream, with each measurement
        column for which limits are known clipped to [lower, upper]
        in place.
    """
    self.check_requirements()
    metadata = self.upstream.get_metadata()
    # assumes metadata['device']['measurements'] describes the limits
    # for each measurement — TODO confirm against dataset metadata spec.
    measurements = metadata['device']['measurements']
    for chunk in self.upstream.process():
        for measurement in chunk:
            lower, upper = _find_limits(measurement, measurements)
            # Only clip when both limits are known for this measurement.
            if lower is not None and upper is not None:
                icol = index_of_column_name(chunk, measurement)
                chunk.iloc[:, icol] = chunk.iloc[:, icol].clip(lower, upper)
        # Yield inside the loop so every chunk is emitted downstream.
        yield chunk

def _find_limits(measurement, measurements):
"""
Expand Down
2 changes: 1 addition & 1 deletion nilmtk/results.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
import pandas as pd
import copy
from ..timeframe import TimeFrame
from .timeframe import TimeFrame

class Results(object):
"""Metadata results from each node need to be assigned to a specific
Expand Down
2 changes: 2 additions & 0 deletions nilmtk/stats/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .totalenergy import TotalEnergy
from .goodsections import GoodSections

0 comments on commit 462237e

Please sign in to comment.