Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 1c1863e464
Fetching contributors…

Cannot retrieve contributors at this time

111 lines (83 sloc) 3.234 kb
#! /usr/bin/python
from r2.lib.utils import trunc_time
from r2.lib import amqp
from r2.lib.log import log_text
from pylons import g
from datetime import datetime
from time import sleep
import random as rand
import pickle
q = 'usage_q'
tz = g.display_tz
def check_dict(body):
d = pickle.loads(body)
for k in ("start_time", "end_time", "action"):
if k not in d:
raise TypeError
return d
def hund_from_start_and_end(start_time, end_time):
elapsed = end_time - start_time
hund_sec = int(elapsed.seconds * 100 +
elapsed.microseconds / 10000)
if hund_sec == 0:
fraction = elapsed.microseconds / 10000.0
if rand.random() < fraction:
return 1
else:
return 0
return hund_sec
def buckets(time):
time = time.astimezone(tz)
# Keep:
# Daily buckets for eight days
# 1-hour buckets for 23 hours
# 5-min buckets for two hours
#
# (If the 1-hour bucket lasts more than a day, things can get confusing;
# at 12:30, the 12:xx column will have things from today at 12:20 and
# from yesterday at 12:40. This could be worked around, but the code
# over in pages.py is convoluted enough, so I'd rather not.)
return [
(86400 * 8, time.strftime("%Y/%m/%d_xx:xx")),
( 3600 * 23, time.strftime("%Y/%m/%d_%H:xx")),
( 3600 * 2, trunc_time(time, 5).strftime("%Y/%m/%d_%H:%M")),
]
def run(limit=1000, verbose=False):
def myfunc(msgs, chan):
if verbose:
print "processing a batch"
incrs = {}
for msg in msgs:
try:
d = check_dict(msg.body)
except TypeError:
log_text("usage_q error", "wtf is %r" % msg.body, "error")
continue
hund_sec = hund_from_start_and_end(d["start_time"], d["end_time"])
action = d["action"].replace("-", "_")
fudged_count = int( 1 / d["sampling_rate"])
fudged_elapsed = int(hund_sec / d["sampling_rate"])
for exp_time, bucket in buckets(d["end_time"]):
k = "%s-%s" % (bucket, action)
incrs.setdefault(k, [0, 0, exp_time])
incrs[k][0] += fudged_count
incrs[k][1] += fudged_elapsed
for k, (count, elapsed, exp_time) in incrs.iteritems():
c_key = "profile_count-" + k
e_key = "profile_elapsed-" + k
if verbose:
c_old = g.hardcache.get(c_key)
e_old = g.hardcache.get(e_key)
g.hardcache.accrue(c_key, delta=count, time=exp_time)
g.hardcache.accrue(e_key, delta=elapsed, time=exp_time)
if verbose:
c_new = g.hardcache.get(c_key)
e_new = g.hardcache.get(e_key)
print "%s: %s -> %s" % (c_key, c_old, c_new)
print "%s: %s -> %s" % (e_key, e_old, e_new)
if len(msgs) < limit / 2:
if verbose:
print "Sleeping..."
sleep (10)
amqp.handle_items(q, myfunc, limit=limit, drain=False, verbose=verbose,
sleep_time = 30)
Jump to Line
Something went wrong with that request. Please try again.