metrics: reduce memory footprint by over 60% #1251

Merged · 3 commits · Nov 9, 2017
Changes from all commits
121 changes: 71 additions & 50 deletions metrics.py
@@ -19,64 +19,69 @@
from osclib.stagingapi import StagingAPI

SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))
Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])

# Duplicate Leap config to handle 13.2 without issue.
osclib.conf.DEFAULT[
r'openSUSE:(?P<project>[\d.]+)'] = osclib.conf.DEFAULT[
r'openSUSE:(?P<project>Leap:[\d.]+)']

# Provide osc.core.get_request_list() that swaps out search() implementation and
# uses lxml ET to avoid having to re-parse to perform complex xpaths.
# Provide osc.core.get_request_list() that swaps out search() implementation to
# capture the generated query, paginate over it, and yield each request to avoid
# loading all requests at the same time. Additionally, use lxml ET to avoid
# having to re-parse to perform complex xpaths.
def get_request_list(*args, **kwargs):
global _requests

osc.core._search = osc.core.search
osc.core.search = search
osc.core.search = search_capture
osc.core._ET = osc.core.ET
osc.core.ET = ET

osc.core.get_request_list(*args, **kwargs)

osc.core.search = osc.core._search

query = search_capture.query
for request in search_paginated_generator(query[0], query[1], **query[2]):
# Python 3 yield from.
yield request

osc.core.ET = osc.core._ET

return _requests
def search_capture(apiurl, queries=None, **kwargs):
search_capture.query = (apiurl, queries, kwargs)
return {'request': ET.fromstring('<collection matches="0"></collection>')}
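
To make the capture-and-replay trick above easier to follow, here is a minimal, self-contained sketch; it is not part of this diff, and all names in it (FakeOsc, paginated, and so on) are made up for illustration. A stand-in search function records the query the library builds, returns an empty result so the wrapped call stays cheap, and the captured query is then replayed one page at a time.

class FakeOsc(object):
    """Stand-in for a library that builds a query internally and searches with it."""
    def search(self, query):
        return list(range(10000))               # would normally load every result at once

    def get_request_list(self, project):
        return self.search('project=' + project)

def search_capture(query):
    search_capture.query = query                # record the query the library built
    return []                                   # cheap empty result for the wrapped call

def paginated(query, limit=1000):
    # The real version would send `query` with limit/offset; here we just fake pages.
    total = 10000
    for offset in range(0, total, limit):
        for item in range(offset, min(offset + limit, total)):
            yield item                          # hand back one item at a time

osc = FakeOsc()
original_search = osc.search
osc.search = search_capture                     # capture instead of executing
osc.get_request_list('openSUSE:Factory')        # the library builds the query for us
osc.search = original_search

for request in paginated(search_capture.query):
    pass                                        # process each request individually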

# Provides an osc.core.search() implementation for use with get_request_list()
# that paginates in sets of 1000.
def search(apiurl, queries=None, **kwargs):
global _requests

# that paginates in sets of 1000 and yields each request.
def search_paginated_generator(apiurl, queries=None, **kwargs):
if "submit/target/@project='openSUSE:Factory'" in kwargs['request']:
kwargs['request'] = osc.core.xpath_join(kwargs['request'], '@id>250000', op='and')

requests = []
request_count = 0
queries['request']['limit'] = 1000
queries['request']['offset'] = 0
while True:
collection = osc.core._search(apiurl, queries, **kwargs)['request']
requests.extend(collection.findall('request'))
collection = osc.core.search(apiurl, queries, **kwargs)['request']
if not request_count:
print('processing {:,} requests'.format(int(collection.get('matches'))))

for request in collection.findall('request'):
yield request
request_count += 1

if len(requests) == int(collection.get('matches')):
if request_count == int(collection.get('matches')):
# Stop paging once the expected number of items has been returned.
break

# Release memory as otherwise ET seems to hold onto it.
collection.clear()
queries['request']['offset'] += queries['request']['limit']

_requests = requests
return {'request': ET.fromstring('<collection matches="0"></collection>')}
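
The new generator also calls collection.clear() after each page has been consumed. A small stand-alone sketch of that pattern, assuming lxml is available and using a toy XML payload, shows the idea: handle the parsed requests, then clear the element so the tree does not keep growing as pages are fetched.

from lxml import etree as ET

xml = '<collection matches="3">' + '<request/>' * 3 + '</collection>'
collection = ET.fromstring(xml)                 # one "page" of results
for request in collection.findall('request'):
    pass                                        # handle the request, do not keep it around
collection.clear()                              # drop the children so their memory can be reclaimed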

points = []

def point(measurement, fields, datetime, tags={}, delta=False):
global points
points.append({
'measurement': measurement,
'tags': tags,
'fields': fields,
'time': timestamp(datetime),
'delta': delta,
})
points.append(Point(measurement, tags, fields, timestamp(datetime), delta))

def timestamp(datetime):
return int(datetime.strftime('%s'))
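
The switch from per-point dict()s to the Point namedtuple is where much of the memory saving comes from: the field names live once on the class instead of in every instance. A rough, stand-alone comparison follows; the field values are illustrative and the exact sizes vary by Python version, so the numbers are only indicative.

import sys
from collections import namedtuple

Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])

as_dict = {'measurement': 'total', 'tags': {}, 'fields': {'backlog': 1},
           'time': 1510185600, 'delta': True}
as_point = Point('total', {}, {'backlog': 1}, 1510185600, True)

print(sys.getsizeof(as_dict))   # on CPython, usually a few times larger per point
print(sys.getsizeof(as_point))  # typically well under 100 bytes per point
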
@@ -86,7 +91,6 @@ def ingest_requests(api, project):
req_state=('accepted', 'revoked', 'superseded'),
exclude_target_projects=[project],
withfullhistory=True)
print('processing {:,} requests'.format(len(requests)))
for request in requests:
if request.find('action').get('type') not in ('submit', 'delete'):
# TODO Handle non-stageable requests via different flow.
@@ -228,9 +232,8 @@ def ingest_requests(api, project):
else:
print('unable to find priority history entry for {} to {}'.format(request.get('id'), priority.text))

walk_points(points, project)

return points
print('finalizing {:,} points'.format(len(points)))
return walk_points(points, project)

def who_workaround(request, review, relax=False):
# Super ugly workaround for incorrect and missing data:
@@ -257,37 +260,59 @@

return who

# Walk data points in order by time, accumulating deltas and merging points
# that share the same time. Data is converted to dict() and written to influx
# in batches to avoid the extra memory needed to hold all data as dict() and
# to keep influxdb from allocating memory for the entire incoming data set at
# once.
def walk_points(points, target):
global client
# Wait until just before writing to drop database.
client.drop_database(client._database)
client.create_database(client._database)

counters = {}
final = []
for point in sorted(points, key=lambda l: l['time']):
if not point['delta']:
final.append(point)
time_last = None
wrote = 0
for point in sorted(points, key=lambda l: l.time):
if point.time != time_last and len(final) >= 1000:
# Write final points in batches of ~1000, but guard against writing in
# the middle of a run of points sharing the same time, as they may still
# be merged. As such the previous time should not match the current one.
client.write_points(final, 's')
wrote += len(final)
final = []
time_last = point.time

if not point.delta:
final.append(dict(point._asdict()))
continue

# A more generic mechanism, like a 'key', would likely be better since one ended up being needed.
measurement = counters_tag_key = point['measurement']
measurement = counters_tag_key = point.measurement
if measurement == 'staging':
counters_tag_key += point['tags']['id']
counters_tag_key += point.tags['id']
elif measurement == 'review_count':
counters_tag_key += '_'.join(point['tags']['key'])
counters_tag_key += '_'.join(point.tags['key'])
elif measurement == 'priority':
counters_tag_key += point['tags']['level']
counters_tag_key += point.tags['level']
counters_tag = counters.setdefault(counters_tag_key, {'last': None, 'values': {}})

values = counters_tag['values']
for key, value in point['fields'].items():
for key, value in point.fields.items():
values[key] = values.setdefault(key, 0) + value

if counters_tag['last'] and point['time'] == counters_tag['last']['time']:
if counters_tag['last'] and point.time == counters_tag['last']['time']:
point = counters_tag['last']
else:
point = dict(point._asdict())
counters_tag['last'] = point
final.append(point)
point['fields'].update(counters_tag['values'])

# In order to allow for merging delta entries for the same time.
points = final
# Write any remaining final points.
client.write_points(final, 's')
return wrote + len(final)
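
A compact sketch of the flush rule walk_points applies; it is separate from the code above and the names are illustrative. Batches of roughly 1000 points are emitted, but only at a timestamp boundary, so points that might still be merged (same time) never straddle two batches.

def batches(points, size=1000):
    batch = []
    last_time = None
    for time, value in points:                  # assumes points sorted by time
        if batch and time != last_time and len(batch) >= size:
            yield batch                         # flush only on a time boundary
            batch = []
        batch.append((time, value))
        last_time = time
    if batch:
        yield batch                             # whatever is left at the end

data = sorted((n // 3, n) for n in range(5000))
for chunk in batches(data):
    pass                                        # e.g. client.write_points(chunk, 's')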

def ingest_release_schedule(project):
points = []
@@ -312,9 +337,13 @@ def ingest_release_schedule(project):
'time': timestamp(date),
})

return points
client.write_points(points, 's')
return len(points)

def main(args):
global client
client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)

osc.conf.get_config(override_apiurl=args.apiurl)
osc.conf.config['debug'] = args.debug

@@ -335,16 +364,8 @@ def main(args):
print('who_workaround_swap', who_workaround_swap)
print('who_workaround_miss', who_workaround_miss)

print('writing {:,} points and {:,} annotation points to db'.format(
len(points_requests), len(points_schedule)))

db = args.project
client = InfluxDBClient(args.host, args.port, args.user, args.password, db)
client.drop_database(db)
client.create_database(db)

client.write_points(points_requests, 's')
client.write_points(points_schedule, 's')
print('wrote {:,} points and {:,} annotation points to db'.format(
points_requests, points_schedule))


if __name__ == '__main__':