-
Notifications
You must be signed in to change notification settings - Fork 455
/
goodsections.py
178 lines (149 loc) · 6.14 KB
/
goodsections.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from __future__ import print_function, division
import numpy as np
from numpy import diff, concatenate
import gc
from .goodsectionsresults import GoodSectionsResults
from ..timeframe import TimeFrame
from ..utils import timedelta64_to_secs
from ..node import Node
from ..timeframe import list_of_timeframes_from_list_of_dicts, timeframe_from_dict
class GoodSections(Node):
"""Locate sections of data where the sample period is <= max_sample_period.
Attributes
----------
previous_chunk_ended_with_open_ended_good_section : bool
"""
requirements = {'device': {'max_sample_period': 'ANY VALUE'}}
postconditions = {'statistics': {'good_sections': []}}
results_class = GoodSectionsResults
def reset(self):
self.previous_chunk_ended_with_open_ended_good_section = False
def process(self):
metadata = self.upstream.get_metadata()
self.check_requirements()
self.results = GoodSectionsResults(
metadata['device']['max_sample_period'])
for chunk in self.upstream.process():
self._process_chunk(chunk, metadata)
yield chunk
def _process_chunk(self, df, metadata):
"""
Parameters
----------
df : pd.DataFrame
with attributes:
- look_ahead : pd.DataFrame
- timeframe : nilmtk.TimeFrame
metadata : dict
with ['device']['max_sample_period'] attribute
Returns
-------
None
Notes
-----
Updates `self.results`
Each good section in `df` is marked with a TimeFrame.
If this df ends with an open-ended good section (assessed by
examining df.look_ahead) then the last TimeFrame will have
`end=None`. If this df starts with an open-ended good section
then the first TimeFrame will have `start=None`.
"""
# Retrieve relevant metadata
max_sample_period = metadata['device']['max_sample_period']
look_ahead = getattr(df, 'look_ahead', None)
timeframe = df.timeframe
# Process dataframe
good_sections = get_good_sections(
df, max_sample_period, look_ahead,
self.previous_chunk_ended_with_open_ended_good_section)
# Set self.previous_chunk_ended_with_open_ended_good_section
if good_sections:
self.previous_chunk_ended_with_open_ended_good_section = (
good_sections[-1].end is None)
# Update self.results
self.results.append(timeframe, {'sections': [good_sections]})
def get_good_sections(df, max_sample_period, look_ahead=None,
previous_chunk_ended_with_open_ended_good_section=False):
"""
Parameters
----------
df : pd.DataFrame
look_ahead : pd.DataFrame
max_sample_period : number
Returns
-------
sections : list of TimeFrame objects
Each good section in `df` is marked with a TimeFrame.
If this df ends with an open-ended good section (assessed by
examining `look_ahead`) then the last TimeFrame will have
`end=None`. If this df starts with an open-ended good section
then the first TimeFrame will have `start=None`.
"""
index = df.dropna().sort_index().index
del df
if len(index) < 2:
return []
timedeltas_sec = timedelta64_to_secs(diff(index.values))
timedeltas_check = timedeltas_sec <= max_sample_period
# Memory management
del timedeltas_sec
gc.collect()
timedeltas_check = concatenate(
[[previous_chunk_ended_with_open_ended_good_section],
timedeltas_check])
transitions = diff(timedeltas_check.astype(np.int))
# Memory management
last_timedeltas_check = timedeltas_check[-1]
del timedeltas_check
gc.collect()
good_sect_starts = list(index[:-1][transitions == 1])
good_sect_ends = list(index[:-1][transitions == -1])
# Memory management
last_index = index[-1]
del index
gc.collect()
# Use look_ahead to see if we need to append a
# good sect start or good sect end.
look_ahead_valid = look_ahead is not None and not look_ahead.empty
if look_ahead_valid:
look_ahead_timedelta = look_ahead.dropna().index[0] - last_index
look_ahead_gap = look_ahead_timedelta.total_seconds()
if last_timedeltas_check: # current chunk ends with a good section
if not look_ahead_valid or look_ahead_gap > max_sample_period:
# current chunk ends with a good section which needs to
# be closed because next chunk either does not exist
# or starts with a sample which is more than max_sample_period
# away from df.index[-1]
good_sect_ends += [last_index]
elif look_ahead_valid and look_ahead_gap <= max_sample_period:
# Current chunk appears to end with a bad section
# but last sample is the start of a good section
good_sect_starts += [last_index]
# Work out if this chunk ends with an open ended good section
if len(good_sect_ends) == 0:
ends_with_open_ended_good_section = (
len(good_sect_starts) > 0 or
previous_chunk_ended_with_open_ended_good_section)
elif len(good_sect_starts) > 0:
# We have good_sect_ends and good_sect_starts
ends_with_open_ended_good_section = (
good_sect_ends[-1] < good_sect_starts[-1])
else:
# We have good_sect_ends but no good_sect_starts
ends_with_open_ended_good_section = False
# If this chunk starts or ends with an open-ended
# good section then the relevant TimeFrame needs to have
# a None as the start or end.
if previous_chunk_ended_with_open_ended_good_section:
good_sect_starts = [None] + good_sect_starts
if ends_with_open_ended_good_section:
good_sect_ends += [None]
assert len(good_sect_starts) == len(good_sect_ends)
sections = [TimeFrame(start, end)
for start, end in zip(good_sect_starts, good_sect_ends)
if not (start == end and start is not None)]
# Memory management
del good_sect_starts
del good_sect_ends
gc.collect()
return sections