Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Create cube.time_utils.timeago to support duplicity-like interval string

  • Loading branch information...
commit 931992209354857406a950e31c173eae196ef582 1 parent b8d390d
@tsileo authored
View
5 README.rst
@@ -118,6 +118,11 @@ Metric resolutions shortcut
from cube import ONE_HOUR, FIVE_MINUTE
+Time utils
+----------
+
+
+
Event helper
------------
View
19 cube/__init__.py
@@ -8,6 +8,8 @@
import requests
+from cube.event import Event
+
API_VERSION = '1.0'
# Metric resolutions shortcuts
@@ -19,23 +21,6 @@
ONE_DAY = '864e5'
-class Event(object):
- def __init__(self, cube, event_type):
- self.cube = cube
- self.event_type = event_type
-
- def put(self, event_data={}, **kwargs):
- return self.cube.put(self.event_type, event_data, **kwargs)
-
- def event(self, expression=None, **kwargs):
- if expression is None:
- expression = self.event_type
- return self.cube.event(expression, **kwargs)
-
- def metric(self, expression, **kwargs):
- return self.cube.metric(self, expression, **kwargs)
-
-
class Cube(object):
def __init__(self, hostname="localhost", **kwargs):
self.collector_url = 'http://{0}:{1}/{2}/'.format(hostname,
View
18 cube/event.py
@@ -0,0 +1,18 @@
+# -*- encoding: utf-8 -*-
+
+
+class Event(object):
+ def __init__(self, cube, event_type):
+ self.cube = cube
+ self.event_type = event_type
+
+ def put(self, event_data={}, **kwargs):
+ return self.cube.put(self.event_type, event_data, **kwargs)
+
+ def event(self, expression=None, **kwargs):
+ if expression is None:
+ expression = self.event_type
+ return self.cube.event(expression, **kwargs)
+
+ def metric(self, expression, **kwargs):
+ return self.cube.metric(self, expression, **kwargs)
View
11 cube/tests/test_time_utils.py
@@ -1,6 +1,7 @@
# -*- encoding: utf-8 -*-
"""
+Copyright (c) 2013 Thomas Sileo
Copyright (c) 2012 Steven Buss
Originally from:
https://github.com/sbuss/pypercube/blob/master/tests/test_time_utils.py
@@ -41,3 +42,13 @@ def test_floor(self):
datetime(2012, 7, 6))
self.assertRaisesRegexp(ValueError, "is not a valid resolution",
time_utils.floor, self.now, 12345)
+
+ def test_timeago(self):
+ self.assertEqual(time_utils.timeago('1D', start=self.now),
+ datetime(2012, 7, 5, 20, 33, 16, 573225))
+ self.assertEqual(time_utils.timeago('1s', start=self.now),
+ datetime(2012, 7, 6, 20, 33, 15, 573225))
+ self.assertEqual(time_utils.timeago('1W', start=self.now),
+ datetime(2012, 6, 29, 20, 33, 16, 573225))
+ self.assertEqual(time_utils.timeago('1M', start=self.now),
+ datetime(2012, 6, 6, 20, 33, 16, 573225))
View
71 cube/time_utils.py
@@ -1,13 +1,14 @@
# -*- encoding: utf-8 -*-
"""
+Copyright (c) 2013 Thomas Sileo
Copyright (c) 2012 Steven Buss
Originally from:
https://github.com/sbuss/pypercube/blob/master/pypercube/time_utils.py
"""
-from datetime import datetime
-from datetime import timedelta
+import re
+from datetime import datetime, timedelta
STEP_10_SEC = long(1e4)
STEP_1_MIN = long(6e4)
@@ -53,18 +54,74 @@ def floor(start, resolution):
"""
if resolution == STEP_10_SEC:
return datetime(start.year, start.month, start.day, start.hour,
- start.minute, start.second - (start.second % 10))
+ start.minute, start.second - (start.second % 10))
elif resolution == STEP_1_MIN:
return datetime(start.year, start.month, start.day, start.hour,
- start.minute)
+ start.minute)
elif resolution == STEP_5_MIN:
return datetime(start.year, start.month, start.day, start.hour,
- start.minute - (start.minute % 5))
+ start.minute - (start.minute % 5))
elif resolution == STEP_1_HOUR:
return datetime(start.year, start.month, start.day, start.hour)
elif resolution == STEP_1_DAY:
return datetime(start.year, start.month, start.day)
raise ValueError("{resolution} is not a valid resolution. Valid choices "
- "are {choices}".format(
- resolution=resolution, choices=STEP_CHOICES))
+ "are {choices}".format(resolution=resolution,
+ choices=STEP_CHOICES))
+
+
+def _timedelta_total_seconds(td):
+ """Python 2.6 backward compatibility function for timedelta.total_seconds.
+
+ :type td: timedelta object
+ :param td: timedelta object
+
+ :rtype: float
+ :return: The total number of seconds for the given timedelta object.
+
+ """
+ if hasattr(timedelta, "total_seconds"):
+ return getattr(td, "total_seconds")()
+
+ # Python 2.6 backward compatibility
+ return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
+
+
+def _interval_string_to_seconds(interval_string):
+ """Convert internal string like 1M, 1Y3M, 3W to seconds.
+
+ :type interval_string: str
+ :param interval_string: Interval string like 1M, 1W, 1M3W4h2s...
+ (s => seconds, m => minutes, h => hours, D => days, W => weeks, M => months, Y => Years).
+
+ :rtype: int
+ :return: The conversion in seconds of interval_string.
+
+ """
+ interval_exc = "Bad interval format for {0}".format(interval_string)
+ interval_dict = {"s": 1, "m": 60, "h": 3600, "D": 86400,
+ "W": 7*86400, "M": 30*86400, "Y": 365*86400}
+
+ interval_regex = re.compile("^(?P<num>[0-9]+)(?P<ext>[smhDWMY])")
+ seconds = 0
+
+ while interval_string:
+ match = interval_regex.match(interval_string)
+ if match:
+ num, ext = int(match.group("num")), match.group("ext")
+ if num > 0 and ext in interval_dict:
+ seconds += num * interval_dict[ext]
+ interval_string = interval_string[match.end():]
+ else:
+ raise Exception(interval_exc)
+ else:
+ raise Exception(interval_exc)
+ return seconds
+
+
+def timeago(interval_string, start=None):
+ if start is None:
+ start = now()
+ td = timedelta(seconds=_interval_string_to_seconds(interval_string))
+ return start - td
Please sign in to comment.
Something went wrong with that request. Please try again.