
Commit

Merge branch 'master' into tracking
willgraf committed Jul 31, 2019
2 parents ceecf01 + 774d8e3 commit aa34f87
Showing 7 changed files with 262 additions and 139 deletions.
227 changes: 127 additions & 100 deletions redis_consumer/consumers.py
@@ -92,27 +92,6 @@ def _put_back_hash(self, redis_hash):
else:
pass # success

# queue_size = self.redis.llen(self.processing_queue)
# if queue_size == 1:
# key = self.redis.rpoplpush(self.processing_queue, self.queue)
# if key != redis_hash:
# self.logger.warning('`RPOPLPUSH %s %s` popped key %s but'
# 'expected key to be %s',
# self.processing_queue, self.queue,
# key, redis_hash)
#
# else:
# self.logger.warning('Expected `%s` would have 1 item, but has %s. '
# 'restarting key `%s` the old way',
# self.processing_queue, queue_size, redis_hash)
# res = self.redis.lrem(self.processing_queue, 1, redis_hash)
# self.logger.debug('LREM %s got response %s', redis_hash, res)
# if res:
# self.redis.lpush(self.queue, redis_hash)
# else:
# self.logger.debug('Trying to put back key %s but it is not in '
# 'queue %s', redis_hash, self.processing_queue)

def get_redis_hash(self):
while True:

@@ -158,6 +137,16 @@ def get_current_timestamp(self):
"""Helper function, returns ISO formatted UTC timestamp"""
return datetime.datetime.now(pytz.UTC).isoformat()

def purge_processing_queue(self):
"""Move all items from the processing queue to the work queue"""
while True:
key = self.redis.rpoplpush(self.processing_queue, self.queue)
if key is None:
break
self.logger.debug('Found stranded key `%s` in queue `%s`. '
'Moving it back to `%s`.',
key, self.processing_queue, self.queue)
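
For reference, the new purge_processing_queue relies on Redis RPOPLPUSH, which atomically pops from the tail of the source list, pushes onto the head of the destination list, and returns None once the source is empty. A minimal standalone sketch of the same drain pattern with redis-py; the local client and queue names are hypothetical, not the consumer's actual configuration:

    import redis

    # Assumes a local Redis server; queue names below are made up.
    client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)

    def drain(processing_queue, work_queue):
        # Keep moving keys until RPOPLPUSH reports the source list is empty.
        while True:
            key = client.rpoplpush(processing_queue, work_queue)
            if key is None:
                break
            print('requeued stranded key', key)

    drain('processing-predict', 'predict')  # hypothetical queue names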

def update_key(self, redis_hash, data=None):
"""Update the hash with `data` and updated_by & updated_at stamps.
@@ -183,6 +172,10 @@ def _consume(self, redis_hash):
def consume(self):
"""Find a redis key and process it"""
start = timeit.default_timer()

# Purge the processing queue in case of stranded keys
self.purge_processing_queue()

redis_hash = self.get_redis_hash()

if redis_hash is not None: # popped something off the queue
@@ -230,6 +223,18 @@ def consume(self):
class ImageFileConsumer(Consumer):
"""Consumes image files and uploads the results"""

def __init__(self,
redis_client,
storage_client,
queue,
final_status='done'):
# Create some attributes only used during consume()
self._redis_hash = None
self._redis_values = dict()
super(ImageFileConsumer, self).__init__(
redis_client, storage_client,
queue, final_status)

def is_valid_hash(self, redis_hash):
if redis_hash is None:
return False
@@ -248,9 +253,8 @@ def _get_processing_function(self, process_type, function_name):
'processing. Got %s.' % cat)

if name not in settings.PROCESSING_FUNCTIONS[cat]:
if cat not in settings.PROCESSING_FUNCTIONS:
raise ValueError('"%s" is not a valid %s-processing function'
% (name, cat))
raise ValueError('"%s" is not a valid %s-processing function'
% (name, cat))
return settings.PROCESSING_FUNCTIONS[cat][name]

# def _process(self, image, key, process_type, streaming=False):
@@ -499,9 +503,12 @@ def grpc_image(self, img, model_name, model_version):
results = results[0]

retrying = False
results = prediction['prediction']

finished = timeit.default_timer() - start
self.update_key(self._redis_hash, {
'prediction_time': finished,
'predict_retries': count,
})
self.logger.debug('Segmented key %s (model %s:%s, '
'preprocessing: %s, postprocessing: %s)'
@@ -524,12 +531,10 @@ def grpc_image(self, img, model_name, model_version):
})
self.logger.warning('%sException `%s: %s` during '
'PredictClient request to model %s:%s.'
'Waiting %s seconds before retrying.',
' Waiting %s seconds before retrying.',
type(err).__name__, err.code().name,
err.details(), model_name,
model_version, settings.GRPC_BACKOFF)
self.logger.debug('Waiting for %s seconds before retrying',
settings.GRPC_BACKOFF)
time.sleep(settings.GRPC_BACKOFF) # sleep before retry
retrying = True # Unnecessary but explicit
else:
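
The retry branch above follows a simple fixed-backoff pattern: log the gRPC error, sleep for settings.GRPC_BACKOFF, then try again. A hedged, generic sketch of that pattern; predict_fn, the backoff value, and the retry cap are placeholders rather than the project's settings:

    import time

    import grpc

    BACKOFF_SECONDS = 3   # placeholder standing in for settings.GRPC_BACKOFF
    MAX_RETRIES = 5       # placeholder cap for this sketch

    def call_with_retry(predict_fn, payload):
        # Retry the gRPC call after a transient error, waiting a fixed
        # number of seconds between attempts.
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                return predict_fn(payload)
            except grpc.RpcError as err:
                # Errors raised by a failed call expose a status code and details.
                print('attempt %s failed: %s - %s'
                      % (attempt, err.code().name, err.details()))
                time.sleep(BACKOFF_SECONDS)
        raise RuntimeError('gRPC call failed after %s attempts' % MAX_RETRIES)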
@@ -700,22 +705,45 @@ def _upload_archived_images(self, hvalues, redis_hash):
all_hashes.add(new_hash)
return all_hashes

def _upload_finished_children(self, finished_children, redis_hash, expire_time=3600):
def _get_output_file_name(self, key):
fname = None
retries = 3
for _ in range(retries):
# sometimes this field is missing, gotta get the truth!
fname = self.redis._redis_master.hget(key, 'output_file_name')
if fname is None:
ttl = self.redis.ttl(key)

if ttl == -2:
raise ValueError('Key `%s` does not exist' % key)

if ttl != -1:
self.logger.warning('Key `%s` has a TTL of %s. '
'Why has it been expired already?',
key, ttl)
else:
self.logger.warning('Key `%s` exists with TTL %s but has'
' no output_file_name', key, ttl)

self.redis._update_masters_and_slaves()
time.sleep(3)
else:
break
else:
raise ValueError('Key %s had no value for output_file_name'
' %s times in a row.' % (key, retries))
return fname
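
The TTL checks in _get_output_file_name rely on the standard Redis convention: TTL returns -2 for a missing key, -1 for a key with no expiry, and otherwise the number of seconds remaining. A small illustration with redis-py; the local server and key names are made up:

    import redis

    # Assumes a local Redis server; key names are illustrative only.
    r = redis.StrictRedis(host='localhost', port=6379)

    r.set('persistent-key', 'value')        # no expiry set
    r.set('expiring-key', 'value', ex=60)   # expires in 60 seconds

    print(r.ttl('persistent-key'))  # -1: key exists but never expires
    print(r.ttl('expiring-key'))    # ~60: seconds until expiry
    print(r.ttl('missing-key'))     # -2: key does not exist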

def _upload_finished_children(self, finished_children, redis_hash):
saved_files = set()
with utils.get_tempdir() as tempdir:
# process each successfully completed key
for key in finished_children:
if not key:
continue

fname = self.redis.hget(key, 'output_file_name')
if fname is None:
if self.redis.exists(key):
ttl = self.redis.ttl(key)
raise ValueError('Key `%s` exists with TTL %s but has '
'no output_file_name' % (key, ttl))
else:
raise ValueError('Key `%s` does not exist' % key)
fname = self._get_output_file_name(key)

local_fname = self.storage.download(fname, tempdir)

self.logger.info('Saved file: %s', local_fname)
@@ -729,7 +757,6 @@ def _upload_finished_children(self, finished_children, redis_hash, expire_time=3600):
for imfile in image_files:
saved_files.add(imfile)

self.redis.expire(key, expire_time)
self.update_key(redis_hash)

# zip up all saved results
@@ -740,20 +767,19 @@ def _upload_finished_children(self, finished_children, redis_hash, expire_time=3600):
self.logger.debug('Uploaded output to: `%s`', url)
return path, url
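
The "zip up all saved results" step above goes through the project's utils and storage helpers; as a rough standard-library sketch of the same idea, with hypothetical paths and a hypothetical helper name:

    import os
    import zipfile

    def zip_results(saved_files, out_path):
        # Bundle every saved output file into a single compressed archive.
        with zipfile.ZipFile(out_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for path in sorted(saved_files):
                zf.write(path, arcname=os.path.basename(path))
        return out_path

    # zip_results({'/tmp/out/cell0.tif', '/tmp/out/cell1.tif'}, '/tmp/results.zip')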

def _parse_failures(self, failed_children, expire_time=3600):
def _parse_failures(self, failed_children):
failed_hashes = {}
for key in failed_children:
if not key:
continue
reason = self.redis.hget(key, 'reason')
# one of the hashes failed to process
self.logger.error('Failed to process hash `%s`: %s',
key, reason)
self.logger.error('Child key `%s` failed: %s', key, reason)
failed_hashes[key] = reason
self.redis.expire(key, expire_time)

if failed_hashes:
self.logger.warning('Failed to process hashes: %s',
self.logger.warning('%s child keys failed to process: %s',
len(failed_hashes),
json.dumps(failed_hashes, indent=4))

# check python2 vs python3
@@ -764,22 +790,67 @@ def _parse_failures(self, failed_children, expire_time=3600):

return url_encode(failed_hashes)
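
Given the python2/python3 url_encode import noted above (urllib.parse.urlencode on Python 3), the per-key failure reasons end up encoded as a query-string-style value. A toy example with fabricated keys and reasons:

    from urllib.parse import urlencode  # Python 3 spelling of url_encode

    # Fabricated child keys and failure reasons, for illustration only.
    failed_hashes = {
        'predict:abc.png': 'Invalid image dimensions',
        'predict:def.png': 'Model version not found',
    }

    print(urlencode(failed_hashes))
    # predict%3Aabc.png=Invalid+image+dimensions&predict%3Adef.png=Model+version+not+found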

def _cleanup(self, redis_hash, children, done, failed):
# get summary data for all finished children
summary_fields = [
# 'created_at',
# 'finished_at',
'prediction_time',
'postprocess_time',
'upload_time',
'download_time',
'total_time',
]

summaries = dict()
for d in done:
results = self.redis.hmget(d, *summary_fields)
for field, result in zip(summary_fields, results):
try:
if field not in summaries:
summaries[field] = [float(result)]
else:
summaries[field].append(float(result))
except:
self.logger.warning('Summary field `%s` is not a '
'float: %s', field, result)

for k in summaries:
summaries[k] = sum(summaries[k]) / len(summaries[k])

output_file_name, output_url = self._upload_finished_children(
done, redis_hash)

failures = self._parse_failures(failed)

summaries.update({
'status': self.final_status,
'finished_at': self.get_current_timestamp(),
'output_url': output_url,
'failures': failures,
'total_jobs': len(children),
'output_file_name': output_file_name
})

# Update redis with the results
self.update_key(redis_hash, summaries)

expire_time = settings.EXPIRE_TIME
for key in children:
self.redis.expire(key, expire_time)

self.logger.debug('All %s child keys will be expiring in %s '
'seconds.', len(children), expire_time)
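
The loop at the top of _cleanup simply collects each timing field across the finished children and divides by the count. A tiny worked example with fabricated values:

    # Fabricated child results; only the averaging logic mirrors _cleanup.
    summary_fields = ['prediction_time', 'upload_time']

    children_results = [
        {'prediction_time': '1.2', 'upload_time': '0.4'},
        {'prediction_time': '1.8', 'upload_time': '0.6'},
    ]

    summaries = {}
    for child in children_results:
        for field in summary_fields:
            try:
                summaries.setdefault(field, []).append(float(child[field]))
            except (KeyError, TypeError, ValueError):
                pass  # skip missing or non-numeric values

    averages = {k: sum(v) / len(v) for k, v in summaries.items()}
    print(averages)  # {'prediction_time': 1.5, 'upload_time': 0.5}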

def _consume(self, redis_hash):
start = timeit.default_timer()
hvals = self.redis.hgetall(redis_hash)
self.logger.debug('Found hash to process `%s` with status `%s`.',
redis_hash, hvals.get('status'))

key_separator = ',' # char to separate child keys in Redis
expire_time = 60 * 10 # expire finished child keys in ten minutes

self.update_key(redis_hash) # refresh timestamp

# check to see which child keys have been processed
children = set(hvals.get('children', '').split(key_separator))
done = set(hvals.get('children:done', '').split(key_separator))
failed = set(hvals.get('children:failed', '').split(key_separator))

if hvals.get('status') == 'new':
# download the zip file, upload the contents, and enter into Redis
all_hashes = self._upload_archived_images(hvals, redis_hash)
@@ -795,6 +866,11 @@ def _consume(self, redis_hash):

elif hvals.get('status') == 'waiting':
# this key was previously processed by a ZipConsumer
# check to see which child keys have already been processed
children = set(hvals.get('children', '').split(key_separator))
done = set(hvals.get('children:done', '').split(key_separator))
failed = set(hvals.get('children:failed', '').split(key_separator))

# get keys that have not yet reached a completed status
remaining_children = children - done - failed
for child in remaining_children:
@@ -811,61 +887,12 @@ def _consume(self, redis_hash):

# if there are no remaining children, update status to cleanup
self.update_key(redis_hash, {
'status': 'cleanup' if not remaining_children else 'waiting',
'children:done': key_separator.join(d for d in done if d),
'children:failed': key_separator.join(f for f in failed if f),
})

elif hvals.get('status') == 'cleanup':
# clean up children with status `done` and `failed`

# get summary data for all finished children
summary_fields = [
# 'created_at',
# 'finished_at',
'prediction_time',
'postprocess_time',
'upload_time',
'download_time',
'total_time',
]

summaries = dict()
for d in done:
results = self.redis.hmget(d, *summary_fields)
for field, result in zip(summary_fields, results):
try:
if field not in summaries:
summaries[field] = [float(result)]
else:
summaries[field].append(float(result))
except:
self.logger.warning('Summary field `%s` is not a '
'float: %s', field, result)

for k in summaries:
summaries[k] = sum(summaries[k]) / len(summaries[k])

output_file_name, output_url = self._upload_finished_children(
done, redis_hash, expire_time)

failures = self._parse_failures(failed, expire_time)

summaries.update({
'status': self.final_status,
'finished_at': self.get_current_timestamp(),
'output_url': output_url,
'failures': failures,
'total_jobs': len(children),
'output_file_name': output_file_name
})

# Update redis with the results
self.update_key(redis_hash, summaries)

self.logger.info('Processed all %s images of zipfile `%s` in %s',
len(children), hvals.get('input_file_name'),
timeit.default_timer() - start)
if not remaining_children:
self._cleanup(redis_hash, children, done, failed)


class TrackingConsumer(Consumer):
