Skip to content

Commit

Permalink
MeterGroup.dataframe_of_meters totally re-written to lay foundation
Browse files Browse the repository at this point in the history
for nice way to plot stacked area graph #201. Closes #138.
  • Loading branch information
JackKelly committed Dec 11, 2014
1 parent 3b1755b commit fb62cf7
Showing 1 changed file with 74 additions and 56 deletions.
130 changes: 74 additions & 56 deletions nilmtk/metergroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sys import stdout
from collections import Counter
import matplotlib.pyplot as plt
from copy import deepcopy
from .elecmeter import ElecMeter, ElecMeterID
from .appliance import Appliance
from .datastore.datastore import join_key
Expand Down Expand Up @@ -580,27 +581,33 @@ def load(self, sample_period=None, **kwargs):
warn("If you are using `preprocessing` to resample then please"
" do not! Instead, please use the `sample_period` parameter.")

# Get a list of generators
if sample_period is None:
sample_period = self.sample_period()

# Load each generator and yield the sum or the mean
_, generators = self._meter_generators(**kwargs)
while True:
chunk = combine_chunks_from_generators(generators, sample_period)
if chunk.empty:
break
yield chunk

def _meter_generators(self, **kwargs):
"""Returns (list of identifiers, list of generators)."""
generators = []
identifiers = []
for meter in self.meters:
try:
generator = meter.load(**kwargs)
generator = meter.load(**deepcopy(kwargs))
except MeasurementError as e:
warn("Ignoring meter '{}' because it does not have the correct"
" measurements. The MeasurementError was: '{}'"
.format(meter.identifier, e))
else:
generators.append(generator)
identifiers.append(meter.identifier)

if sample_period is None:
sample_period = self.sample_period()

# Load each generator and yield the sum or the mean
while True:
chunk = combine_chunks_from_generators(generators, sample_period)
if chunk.empty:
break
yield chunk
return identifiers, generators

def plot_when_on(self, **load_kwargs):
meter_identifiers = list(self.identifier)
Expand Down Expand Up @@ -811,62 +818,73 @@ def good_sections(self, **kwargs):
else:
return []

def dataframe_of_meters(self, rule='1T'):
def dataframe_of_meters(self, sample_period=None, **kwargs):
"""
Parameters
----------
sample_period : number
Number of seconds to reindex meters to.
ac_type : string, defaults to 'best'
physical_quantity: string, defaults to 'power'
Returns
-------
DataFrame
Each column is a meter. We select the most appropriate measurement.
First column is mains.
All NaNs are zeroed out.
Chunks (which may not be consecutive) are put together. i.e. there
will be breaks in the index where there were holes in the mains data.
Note
----
* we use 'meters_directly_downsteam_of_mains' instead of most distal meters
* think this was written when rushing to get disaggregation to
work with NILMTK v0.2. Might not need this function once we teach
disaggregators to handle individual appliances, chunk by chunk.
All NaNs are zeroed out. Note that column names are string
representations of ElecMeterIDs (because 'pd.concat' tries to use
tuples to construct a multiindex)
"""
submeters_dict = {}
mains = self.mains()
mains_good_sections = mains.good_sections()
mains_energy = mains.total_energy(sections=mains_good_sections)
energy_ac_type = select_best_ac_type(mains_energy.keys())
energy_threshold = mains_energy[energy_ac_type] * 0.05

# TODO: should iterate through 'most distal' meters
for meter in [self.mains()] + self.meters_directly_downstream_of_mains():
meter_energy = meter.total_energy(sections=mains_good_sections)
meter_energy_ac_type = select_best_ac_type(meter_energy.keys(),
mains_energy.keys())
if meter_energy[meter_energy_ac_type] < energy_threshold:
continue

# TODO: resampling etc should happen in pipeline
chunks = []
for chunk in meter.power_series(sections=mains_good_sections):
chunk = chunk.resample(rule=rule, how='mean')
if kwargs.has_key('preprocessing'):
warn("If you are using `preprocessing` to resample then please"
" do not! Instead, please use the `sample_period` parameter.")

# Protect against getting duplicate indicies
if chunks and chunks[-1].index[-1] == chunk.index[0]:
chunks.append(chunk.iloc[1:])
if sample_period is None:
sample_period = self.sample_period()

resample = lambda df: df.resample(rule='{}S'.format(sample_period))
kwargs.setdefault('preprocessing', []).append(Apply(func=resample))
kwargs.setdefault('ac_type', 'best')
kwargs.setdefault('physical_quantity', 'power')
identifiers, generators = self._meter_generators(**kwargs)

segments = []
while True:
chunks = []
ids = []
index = None
timeframe = None
for meter_id, generator in zip(identifiers, generators):
try:
chunk_from_next_meter = next(generator)
except StopIteration:
continue

if timeframe is None:
timeframe = chunk_from_next_meter.timeframe
else:
chunks.append(chunk)
timeframe = timeframe.union(chunk_from_next_meter.timeframe)

power_series = pd.concat(chunks)
# Extend 'index' if necessary
index = extend_index(index, timeframe, sample_period)

# need to make sure
# resample stays in sync with mains power_series. Maybe want reindex???
# If we're careful then I think we can get power_series with index
# in common with mains, without having to post-process it
# like prepb.make_common_index(building)
# Reindex chunk_from_next_meter
chunk_from_next_meter = chunk_from_next_meter.reindex(
index, method='ffill', limit=1, fill_value=0)

ids.append(meter_id)
chunks.append(chunk_from_next_meter.icol(0))

if chunks:
df = pd.concat(chunks, axis=1)
df.columns = ids
segments.append(df)
else:
break

# TODO: insert zeros and then ffill
power_series.fillna(value=0, inplace=True)
submeters_dict[meter.identifier] = power_series
return pd.DataFrame(submeters_dict)
return pd.concat(segments)

def entropy_per_meter(self):
"""Finds the entropy of each meter in this MeterGroup.
Expand Down

0 comments on commit fb62cf7

Please sign in to comment.