-
Notifications
You must be signed in to change notification settings - Fork 62
/
time_series.py
374 lines (310 loc) · 11.7 KB
/
time_series.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# coding=utf-8
"""
© 2014 LinkedIn Corp. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
import numpy
class TimeSeries(object):
    """
    Ordered collection of (timestamp, value) pairs with arithmetic, alignment,
    smoothing and summary-statistic helpers.

    The data lives in two parallel lists: ``self.timestamps`` (kept sorted) and
    ``self.values``, where ``self.values[i]`` belongs to ``self.timestamps[i]``.
    """

    def __init__(self, series):
        """
        Build a time series from a mapping of timestamp to value.

        :param dict series: mapping of timestamp -> value. Timestamps are coerced
            with `int` and values with `float`; entries whose value is `None` are dropped.
        """
        self.timestamps = []
        self.values = []
        # Clean the time series by removing null values.
        # Sort on the coerced integer so string keys such as "9" and "10" end up
        # in numeric order rather than lexicographic order.
        for ts in sorted(series, key=int):
            if series[ts] is not None:
                self.timestamps.append(int(ts))
                self.values.append(float(series[ts]))

    @property
    def start(self):
        """
        Return the earliest timestamp in the time series, or `None` when empty.
        """
        return min(self.timestamps) if self.timestamps else None

    @property
    def end(self):
        """
        Return the latest timestamp in the time series, or `None` when empty.
        """
        return max(self.timestamps) if self.timestamps else None

    @property
    def timestamps_ms(self):
        """
        Return list of timestamp values in order by milliseconds since epoch.
        """
        # A list comprehension (not map) so a real list is returned on Python 3 too.
        return [ts * 1000 for ts in self.timestamps]

    def __repr__(self):
        return 'TimeSeries<start={0}, end={1}>'.format(repr(self.start), repr(self.end))

    def __str__(self):
        """
        :return string: Return string representation of time series
        """
        return ''.join(str(item) for item in self.iteritems())

    def __nonzero__(self):
        """
        A time series is truthy when it holds at least one data point.
        """
        return len(self.timestamps) > 0

    # Python 3 spells the truthiness hook __bool__.
    __bool__ = __nonzero__

    def __getitem__(self, key):
        """
        Return the value stored at timestamp `key`.

        :raises ValueError: if `key` is not a timestamp of this series.
        """
        try:
            pos = self.timestamps.index(key)
        except ValueError:
            raise ValueError('Timestamp does not exist in TimeSeries object')
        return self.values[pos]

    def __setitem__(self, key, val):
        """
        Set the value at timestamp `key`, inserting the timestamp in sorted
        position when it is new. Assigning `None` removes the data point.
        """
        if key in self.timestamps:
            pos = self.timestamps.index(key)
            if val is None:
                del self.timestamps[pos]
                del self.values[pos]
            else:
                self.values[pos] = val
        elif val is not None:
            # Null values are never stored, so assigning None to an absent
            # timestamp is a no-op instead of inserting a None value.
            self.timestamps = sorted(self.timestamps + [key])
            pos = self.timestamps.index(key)
            self.values.insert(pos, val)

    def __delitem__(self, key):
        """
        Remove the data point at timestamp `key`; absent timestamps are ignored.
        """
        if key in self.timestamps:
            pos = self.timestamps.index(key)
            del self.timestamps[pos]
            del self.values[pos]

    def __contains__(self, item):
        return item in self.timestamps

    def __iter__(self):
        for key in self.timestamps:
            yield key

    def __len__(self):
        return len(self.timestamps)

    def __eq__(self, other):
        """
        Two time series are equal when they hold the same timestamps with the
        same values, position by position.
        """
        if not isinstance(other, TimeSeries):
            # Let Python try the reflected comparison / identity fallback
            # instead of failing on a missing attribute.
            return NotImplemented
        if len(self.timestamps) != len(other.timestamps):
            return False
        for pos, ts in enumerate(self.timestamps):
            if ts != other.timestamps[pos] or self.values[pos] != other.values[pos]:
                return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __add__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__add__)

    def __sub__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__sub__)

    def __mul__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__mul__)

    def __div__(self, other):
        return self._generic_binary_op(other, self._division_op(other, '__div__', '__truediv__'))

    def __truediv__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__truediv__)

    __radd__ = __add__
    __rmul__ = __mul__

    def __rsub__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__rsub__)

    def __rdiv__(self, other):
        return self._generic_binary_op(other, self._division_op(other, '__rdiv__', '__rtruediv__'))

    def __rtruediv__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__rtruediv__)

    def _division_op(self, other, legacy_name, true_name):
        """
        Pick the division method of the value type: the Python 2 method named
        `legacy_name` when the type has it, otherwise the one named `true_name`.
        """
        value_type = self._get_value_type(other)
        legacy = getattr(value_type, legacy_name, None)
        return legacy if legacy is not None else getattr(value_type, true_name)

    def items(self):
        """
        Return a list of (timestamp, value) tuples in timestamp order.
        """
        return list(zip(self.timestamps, self.values))

    def iterkeys(self):
        for key in self.timestamps:
            yield key

    def itervalues(self):
        for value in self.values:
            yield value

    def iteritems(self):
        for item in self.items():
            yield item

    def iteritems_silent(self):
        """
        Like :meth:`iteritems`, but yields a final `None` once the items are
        exhausted so a consumer can detect the end without StopIteration.
        """
        for item in self.items():
            yield item
        yield None

    def _generic_binary_op(self, other, op):
        """
        Perform the method operation specified in the op parameter on the values
        within the instance's time series values and either another time series
        or a constant number value.

        When `other` is a time series, only timestamps present in both series
        are combined. Data points whose operation raises ZeroDivisionError are
        left out of the result.

        :param other: Time series of values or a constant number to use in calculations with instance's time series.
        :param func op: The method to perform the calculation between the values.
        :return: :class:`TimeSeries` object.
        :raises ValueError: if no data point could be computed.
        """
        if isinstance(other, TimeSeries):
            # One-pass lookup table instead of a linear scan per timestamp.
            # setdefault keeps the first occurrence, matching list.index().
            lookup = {}
            for other_key, other_value in other.items():
                lookup.setdefault(other_key, other_value)
            operands = [(key, value, lookup[key]) for key, value in self.items() if key in lookup]
        else:
            operands = [(key, value, other) for key, value in self.items()]

        output = {}
        for key, value, operand in operands:
            try:
                result = op(value, operand)
                if result is NotImplemented:
                    # The value's type cannot handle the operand; retry with
                    # the operand type's own method after converting the value.
                    operand_type = type(operand)
                    operand_op = vars(operand_type).get(op.__name__)
                    if operand_op:
                        output[key] = operand_op(operand_type(value), operand)
                else:
                    output[key] = result
            except ZeroDivisionError:
                continue

        if output:
            return TimeSeries(output)
        else:
            raise ValueError('TimeSeries data was empty or invalid.')

    def _get_value_type(self, other):
        """
        Get the object type of the value within the values portion of the time series.

        :return: `type` of object
        :raises ValueError: if neither this series nor `other` holds any value.
        """
        if self.values:
            return type(self.values[0])
        elif isinstance(other, TimeSeries) and other.values:
            return type(other.values[0])
        else:
            raise ValueError('Cannot perform arithmetic on empty time series.')

    def align(self, other):
        """
        Align two time series so that len(self) == len(other) and self.timestamps == other.timestamps.

        A timestamp missing from one series is filled with that series' current
        (next available) value; past the end of a series its last value is used.
        Returns `None` when `other` is not a :class:`TimeSeries`.

        :return: :tuple:(`TimeSeries` object(the aligned self), `TimeSeries` object(the aligned other))
        :raises IndexError: if one series is empty while the other still has items to align.
        """
        if not isinstance(other, TimeSeries):
            return None

        aligned, other_aligned = {}, {}
        i, other_i = self.iteritems_silent(), other.iteritems_silent()
        # The builtin next() works on Python 2.6+ and Python 3 generators.
        item, other_item = next(i), next(other_i)

        while item is not None and other_item is not None:
            # Unpack timestamps and values.
            timestamp, value = item
            other_timestamp, other_value = other_item
            if timestamp == other_timestamp:
                aligned[timestamp] = value
                other_aligned[other_timestamp] = other_value
                item = next(i)
                other_item = next(other_i)
            elif timestamp < other_timestamp:
                aligned[timestamp] = value
                other_aligned[timestamp] = other_value
                item = next(i)
            else:
                aligned[other_timestamp] = value
                other_aligned[other_timestamp] = other_value
                other_item = next(other_i)

        # Align remaining items.
        while item is not None:
            timestamp, value = item
            aligned[timestamp] = value
            other_aligned[timestamp] = other.values[-1]
            item = next(i)
        while other_item is not None:
            other_timestamp, other_value = other_item
            aligned[other_timestamp] = self.values[-1]
            other_aligned[other_timestamp] = other_value
            other_item = next(other_i)

        return TimeSeries(aligned), TimeSeries(other_aligned)

    def smooth(self, smoothing_factor):
        """
        Return a new time series which is an exponentially smoothed version of the original data series.
        Smooth forward once, backward once, and then take the average.

        :param float smoothing_factor: smoothing factor
        :return: :class:`TimeSeries` object.
        """
        forward_smooth = {}
        backward_smooth = {}
        output = {}

        if self:
            previous = self.values[0]
            following = self.values[-1]
            for key, value in self.items():
                forward_smooth[key] = smoothing_factor * previous + (1 - smoothing_factor) * value
                previous = forward_smooth[key]
            for key, value in reversed(self.items()):
                backward_smooth[key] = smoothing_factor * following + (1 - smoothing_factor) * value
                following = backward_smooth[key]
            for key in forward_smooth:
                output[key] = (forward_smooth[key] + backward_smooth[key]) / 2

        return TimeSeries(output)

    def add_offset(self, offset):
        """
        Increment all timestamps of this time series, in place, by some offset.

        :param int offset: The number of seconds to offset the time series.
        :return: `None`
        """
        # Must stay a list: a lazy map object would break len()/index() on Python 3.
        self.timestamps = [ts + offset for ts in self.timestamps]

    def normalize(self):
        """
        Divide all values of this time series, in place, by the maximum value.
        Nothing happens when the series is empty or the maximum is zero.

        :return: `None`
        """
        maximum = self.max()
        if maximum:
            self.values = [value / maximum for value in self.values]

    def crop(self, start_timestamp, end_timestamp):
        """
        Return a new TimeSeries object containing all the timestamps and values within
        the specified (inclusive) range.

        :param int start_timestamp: the start timestamp value
        :param int end_timestamp: the end timestamp value
        :return: :class:`TimeSeries` object.
        :raises ValueError: if no timestamp falls within the range.
        """
        output = {}
        for key, value in self.items():
            if start_timestamp <= key <= end_timestamp:
                output[key] = value

        if output:
            return TimeSeries(output)
        else:
            raise ValueError('TimeSeries data was empty or invalid.')

    def _scalar_stat(self, func, default, *args):
        """
        Apply the numpy reduction `func` to the values and return the result as
        a plain Python scalar, or `default` when the series has no values.
        """
        if not self.values:
            return default
        # .item() is what the removed numpy.asscalar() did internally.
        return func(self.values, *args).item()

    def average(self, default=None):
        """
        Calculate the average value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the average value or `None`.
        """
        return self._scalar_stat(numpy.average, default)

    def median(self, default=None):
        """
        Calculate the median value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the median value or `None`.
        """
        return self._scalar_stat(numpy.median, default)

    def max(self, default=None):
        """
        Calculate the maximum value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the maximum value or `None`.
        """
        return self._scalar_stat(numpy.max, default)

    def min(self, default=None):
        """
        Calculate the minimum value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the minimum value or `None`.
        """
        return self._scalar_stat(numpy.min, default)

    def percentile(self, n, default=None):
        """
        Calculate the Nth Percentile value over the time series.

        :param int n: Integer value of the percentile to calculate.
        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the Nth percentile value or `None`.
        """
        return self._scalar_stat(numpy.percentile, default, n)

    def stdev(self, default=None):
        """
        Calculate the standard deviation of the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the standard deviation value or `None`.
        """
        return self._scalar_stat(numpy.std, default)

    def sum(self, default=None):
        """
        Calculate the sum of all the values in the times series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the sum or `None`.
        """
        return self._scalar_stat(numpy.sum, default)