
added get_error implementation - fixes #163
tarekziade committed Sep 18, 2013
1 parent 3f654a2 commit 6081951
Showing 5 changed files with 152 additions and 49 deletions.
104 changes: 67 additions & 37 deletions loads/db/_python.py
@@ -25,6 +25,7 @@ def _initialize(self):
             os.makedirs(self.directory)
 
         self._buffer = defaultdict(Queue)
+        self._errors = defaultdict(Queue)
         self._callback = ioloop.PeriodicCallback(self.flush, self.sync_delay,
                                                  self.loop)
         self._callback.start()
@@ -60,13 +61,35 @@ def add(self, data):
         data_type = data.get('data_type', 'unknown')
         size = data.get('size', 1)
         self._counts[run_id][data_type] += size
-        self._buffer[run_id].put(data)
+        self._buffer[run_id].put(dict(data))
 
         if 'url' in data:
             self._urls[run_id][data['url']] += 1
 
+        if data_type == 'addError':
+            self._errors[run_id].put(dict(data))
+
         self._dirty = True
 
+    def _dump_queue(self, run_id, queue, filename):
+        # lines
+        qsize = queue.qsize()
+        if qsize == 0:
+            return
+
+        if run_id is None:
+            run_id = 'unknown'
+
+        with open(filename, 'a+') as f:
+            for i in range(qsize):
+                line = queue.get()
+                if 'run_id' not in line:
+                    line['run_id'] = run_id
+
+                f.write(json.dumps(line, sort_keys=True) + '\n')
+
     def flush(self):
 
         if not self._dirty:
             return

@@ -80,21 +103,15 @@ def flush(self):
         if len(self._buffer) == 0:
             return
 
-        for run_id, queue in self._buffer.items():
-            # lines
-            qsize = queue.qsize()
-            if qsize == 0:
-                continue
-
-            if run_id is None:
-                run_id = 'unknown'
+        for run_id, queue in self._errors.items():
+            # error lines
+            filename = os.path.join(self.directory, run_id + '-errors.json')
+            self._dump_queue(run_id, queue, filename)
 
+        for run_id, queue in self._buffer.items():
+            # all lines
             filename = os.path.join(self.directory, run_id + '-db.json')
-
-            with open(filename, 'a+') as f:
-                for i in range(qsize):
-                    line = queue.get()
-                    f.write(json.dumps(line, sort_keys=True) + '\n')
+            self._dump_queue(run_id, queue, filename)
 
             # counts
             filename = os.path.join(self.directory, run_id + '-counts.json')
@@ -137,6 +154,40 @@ def get_runs(self):
                     for path in os.listdir(self.directory)
                     if path.endswith('-db.json')])
 
+    def _batch(self, filename, start=None, size=None, filter=None):
+        if start is not None and size is not None:
+            end = start + size
+        else:
+            end = None
+
+        # XXX suboptimal iterates until start is reached.
+        sent = 0
+
+        with open(filename) as f:
+            for current, line in enumerate(iter(f.readline, '')):
+                data = json.loads(line)
+                if filter is not None and filter(data):
+                    continue
+                if start is not None and current < start:
+                    continue
+                elif end is not None and current > end or sent == size:
+                    raise StopIteration()
+                yield data, line
+                sent += 1
+
+    def get_errors(self, run_id, start=None, size=None):
+        if size is not None and start is None:
+            start = 0
+
+        self.flush()
+        filename = os.path.join(self.directory, run_id + '-errors.json')
+
+        if not os.path.exists(filename):
+            raise StopIteration()
+
+        for data, line in self._batch(filename, start, size):
+            yield data
+
     def get_data(self, run_id, data_type=None, groupby=False, start=None,
                  size=None):
         if size is not None and start is None:
@@ -152,34 +203,13 @@ def _filtered(data):
             return (data_type is not None and
                     data_type != data.get('data_type'))
 
-        def _batch():
-            if start is not None and size is not None:
-                end = start + size
-            else:
-                end = None
-
-            # XXX suboptimal iterates until start is reached.
-            sent = 0
-
-            with open(filename) as f:
-                for current, line in enumerate(iter(f.readline, '')):
-                    data = json.loads(line)
-                    if _filtered(data):
-                        continue
-                    if start is not None and current < start:
-                        continue
-                    elif end is not None and current > end or sent == size:
-                        raise StopIteration()
-                    yield data, line
-                    sent += 1
-
         if not groupby:
-            for data, line in _batch():
+            for data, line in self._batch(filename, start, size, _filtered):
                 yield data
         else:
             grouped = dict()
 
-            for data, line in _batch():
+            for data, line in self._batch(filename, start, size, _filtered):
                 if line in grouped:
                     grouped[line] = grouped[line][0] + 1, data
                 else:
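The file-backed store now appends one JSON document per line to <run_id>-errors.json, alongside the existing -db.json and -counts.json files, and get_errors() replays that file through the shared _batch() helper. (The raise StopIteration() inside a generator was idiomatic in 2013; on Python 3.7+, PEP 479 turns it into RuntimeError, and a bare return is the modern equivalent.) A minimal sketch of consuming the on-disk format — the directory path and run id below are made up for illustration:

import json
import os

directory = '/tmp/loads-db'   # hypothetical DB directory
run_id = '1'                  # hypothetical run id

# _dump_queue appends one json.dumps(...) document per line.
filename = os.path.join(directory, run_id + '-errors.json')
if os.path.exists(filename):
    with open(filename) as f:
        for line in f:
            error = json.loads(line)
            print(error.get('data_type'), error.get('test'))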
23 changes: 23 additions & 0 deletions loads/db/_redis.py
@@ -82,6 +82,10 @@ def add(self, data):
         dumped = dumps(data, sort_keys=True)
         pipeline.lpush('data:%s' % run_id, dumped)
 
+        # adding errors
+        if data_type == 'addError':
+            pipeline.lpush('errors:%s' % run_id, dumped)
+
         # adding group by
         md5 = hashlib.md5(dumped).hexdigest()
         pipeline.incrby('bcount:%s:%s' % (run_id, md5), size)
@@ -116,6 +120,25 @@ def get_counts(self, run_id):
     def get_runs(self):
         return self._redis.smembers('runs')
 
+    def get_errors(self, run_id, start=None, size=None):
+        key = 'errors:%s' % run_id
+        len = self._redis.llen(key)
+        if len == 0:
+            raise StopIteration()
+
+        if start is None:
+            start = 0
+
+        if size is None:
+            end = len
+        else:
+            end = start + size
+            if end > len:
+                end = len
+
+        for index in range(start, end):
+            yield loads(self._redis.lindex(key, index))
+
     def get_data(self, run_id, data_type=None, groupby=False, start=None,
                  size=None):
         key = 'data:%s' % run_id
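The Redis backend mirrors this by lpush-ing each addError payload onto errors:<run_id> and paging through it with llen/lindex; since lpush prepends, index 0 is the most recently recorded error, so get_errors pages newest-first. (Note also that the local name len in get_errors shadows the builtin within that method.) A rough sketch of the same pattern with the standard redis-py client — the connection settings and run id are assumptions:

import json
import redis

client = redis.StrictRedis(host='localhost', port=6379, db=0)
key = 'errors:%s' % '1'   # hypothetical run id

length = client.llen(key)             # number of stored error records
for index in range(min(length, 10)):  # first page of at most ten
    # each entry was serialized with dumps(..., sort_keys=True) in add()
    print(json.loads(client.lindex(key, index)))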
36 changes: 31 additions & 5 deletions loads/tests/test_python_db.py
@@ -28,6 +28,10 @@
      'agent_id': _AGENT_ID, 'loads_status': [1, 1, 1, 0],
      'data_type': 'addSuccess', 'run_id': _RUN_ID},
 
+    {'test': 'test_es (loads.examples.test_blog.TestWebSite)',
+     'agent_id': _AGENT_ID, 'loads_status': [1, 1, 1, 0],
+     'data_type': 'addError', 'run_id': _RUN_ID},
+
     {'test': 'test_es (loads.examples.test_blog.TestWebSite)',
      'agent_id': _AGENT_ID, 'loads_status': [1, 1, 1, 0],
      'data_type': 'stopTest', 'run_id': _RUN_ID},
@@ -74,8 +78,8 @@ def add_data():
         with open(os.path.join(self.db.directory, '2-db.json')) as f:
             data2 = [json.loads(line) for line in f]
 
-        self.assertEqual(len(data), 12)
-        self.assertEqual(len(data2), 12)
+        self.assertEqual(len(data), 14)
+        self.assertEqual(len(data2), 14)
         counts = self.db.get_counts('1')
 
         for type_ in ('addSuccess', 'stopTestRun', 'stopTest',
@@ -87,7 +91,7 @@ def add_data():
         self.assertEqual(len(batch), 2)
 
         batch = list(self.db.get_data('1', start=2))
-        self.assertEqual(len(batch), 10)
+        self.assertEqual(len(batch), 12)
 
         batch = list(self.db.get_data('1', start=2, size=5))
         self.assertEqual(len(batch), 5)
@@ -102,7 +106,7 @@ def add_data():
 
         # group by
         res = list(self.db.get_data('1', groupby=True))
-        self.assertEqual(len(res), 6)
+        self.assertEqual(len(res), 7)
         self.assertEqual(res[0]['count'], 2)
 
         res = list(self.db.get_data('1', data_type='add_hit', groupby=True))
@@ -113,7 +117,7 @@ def add_data():
 
         # len(data) < asked size
         batch = list(self.db.get_data('1', start=2, size=5000))
-        self.assertEqual(len(batch), 10)
+        self.assertEqual(len(batch), 12)
 
     def test_metadata(self):
         self.assertEqual(self.db.get_metadata('1'), {})
@@ -142,3 +146,25 @@ def add_data():
         self.assertTrue(self.db.ping())
         urls = self.db.get_urls('1')
         self.assertEqual(urls, {'http://127.0.0.1:9200/': 2})
+
+    def test_get_errors(self):
+        def add_data():
+            for line in ONE_RUN:
+                data = dict(line)
+                data['run_id'] = '1'
+                self.db.add(data)
+                data['run_id'] = '2'
+                self.db.add(data)
+
+        self.loop.add_callback(add_data)
+        self.loop.add_callback(add_data)
+        self.loop.add_timeout(time.time() + .5, self.loop.stop)
+        self.loop.start()
+
+        self.assertTrue(self.db.ping())
+
+        errors = list(self.db.get_errors('2'))
+        self.assertEqual(len(errors), 2, errors)
+
+        errors = list(self.db.get_errors('1'))
+        self.assertEqual(len(errors), 2, errors)
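The updated expectations follow from the fixture arithmetic: ONE_RUN now holds seven records (the addError entry is new), and each test schedules add_data twice, so every run receives each record twice — 14 rows in total, 7 groupby buckets of count 2, and 2 errors per run. A toy version of the group-by-serialized-line logic that get_data(groupby=True) relies on, with two hypothetical records standing in for ONE_RUN entries:

import json
from collections import Counter

records = [{'data_type': 'addSuccess'}, {'data_type': 'addError'}]

# Each record arrives twice, and grouping keys on the serialized line.
lines = [json.dumps(r, sort_keys=True) for r in records * 2]
grouped = Counter(lines)
assert len(grouped) == 2
assert all(count == 2 for count in grouped.values())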
35 changes: 29 additions & 6 deletions loads/tests/test_redis_db.py
@@ -14,7 +14,8 @@
 from loads.tests.test_python_db import ONE_RUN
 
 
-_KEYS = ['data:1', 'data:2', 'counters:1', 'counters:2', 'bcounters:1',
+_KEYS = ['errors:1', 'errors:2', 'data:1', 'data:2', 'counters:1',
+         'counters:2', 'bcounters:1',
          'bcounters:2', 'metadata:1', 'metadata:2',
          'urls:1', 'urls:2']

Expand Down Expand Up @@ -77,8 +78,8 @@ def add_data():
for i in range(self._redis.llen('data:2'))]
data2.sort()

self.assertEqual(len(data), 12)
self.assertEqual(len(data2), 12)
self.assertEqual(len(data), 14)
self.assertEqual(len(data2), 14)
counts = self.db.get_counts('1')

for type_ in ('addSuccess', 'stopTestRun', 'stopTest',
@@ -90,7 +91,7 @@ def add_data():
         self.assertEqual(len(batch), 2)
 
         batch = list(self.db.get_data('1', start=2))
-        self.assertEqual(len(batch), 10)
+        self.assertEqual(len(batch), 12)
 
         batch = list(self.db.get_data('1', start=2, size=5))
         self.assertEqual(len(batch), 5)
@@ -105,7 +106,7 @@ def add_data():
 
         # group by
         res = list(self.db.get_data('1', groupby=True))
-        self.assertEqual(len(res), 6)
+        self.assertEqual(len(res), 7)
         self.assertEqual(res[0]['count'], 2)
 
         res = list(self.db.get_data('1', data_type='add_hit', groupby=True))
@@ -115,7 +116,7 @@ def add_data():
 
         # len(data) < asked size
         batch = list(self.db.get_data('1', start=2, size=5000))
-        self.assertEqual(len(batch), 10)
+        self.assertEqual(len(batch), 12)
 
     def test_metadata(self):
         self.assertEqual(self.db.get_metadata('1'), {})
@@ -143,3 +144,25 @@ def add_data():
 
         urls = self.db.get_urls('1')
         self.assertEqual(urls, {'http://127.0.0.1:9200/': 2})
+
+    def test_get_errors(self):
+        def add_data():
+            for line in ONE_RUN:
+                data = dict(line)
+                data['run_id'] = '1'
+                self.db.add(data)
+                data['run_id'] = '2'
+                self.db.add(data)
+
+        self.loop.add_callback(add_data)
+        self.loop.add_callback(add_data)
+        self.loop.add_timeout(time.time() + .5, self.loop.stop)
+        self.loop.start()
+
+        self.assertTrue(self.db.ping())
+
+        errors = list(self.db.get_errors('2'))
+        self.assertEqual(len(errors), 2, errors)
+
+        errors = list(self.db.get_errors('1'))
+        self.assertEqual(len(errors), 2, errors)
3 changes: 2 additions & 1 deletion loads/transport/agent.py
@@ -182,7 +182,8 @@ def _handle_commands(self, message):
         elif command in ('STATUS', '_STATUS'):
             status = {}
             run_id = data.get('run_id')
-            status['run_id'] = run_id
+            if run_id is not None:
+                status['run_id'] = run_id
 
             for pid, (proc, _run_id) in self._workers.items():
                 if run_id is not None and run_id != _run_id:
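The agent change is a small correctness fix riding along with the commit: the STATUS reply only carries 'run_id' when one was supplied, instead of serializing a null. A toy illustration of the guard's effect — build_status is a made-up stand-in for the handler's dict assembly:

import json

def build_status(run_id):
    # Mirrors the guard added to _handle_commands: omit the key entirely
    # rather than emitting a null value.
    status = {}
    if run_id is not None:
        status['run_id'] = run_id
    return status

assert json.dumps(build_status(None)) == '{}'
assert json.dumps(build_status('1')) == '{"run_id": "1"}'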
