From d549bf709c88e2207d2e2aa272990ed7ffddf877 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Fri, 10 May 2019 12:14:03 -0700 Subject: [PATCH 01/34] sleep for `EMPTY_QUEUE_TIMEOUT` seconds if queue is empty --- redis_consumer/consumers.py | 5 +++++ redis_consumer/settings.py | 2 ++ 2 files changed, 7 insertions(+) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 6969efcb..3ea0ada1 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -164,6 +164,11 @@ def consume(self): # remove the key from the processing queue self.redis.lrem(self.processing_queue, 1, redis_hash) + else: + self.logger.debug('Queue `%s` is empty. Waiting for %s seconds.', + self.queue, settings.EMPTY_QUEUE_TIMEOUT) + time.sleep(settings.EMPTY_QUEUE_TIMEOUT) + class ImageFileConsumer(Consumer): """Consumes image files and uploads the results""" diff --git a/redis_consumer/settings.py b/redis_consumer/settings.py index 0a1743f7..a41b934c 100644 --- a/redis_consumer/settings.py +++ b/redis_consumer/settings.py @@ -62,7 +62,9 @@ # gRPC API timeout in seconds (scales with `cuts`) GRPC_TIMEOUT = config('GRPC_TIMEOUT', default=30, cast=int) +# timeout/backoff wait time in seconds REDIS_TIMEOUT = config('REDIS_TIMEOUT', default=3, cast=int) +EMPTY_QUEUE_TIMEOUT = config('EMPTY_QUEUE_TIMEOUT', default=5, cast=int) # Status of hashes marked for prediction STATUS = config('STATUS', default='new') From b0da22056c9368f24f84a5dfc8d515406c52005b Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Fri, 10 May 2019 12:23:58 -0700 Subject: [PATCH 02/34] update keys to have ":" separators instead of "_" --- redis_consumer/consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 3ea0ada1..77178455 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -533,7 +533,7 @@ def _upload_archived_images(self, hvalues): subdir = os.path.dirname(clean_imfile) dest, _ = self.storage.upload(imfile, subdir=subdir) - new_hash = '{prefix}_{file}_{hash}'.format( + new_hash = '{prefix}:{file}:{hash}'.format( prefix=settings.HASH_PREFIX, file=clean_imfile, hash=uuid.uuid4().hex) From eccaedd9d8581fff103c184b3bdd542115bbc039 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Fri, 10 May 2019 15:09:34 -0700 Subject: [PATCH 03/34] major ZipFileConsumer updates. Instead of waiting for each file in the archive to be processed, update the status and fields of the zip job and move to next item. --- redis_consumer/consumers.py | 173 +++++++++++++++++++++++------------- 1 file changed, 110 insertions(+), 63 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 77178455..9838a99c 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -71,6 +71,11 @@ def __init__(self, self.final_status = final_status self.logger = logging.getLogger(str(self.__class__.__name__)) + def _put_back_hash(self, redis_hash): + """Put the hash back into the work queue""" + self.redis.lrem(self.processing_queue, 1, redis_hash) + self.redis.lpush(self.queue, redis_hash) + def get_redis_hash(self): while True: redis_hash = self.redis.rpoplpush(self.queue, self.processing_queue) @@ -85,8 +90,7 @@ def get_redis_hash(self): # this invalid hash should not be processed by this consumer. # remove it from processing, and push it back to the work queue. 
- self.redis.lrem(self.processing_queue, 1, redis_hash) - self.redis.lpush(self.queue, redis_hash) + self._put_back_hash(redis_hash) def _handle_error(self, err, redis_hash): """Update redis with failure information, and log errors. @@ -548,90 +552,133 @@ def _upload_archived_images(self, hvalues): new_hvals['created_at'] = current_timestamp new_hvals['updated_at'] = current_timestamp - self.redis.hmset(new_hash, new_hvals) - self.redis.lpush(self.queue, new_hash) self.logger.debug('Added new hash `%s`: %s', new_hash, json.dumps(new_hvals, indent=4)) all_hashes.add(new_hash) return all_hashes + def _upload_finished_children(self, finished_children, expire_time=3600): + saved_files = set() + with utils.get_tempdir() as tempdir: + # process each successfully completed key + for key in finished_children: + fname = self.redis.hget(key, 'output_file_name') + local_fname = self.storage.download(fname, tempdir) + + self.logger.info('Saved file: %s', local_fname) + + if zipfile.is_zipfile(local_fname): + image_files = utils.get_image_files_from_dir( + local_fname, tempdir) + else: + image_files = [local_fname] + + for imfile in image_files: + saved_files.add(imfile) + + self.redis.expire(key, expire_time) + + # zip up all saved results + zip_file = utils.zip_files(saved_files, tempdir) + + # Upload the zip file to cloud storage bucket + path, url = self.storage.upload(zip_file) + self.logger.debug('Uploaded output to: `%s`', url) + return path, url + + def _parse_failures(self, failed_children, expire_time=3600): + failed_hashes = {} + for key in failed_children: + reason = self.redis.hget(key, 'reason') + # one of the hashes failed to process + self.logger.error('Failed to process hash `%s`: %s', + key, reason) + failed_hashes[key] = reason + self.redis.expire(key, expire_time) + + if failed_hashes: + self.logger.warning('Failed to process hashes: %s', + json.dumps(failed_hashes, indent=4)) + + # check python2 vs python3 + if hasattr(urllib, 'parse'): + url_encode = urllib.parse.urlencode # pylint: disable=E1101 + else: + url_encode = urllib.urlencode # pylint: disable=E1101 + + return url_encode(failed_hashes) + def _consume(self, redis_hash): start = timeit.default_timer() hvals = self.redis.hgetall(redis_hash) self.logger.debug('Found hash to process `%s`: %s', redis_hash, json.dumps(hvals, indent=4)) - self.update_status(redis_hash, 'started', { - 'identity_started': self.hostname, - }) + key_separator = ',' # char to separate child keys in Redis + expire_time = 60 * 10 # expire finished child keys in ten minutes - all_hashes = self._upload_archived_images(hvals) - self.logger.info('Uploaded %s hashes. Waiting for ImageConsumers.', - len(all_hashes)) + if hvals['status'] == 'new': + # download the zip file, upload the contents, and enter into Redis + self.update_status(redis_hash, 'started', { + 'identity_started': self.hostname, + }) - # Now all images have been uploaded with new redis hashes - # Wait for these to be processed by an ImageFileConsumer - self.update_status(redis_hash, 'waiting') + all_hashes = self._upload_archived_images(hvals) + self.logger.info('Uploaded %s hashes. 
Waiting for ImageConsumers.', + len(all_hashes)) - with utils.get_tempdir() as tempdir: - finished_hashes = set() - failed_hashes = dict() - saved_files = set() - - expire_time = 60 * 10 # ten minutes - - # ping redis until all the sets are finished - while all_hashes.symmetric_difference(finished_hashes): - for h in all_hashes: - if h in finished_hashes: - continue - - status = self.redis.hget(h, 'status') - - if status == 'failed': - reason = self.redis.hget(h, 'reason') - # one of the hashes failed to process - self.logger.error('Failed to process hash `%s`: %s', - h, reason) - failed_hashes[h] = reason - finished_hashes.add(h) - self.redis.expire(h, expire_time) - - elif status == self.final_status: - # one of our hashes is done! - fname = self.redis.hget(h, 'output_file_name') - local_fname = self.storage.download(fname, tempdir) - self.logger.info('Saved file: %s', local_fname) - if zipfile.is_zipfile(local_fname): - image_files = utils.get_image_files_from_dir( - local_fname, tempdir) - else: - image_files = [local_fname] - - for imfile in image_files: - saved_files.add(imfile) - finished_hashes.add(h) - self.redis.expire(h, expire_time) - - if failed_hashes: - self.logger.warning('Failed to process hashes: %s', - json.dumps(failed_hashes, indent=4)) + # Now all images have been uploaded with new redis hashes + # Update Redis with child keys and put item back in queue + self.update_status(redis_hash, 'waiting', { + 'children': key_separator.join(all_hashes), + 'children:done': '', # empty for now + 'children:failed': '', # empty for now + }) - zip_file = utils.zip_files(saved_files, tempdir) + # remove it from processing, and push it back to the work queue. + self._put_back_hash(redis_hash) + + elif hvals['status'] == 'waiting': + # this key was previously processed by a ZipConsumer + # check to see which child keys have been processed + children = set(hvals.get('children', '').split(key_separator)) + done = set(hvals.get('children:done', '').split(key_separator)) + failed = set(hvals.get('children:failed', '').split(key_separator)) + + # get keys that have not yet reached a completed status + remaining_children = children - done - failed + for child in remaining_children: + status = self.redis.hget(child, 'status') + if status == 'failed': + failed.add(child) + elif status == self.final_status: + done.add(child) + + # if there are no remaining children, update status to cleanup + status = 'cleanup' if not children - done - failed else 'waiting' + self.update_status(redis_hash, status, { + 'children:done': key_separator.join(done), + 'children:failed': key_separator.join(failed), + }) + # remove it from processing, and push it back to the work queue. 
+ self._put_back_hash(redis_hash) - # Upload the zip file to cloud storage bucket - uploaded_file_path, output_url = self.storage.upload(zip_file) - self.logger.debug('Uploaded output to: `%s`', output_url) + elif hvals['status'] == 'cleanup': + # clean up children with status `done` + done = hvals['children:done'].split(key_separator) + uploaded_file_path, output_url = self._upload_finished_children( + done, expire_time) - # check python2 vs python3 - url = urllib.parse.urlencode if hasattr(urllib, 'parse') else urllib.urlencode + # clean up children with status `failed` + failed = hvals['children:failed'].split(key_separator) + failures = self._parse_failures(failed, expire_time) # Update redis with the results self.update_status(redis_hash, self.final_status, { 'identity_output': self.hostname, 'finished_at': self.get_current_timestamp(), 'output_url': output_url, - 'failures': url(failed_hashes), + 'failures': failures, 'output_file_name': uploaded_file_path }) self.logger.info('Processed all %s images of zipfile `%s` in %s', From dd98d71a1ad0f74b81f824526b4f7cb4efcc2f1b Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Fri, 10 May 2019 17:00:22 -0700 Subject: [PATCH 04/34] remove bad child keys and only log `Consumed` when status == final_status --- redis_consumer/consumers.py | 50 ++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 9838a99c..aec9abb0 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -52,9 +52,10 @@ class Consumer(object): """Base class for all redis event consumer classes. Args: - redis_client: Client class to communicate with redis - storage_client: Client to communicate with cloud storage buckets. - final_status: Update the status of redis event with this value. + redis_client: obj, Client class to communicate with redis + storage_client: obj, Client to communicate with cloud storage buckets. + queue: str, Name of queue to pop off work items. + final_status: str, Update the status of redis event with this value. """ def __init__(self, @@ -137,22 +138,19 @@ def _consume(self, redis_hash): raise NotImplementedError def consume(self): - """Consume all redis events every `interval` seconds. - - Args: - status: string, only consume hashes where `status` == status. - prefix: string, only consume hashes that start with `prefix`. - - Returns: - nothing: this is the consumer main process - """ + """Find a redis key and process it""" start = timeit.default_timer() redis_hash = self.get_redis_hash() if redis_hash is not None: # popped something off the queue try: self._consume(redis_hash) - hvals = self.redis.hgetall(redis_hash) + except Exception as err: # pylint: disable=broad-except + # log the error and update redis with details + self._handle_error(err, redis_hash) + + hvals = self.redis.hgetall(redis_hash) + if hvals.get('status') == self.final_status: self.logger.debug('Consumed key %s (model %s:%s, ' 'preprocessing: %s, postprocessing: %s) ' '(%s retries) in %s seconds.', @@ -161,12 +159,13 @@ def consume(self): hvals.get('preprocess_function'), hvals.get('postprocess_function'), 0, timeit.default_timer() - start) - except Exception as err: # pylint: disable=broad-except - # log the error and update redis with details - self._handle_error(err, redis_hash) - # remove the key from the processing queue - self.redis.lrem(self.processing_queue, 1, redis_hash) + # this key is done. 
remove the key from the processing queue.
+                self.redis.lrem(self.processing_queue, 1, redis_hash)
+            else:
+                # this key is not done yet.
+                # remove it from processing and push it back to the work queue.
+                self._put_back_hash(redis_hash)
         else:
             self.logger.debug('Queue `%s` is empty. Waiting for %s seconds.',

From 6aa8df0e7faf436d022195bca5b78c1bad6b96af Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Fri, 10 May 2019 17:01:24 -0700
Subject: [PATCH 05/34] somehow removed adding the key to redis

---
 redis_consumer/consumers.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py
index aec9abb0..de3a1bc7 100644
--- a/redis_consumer/consumers.py
+++ b/redis_consumer/consumers.py
@@ -561,6 +561,9 @@ def _upload_archived_images(self, hvalues):
         for k in bad_keys:
             if k in new_hvals:
                 del new_hvals[k]
+
+        self.redis.hmset(new_hash, new_hvals)
+        self.redis.lpush(self.queue, new_hash)
         self.logger.debug('Added new hash `%s`: %s',
                           new_hash, json.dumps(new_hvals, indent=4))
         all_hashes.add(new_hash)

From e3ec479864e6019ef733827dc8bab519ddc930bc Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Mon, 13 May 2019 12:52:42 -0700
Subject: [PATCH 06/34] update zipconsumer to allowzip64 and write in wb mode

---
 redis_consumer/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/redis_consumer/utils.py b/redis_consumer/utils.py
index e1d58a4b..27727bd4 100644
--- a/redis_consumer/utils.py
+++ b/redis_consumer/utils.py
@@ -160,7 +160,7 @@ def iter_image_archive(zip_path, destination):
     Returns:
         Iterator of all image paths in extracted archive
     """
-    archive = zipfile.ZipFile(zip_path, 'r')
+    archive = zipfile.ZipFile(zip_path, 'r', allowZip64=True)
     is_valid = lambda x: os.path.splitext(x)[1] and '__MACOSX' not in x
     for info in archive.infolist():
         extracted = archive.extract(info, path=destination)
@@ -298,7 +298,7 @@ def zip_files(files, dest=None, prefix=None):
     try:
         logger.debug('Saving %s files to %s', len(files), filepath)
-        with zipfile.ZipFile(filepath, 'w') as zip_file:
+        with zipfile.ZipFile(filepath, 'wb', allowZip64=True) as zip_file:
             for f in files:  # writing each file one by one
                 name = f.replace(dest, '')
                 name = name[1:] if name.startswith(os.path.sep) else name

From 
307c5f53e0653141695bffce2929f57076549d82 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 12:53:01 -0700 Subject: [PATCH 07/34] convert tests to use ":" as separator instead of "_" --- redis_consumer/consumers_test.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 55a87030..f0da221f 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -85,7 +85,7 @@ def scan_iter(self, match=None, count=None): def expected_keys(self, suffix=None): for k in self.keys: - v = k.split('_') + v = k.split(':') if v[0] == self.prefix: if v[1] == self.status: if suffix: @@ -102,13 +102,13 @@ def expire(self, name, time): # pylint: disable=W0613 def hget(self, rhash, field): if field == 'status': - return rhash.split('_')[1] + return rhash.split(':')[1] elif field == 'file_name': - return rhash.split('_')[-1] + return rhash.split(':')[-1] elif field == 'input_file_name': - return rhash.split('_')[-1] + return rhash.split(':')[-1] elif field == 'output_file_name': - return rhash.split('_')[-1] + return rhash.split(':')[-1] return False def hset(self, rhash, status, value): # pylint: disable=W0613 @@ -122,9 +122,9 @@ def hgetall(self, rhash): # pylint: disable=W0613 'cuts': '0', 'postprocess_function': '', 'preprocess_function': '', - 'file_name': rhash.split('_')[-1], - 'input_file_name': rhash.split('_')[-1], - 'output_file_name': rhash.split('_')[-1] + 'file_name': rhash.split(':')[-1], + 'input_file_name': rhash.split(':')[-1], + 'output_file_name': rhash.split(':')[-1] } @@ -352,14 +352,14 @@ def test__consume(self): hget = lambda h, k: 'done' if k == 'status' else _redis.hget(h, k) redis_client.hget = hget consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}_test.zip'.format(prefix) + dummyhash = '{}:test.zip'.format(prefix) consumer._consume(dummyhash) # test `status` = "failed" hget = lambda h, k: 'failed' if k == 'status' else _redis.hget(h, k) redis_client.hget = hget consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}_test.zip'.format(prefix) + dummyhash = '{}:test.zip'.format(prefix) consumer._consume(dummyhash) # test mixed `status` = "waiting" and "done" @@ -376,5 +376,5 @@ def hget_wait(h, k): redis_client.hget = hget_wait consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}_test.zip'.format(prefix) + dummyhash = '{}:test.zip'.format(prefix) consumer._consume(dummyhash) From 52ecc89d9adee08dd0da94519643a80f4e8e7809 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 12:53:19 -0700 Subject: [PATCH 08/34] call dict.get for status key for safety --- redis_consumer/consumers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index de3a1bc7..140b5dc3 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -629,7 +629,7 @@ def _consume(self, redis_hash): key_separator = ',' # char to separate child keys in Redis expire_time = 60 * 10 # expire finished child keys in ten minutes - if hvals['status'] == 'new': + if hvals.get('status') == 'new': # download the zip file, upload the contents, and enter into Redis self.update_status(redis_hash, 'started', { 'identity_started': self.hostname, @@ -647,7 +647,7 @@ def _consume(self, redis_hash): 
'children:failed': '', # empty for now }) - elif hvals['status'] == 'waiting': + elif hvals.get('status') == 'waiting': # this key was previously processed by a ZipConsumer # check to see which child keys have been processed children = set(hvals.get('children', '').split(key_separator)) @@ -670,7 +670,7 @@ def _consume(self, redis_hash): 'children:failed': key_separator.join(failed), }) - elif hvals['status'] == 'cleanup': + elif hvals.get('status') == 'cleanup': # clean up children with status `done` done = hvals['children:done'].split(key_separator) uploaded_file_path, output_url = self._upload_finished_children( From 201888dd5c4d60ccd7d868464732e9d50ceb9368 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 13:19:56 -0700 Subject: [PATCH 09/34] w not wb --- redis_consumer/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_consumer/utils.py b/redis_consumer/utils.py index 27727bd4..2a2a6639 100644 --- a/redis_consumer/utils.py +++ b/redis_consumer/utils.py @@ -298,7 +298,7 @@ def zip_files(files, dest=None, prefix=None): try: logger.debug('Saving %s files to %s', len(files), filepath) - with zipfile.ZipFile(filepath, 'wb', allowZip64=True) as zip_file: + with zipfile.ZipFile(filepath, 'w', allowZip64=True) as zip_file: for f in files: # writing each file one by one name = f.replace(dest, '') name = name[1:] if name.startswith(os.path.sep) else name From 4290cb2169f0a77c93d3bf7d0518842c80aa7282 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 14:59:14 -0700 Subject: [PATCH 10/34] use dict.get for less crashing? --- redis_consumer/consumers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 140b5dc3..9e0390fe 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -672,12 +672,12 @@ def _consume(self, redis_hash): elif hvals.get('status') == 'cleanup': # clean up children with status `done` - done = hvals['children:done'].split(key_separator) + done = hvals.get('children:done', '').split(key_separator) uploaded_file_path, output_url = self._upload_finished_children( done, expire_time) # clean up children with status `failed` - failed = hvals['children:failed'].split(key_separator) + failed = hvals.get('children:failed', '').split(key_separator) failures = self._parse_failures(failed, expire_time) # Update redis with the results From 8fe8715fb4ddc376dbf665452e014f868ebabeca Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 15:58:05 -0700 Subject: [PATCH 11/34] update tests for zip consumer --- redis_consumer/consumers_test.py | 86 +++++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index f0da221f..6f2bb4bd 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -109,6 +109,8 @@ def hget(self, rhash, field): return rhash.split(':')[-1] elif field == 'output_file_name': return rhash.split(':')[-1] + elif field == 'reason': + return 'reason' return False def hset(self, rhash, status, value): # pylint: disable=W0613 @@ -124,7 +126,11 @@ def hgetall(self, rhash): # pylint: disable=W0613 'preprocess_function': '', 'file_name': rhash.split(':')[-1], 'input_file_name': rhash.split(':')[-1], - 'output_file_name': rhash.split(':')[-1] + 
'output_file_name': rhash.split(':')[-1], + 'status': rhash.split(':')[1], + 'children': 'predict:done:1.tiff,predict:failed:2.tiff,predict:new:3.tiff', + 'children:done': 'predict:done:4.tiff,predict:done:5.tiff', + 'children:failed': 'predict:failed:6.tiff,predict:failed:7.tiff', } @@ -215,7 +221,7 @@ def hmset(self, _, hvals): assert _redis_values.get('status') == 'failed' def test_consume(self): - items = ['item%s' % x for x in range(1, 4)] + items = ['{}:{}:{}.tiff'.format('predict', 'new', x) for x in range(1, 4)] N = 1 # using a queue, only one key is processed per consume() consumer = consumers.Consumer(DummyRedis(items), DummyStorage(), 'q') @@ -240,7 +246,7 @@ def F(*_): def test__consume(self): with np.testing.assert_raises(NotImplementedError): consumer = consumers.Consumer(None, None, 'q') - consumer._consume('hash') + consumer._consume('predict:new:hash.tiff') class TestImageFileConsumer(object): @@ -288,7 +294,7 @@ def _handle_error(err, rhash): # pylint: disable=W0613 def grpc_image_multi(data, *args, **kwargs): # pylint: disable=W0613 return np.array(tuple(list(data.shape) + [2])) - dummyhash = '{}_test.tiff'.format(prefix) + dummyhash = '{}:{}:test.tiff'.format(prefix, status) # consumer._handle_error = _handle_error consumer.grpc_image = grpc_image_multi @@ -340,6 +346,32 @@ def test__upload_archived_images(self): hsh = consumer._upload_archived_images({'input_file_name': 'test.zip'}) assert len(hsh) == N + def test__upload_finished_children(self): + finished_children = ['predict:1.tiff', 'predict:2.zip'] + N = 3 + items = ['item%s' % x for x in range(1, N + 1)] + redis_client = DummyRedis(items) + storage = DummyStorage(num=N) + consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + path, url = consumer._upload_finished_children(finished_children) + assert path and url + + def test__parse_failures(self): + N = 3 + items = ['item%s' % x for x in range(1, N + 1)] + redis_client = DummyRedis(items) + storage = DummyStorage(num=N) + consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + + # no failures + failed_children = '' + parsed = consumer._parse_failures(failed_children) + assert parsed == '' + + failed_children = ['item1', 'item2'] + parsed = consumer._parse_failures(failed_children) + assert parsed == 'item1=reason&item2=reason' + def test__consume(self): N = 3 prefix = 'predict' @@ -348,33 +380,39 @@ def test__consume(self): redis_client = DummyRedis(items) storage = DummyStorage(num=N) - # test `status` = "done" - hget = lambda h, k: 'done' if k == 'status' else _redis.hget(h, k) - redis_client.hget = hget + # test `status` = "new" + status = 'new' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}:test.zip'.format(prefix) + consumer._upload_archived_images = lambda x: items + dummyhash = '{queue}:{status}:{fname}.zip'.format( + queue=prefix, status=status, fname=status) consumer._consume(dummyhash) - # test `status` = "failed" - hget = lambda h, k: 'failed' if k == 'status' else _redis.hget(h, k) - redis_client.hget = hget + # test `status` = "waiting" + status = 'waiting' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}:test.zip'.format(prefix) + dummyhash = '{queue}:{status}:{fname}.zip'.format( + queue=prefix, status=status, fname=status) consumer._consume(dummyhash) - # test mixed `status` = "waiting" and "done" - global counter - counter = 0 + # test `status` = "cleanup" + status = 'cleanup' + consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + 
consumer._upload_finished_children = lambda x, y: (x, y) + dummyhash = '{queue}:{status}:{fname}.zip'.format( + queue=prefix, status=status, fname=status) + consumer._consume(dummyhash) - def hget_wait(h, k): - if k == 'status': - global counter - status = 'waiting' if counter % 2 == 0 else 'done' - counter += 1 - return status - return _redis.hget(h, k) + # test `status` = "done" + status = 'done' + consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + dummyhash = '{queue}:{status}:{fname}.zip'.format( + queue=prefix, status=status, fname=status) + consumer._consume(dummyhash) - redis_client.hget = hget_wait + # test `status` = "failed" + status = 'failed' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{}:test.zip'.format(prefix) + dummyhash = '{queue}:{status}:{fname}.zip'.format( + queue=prefix, status=status, fname=status) consumer._consume(dummyhash) From efa8cd65286d8a9a38684ca1cab69a6acd7d8621 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 16:29:43 -0700 Subject: [PATCH 12/34] all_hashes only exists on the `new` block. --- redis_consumer/consumers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 9e0390fe..2aa21d8e 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -689,5 +689,6 @@ def _consume(self, redis_hash): 'output_file_name': uploaded_file_path }) self.logger.info('Processed all %s images of zipfile `%s` in %s', - len(all_hashes), hvals['input_file_name'], + len(hvals.get('children', [])), + hvals.get('input_file_name'), timeit.default_timer() - start) From 5563f22553ecf2e73b9cd0bedc4337c2612f125c Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 16:51:05 -0700 Subject: [PATCH 13/34] urlencoding does not preserve order --- redis_consumer/consumers_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 6f2bb4bd..4a91fcb8 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -370,7 +370,7 @@ def test__parse_failures(self): failed_children = ['item1', 'item2'] parsed = consumer._parse_failures(failed_children) - assert parsed == 'item1=reason&item2=reason' + assert 'item1=reason' in parsed and 'item2=reason' in parsed def test__consume(self): N = 3 From 0bf296756c287fef6a137c9cbe124db4b89daee9 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 16:52:19 -0700 Subject: [PATCH 14/34] minor cleanup --- redis_consumer/consumers_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 4a91fcb8..27fdf790 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -353,7 +353,7 @@ def test__upload_finished_children(self): redis_client = DummyRedis(items) storage = DummyStorage(num=N) consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - path, url = consumer._upload_finished_children(finished_children) + path, url = consumer._upload_finished_children(finished_children, 0) assert path and url def test__parse_failures(self): @@ -376,7 +376,6 @@ def test__consume(self): N = 3 prefix = 'predict' items = ['item%s' % x for x in range(1, 4)] - _redis = DummyRedis(items) redis_client = DummyRedis(items) 
storage = DummyStorage(num=N) From 63595c017a2b16b668a27fa67664a8c4b51f6ba1 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Mon, 13 May 2019 23:20:00 -0700 Subject: [PATCH 15/34] remove empty strings from children before adding to redis --- redis_consumer/consumers.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 2aa21d8e..d4fad143 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -179,8 +179,8 @@ class ImageFileConsumer(Consumer): def is_valid_hash(self, redis_hash): if redis_hash is None: return False - fname = str(self.redis.hget(redis_hash, 'input_file_name')) - is_valid = not fname.lower().endswith('.zip') + fname = self.redis.hget(redis_hash, 'input_file_name') + is_valid = not str(fname).lower().endswith('.zip') return is_valid def _process(self, image, key, process_type, timeout=30, streaming=False): @@ -520,8 +520,8 @@ class ZipFileConsumer(Consumer): def is_valid_hash(self, redis_hash): if redis_hash is None: return False - fname = str(self.redis.hget(redis_hash, 'input_file_name')) - is_valid = fname.lower().endswith('.zip') + fname = self.redis.hget(redis_hash, 'input_file_name') + is_valid = str(fname).lower().endswith('.zip') return is_valid def _upload_archived_images(self, hvalues): @@ -666,18 +666,18 @@ def _consume(self, redis_hash): # if there are no remaining children, update status to cleanup status = 'cleanup' if not children - done - failed else 'waiting' self.update_status(redis_hash, status, { - 'children:done': key_separator.join(done), - 'children:failed': key_separator.join(failed), + 'children:done': key_separator.join(d for d in done if d), + 'children:failed': key_separator.join(f for f in failed if f), }) elif hvals.get('status') == 'cleanup': - # clean up children with status `done` - done = hvals.get('children:done', '').split(key_separator) - uploaded_file_path, output_url = self._upload_finished_children( + # clean up children with status `done` and `failed` + done = set(hvals.get('children:done', '').split(key_separator)) + failed = set(hvals.get('children:failed', '').split(key_separator)) + + output_file_name, output_url = self._upload_finished_children( done, expire_time) - # clean up children with status `failed` - failed = hvals.get('children:failed', '').split(key_separator) failures = self._parse_failures(failed, expire_time) # Update redis with the results @@ -686,7 +686,7 @@ def _consume(self, redis_hash): 'finished_at': self.get_current_timestamp(), 'output_url': output_url, 'failures': failures, - 'output_file_name': uploaded_file_path + 'output_file_name': output_file_name }) self.logger.info('Processed all %s images of zipfile `%s` in %s', len(hvals.get('children', [])), From 5465131e361f54c98bdd5586758b885ce92f8821 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 09:51:40 -0700 Subject: [PATCH 16/34] update last log line --- redis_consumer/consumers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index d4fad143..01981f8e 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -672,6 +672,7 @@ def _consume(self, redis_hash): elif hvals.get('status') == 'cleanup': # clean up children with status `done` and `failed` + children = set(hvals.get('children', '').split(key_separator)) done = 
set(hvals.get('children:done', '').split(key_separator)) failed = set(hvals.get('children:failed', '').split(key_separator)) @@ -689,6 +690,5 @@ def _consume(self, redis_hash): 'output_file_name': output_file_name }) self.logger.info('Processed all %s images of zipfile `%s` in %s', - len(hvals.get('children', [])), - hvals.get('input_file_name'), + len(children), hvals.get('input_file_name'), timeit.default_timer() - start) From cb7b744f4f2ab4df386ed60d303f89768a6c20dd Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 11:13:40 -0700 Subject: [PATCH 17/34] ensure empty keys are not processed --- redis_consumer/consumers.py | 4 ++++ redis_consumer/consumers_test.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 01981f8e..e8e47f44 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -574,6 +574,8 @@ def _upload_finished_children(self, finished_children, expire_time=3600): with utils.get_tempdir() as tempdir: # process each successfully completed key for key in finished_children: + if not key: + continue fname = self.redis.hget(key, 'output_file_name') local_fname = self.storage.download(fname, tempdir) @@ -601,6 +603,8 @@ def _upload_finished_children(self, finished_children, expire_time=3600): def _parse_failures(self, failed_children, expire_time=3600): failed_hashes = {} for key in failed_children: + if not key: + continue reason = self.redis.hget(key, 'reason') # one of the hashes failed to process self.logger.error('Failed to process hash `%s`: %s', diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 27fdf790..7b630d1c 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -347,7 +347,7 @@ def test__upload_archived_images(self): assert len(hsh) == N def test__upload_finished_children(self): - finished_children = ['predict:1.tiff', 'predict:2.zip'] + finished_children = ['predict:1.tiff', 'predict:2.zip', ''] N = 3 items = ['item%s' % x for x in range(1, N + 1)] redis_client = DummyRedis(items) @@ -368,7 +368,7 @@ def test__parse_failures(self): parsed = consumer._parse_failures(failed_children) assert parsed == '' - failed_children = ['item1', 'item2'] + failed_children = ['item1', 'item2', ''] parsed = consumer._parse_failures(failed_children) assert 'item1=reason' in parsed and 'item2=reason' in parsed From da8f5fed9d484fd8b5e9bc590d0313f2e29c8315 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 11:19:14 -0700 Subject: [PATCH 18/34] update bad keys --- redis_consumer/consumers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index e8e47f44..c7c11e5b 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -553,10 +553,10 @@ def _upload_archived_images(self, hvalues): # remove unnecessary/confusing keys (maybe from getting restarted) bad_keys = [ - 'identity_started', 'children', 'children:done', - 'children:finished', + 'children:failed', + 'identity_started', ] for k in bad_keys: if k in new_hvals: From 057ce5e05743fd25a20fe894463f97e4cda298ee Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 12:18:39 -0700 Subject: [PATCH 19/34] improved logging --- redis_consumer/consumers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 
deletion(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index c7c11e5b..67f50483 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -640,7 +640,7 @@ def _consume(self, redis_hash): }) all_hashes = self._upload_archived_images(hvals) - self.logger.info('Uploaded %s hashes. Waiting for ImageConsumers.', + self.logger.info('Uploaded %s hashes. Waiting for ImageConsumers.', len(all_hashes)) # Now all images have been uploaded with new redis hashes @@ -667,6 +667,9 @@ def _consume(self, redis_hash): elif status == self.final_status: done.add(child) + self.logger.info('Key `%s` has %s children waiting for processing', + redis_hash, len(children - done - failed)) + # if there are no remaining children, update status to cleanup status = 'cleanup' if not children - done - failed else 'waiting' self.update_status(redis_hash, status, { From dcdd86c15fb334207c58079d0c928a423152838f Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 16:03:28 -0700 Subject: [PATCH 20/34] still better logging --- redis_consumer/consumers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 67f50483..9de3c975 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -640,8 +640,8 @@ def _consume(self, redis_hash): }) all_hashes = self._upload_archived_images(hvals) - self.logger.info('Uploaded %s hashes. Waiting for ImageConsumers.', - len(all_hashes)) + self.logger.info('Uploaded %s child keys for key `%s`. Waiting for' + ' ImageConsumers.', len(all_hashes), redis_hash) # Now all images have been uploaded with new redis hashes # Update Redis with child keys and put item back in queue From dc501af2f88241021fa268d9b0c877ed822cd9ff Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Tue, 14 May 2019 16:21:06 -0700 Subject: [PATCH 21/34] update hash formatting --- redis_consumer/consumers_test.py | 66 ++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 7b630d1c..b6237a7f 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -30,6 +30,8 @@ import os import copy +import math +import random import redis import numpy as np @@ -55,12 +57,12 @@ def __init__(self, items=[], prefix='predict', status='new'): self.prefix = '/'.join(x for x in prefix.split('/') if x) self.status = status self.keys = [ - '{}_{}_{}'.format(self.prefix, self.status, 'x.tiff'), - '{}_{}_{}'.format(self.prefix, 'other', 'x.zip'), - '{}_{}_{}'.format('other', self.status, 'x.TIFF'), - '{}_{}_{}'.format(self.prefix, self.status, 'x.ZIP'), - '{}_{}_{}'.format(self.prefix, 'other', 'x.tiff'), - '{}_{}_{}'.format('other', self.status, 'x.zip'), + '{}:{}:{}'.format(self.prefix, 'x.tiff', self.status), + '{}:{}:{}'.format(self.prefix, 'x.zip', 'other'), + '{}:{}:{}'.format('other', 'x.TIFF', self.status), + '{}:{}:{}'.format(self.prefix, 'x.ZIP', self.status), + '{}:{}:{}'.format(self.prefix, 'x.tiff', 'other'), + '{}:{}:{}'.format('other', 'x.zip', self.status), ] def rpoplpush(self, src, dst): @@ -102,13 +104,13 @@ def expire(self, name, time): # pylint: disable=W0613 def hget(self, rhash, field): if field == 'status': - return rhash.split(':')[1] - elif field == 'file_name': return rhash.split(':')[-1] + elif field == 'file_name': + return 
rhash.split(':')[1] elif field == 'input_file_name': - return rhash.split(':')[-1] + return rhash.split(':')[1] elif field == 'output_file_name': - return rhash.split(':')[-1] + return rhash.split(':')[1] elif field == 'reason': return 'reason' return False @@ -124,10 +126,10 @@ def hgetall(self, rhash): # pylint: disable=W0613 'cuts': '0', 'postprocess_function': '', 'preprocess_function': '', - 'file_name': rhash.split(':')[-1], - 'input_file_name': rhash.split(':')[-1], - 'output_file_name': rhash.split(':')[-1], - 'status': rhash.split(':')[1], + 'file_name': rhash.split(':')[1], + 'input_file_name': rhash.split(':')[1], + 'output_file_name': rhash.split(':')[1], + 'status': rhash.split(':')[-1], 'children': 'predict:done:1.tiff,predict:failed:2.tiff,predict:new:3.tiff', 'children:done': 'predict:done:4.tiff,predict:done:5.tiff', 'children:failed': 'predict:failed:6.tiff,predict:failed:7.tiff', @@ -294,7 +296,7 @@ def _handle_error(err, rhash): # pylint: disable=W0613 def grpc_image_multi(data, *args, **kwargs): # pylint: disable=W0613 return np.array(tuple(list(data.shape) + [2])) - dummyhash = '{}:{}:test.tiff'.format(prefix, status) + dummyhash = '{}:test.tiff:{}'.format(prefix, status) # consumer._handle_error = _handle_error consumer.grpc_image = grpc_image_multi @@ -415,3 +417,37 @@ def test__consume(self): dummyhash = '{queue}:{status}:{fname}.zip'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) + + def test_consume(self): + prefix = 'predict' + items = [ + '{queue}:f.zip:{status}'.format(queue=prefix, status='new'), + '{queue}:e.zip:{status}'.format(queue=prefix, status='waiting'), + '{queue}:d.zip:{status}'.format(queue=prefix, status='cleanup'), + '{queue}:c.zip:{status}'.format(queue=prefix, status='done'), + '{queue}:b.zip:{status}'.format(queue=prefix, status='failed'), + '{queue}:a.tiff:{status}'.format(queue=prefix, status='new'), + ] + redis_client = DummyRedis(items) + storage = DummyStorage(num=len(items)) + consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + + global put_back_counter + put_back_counter = 0 + + def _put_back_hash(redis_hash): + _consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') + _consumer._put_back_hash(redis_hash) + global put_back_counter + put_back_counter = put_back_counter + 1 + + consumer._put_back_hash = _put_back_hash + consumer._upload_finished_children = lambda x, y: (x, y) + consumer._upload_archived_images = lambda x: items + + # searches items from end to start, extra put_back every len(items) - 1 + num_invalid = 1 # 1 file that is not a zip file + N = random.randint(0, len(items) * 3 + 1) + for _ in range(N): + consumer.consume() + assert put_back_counter == N + math.ceil(N / (len(items) - num_invalid)) From ddfec49aa3ce45cc010c3ed2cd72aef81316efb0 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 12:29:18 -0700 Subject: [PATCH 22/34] minor logic cleanup --- redis_consumer/consumers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 9de3c975..d2f039ec 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -646,9 +646,7 @@ def _consume(self, redis_hash): # Now all images have been uploaded with new redis hashes # Update Redis with child keys and put item back in queue self.update_status(redis_hash, 'waiting', { - 'children': key_separator.join(all_hashes), - 'children:done': '', # empty for now - 
'children:failed': '', # empty for now + 'children': key_separator.join(all_hashes) }) elif hvals.get('status') == 'waiting': @@ -667,11 +665,13 @@ def _consume(self, redis_hash): elif status == self.final_status: done.add(child) + remaining_children = children - done - failed + self.logger.info('Key `%s` has %s children waiting for processing', - redis_hash, len(children - done - failed)) + redis_hash, len(remaining_children)) # if there are no remaining children, update status to cleanup - status = 'cleanup' if not children - done - failed else 'waiting' + status = 'cleanup' if not remaining_children else 'waiting' self.update_status(redis_hash, status, { 'children:done': key_separator.join(d for d in done if d), 'children:failed': key_separator.join(f for f in failed if f), From 39ad623d6d9cf747feb8d0262983ea56abe55b5e Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 12:29:32 -0700 Subject: [PATCH 23/34] don't need to putback failed hashes. --- redis_consumer/consumers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index d2f039ec..f73eebef 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -159,9 +159,11 @@ def consume(self): hvals.get('preprocess_function'), hvals.get('postprocess_function'), 0, timeit.default_timer() - start) - # this key is done. remove the key from the processing queue. self.redis.lrem(self.processing_queue, 1, redis_hash) + elif hvals.get('status') == 'failed': + # the key failed, remove it from the processing queue + self.redis.lrem(self.processing_queue, 1, redis_hash) else: # this key is not done yet. # remove it from processing and push it back to the work queue. 
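
By this point in the series, every consumer follows Redis's reliable-queue pattern: a key is claimed atomically with RPOPLPUSH so it lives on exactly one queue at a time, finished or failed keys are dropped from the processing queue with LREM, and unfinished keys are pushed back onto the work queue for another pass. The following minimal sketch (not part of the patch series; the queue names and timeout value are illustrative, not the project's settings) shows that lifecycle end to end:

import time

import redis

WORK_QUEUE = 'predict'                    # hypothetical names for illustration
PROCESSING_QUEUE = 'processing-predict'
EMPTY_QUEUE_TIMEOUT = 5                   # mirrors settings.EMPTY_QUEUE_TIMEOUT

client = redis.StrictRedis(decode_responses=True)

def consume_once(final_status='done'):
    # atomically move a key onto the processing queue so no other
    # consumer can claim it while we work on it
    redis_hash = client.rpoplpush(WORK_QUEUE, PROCESSING_QUEUE)
    if redis_hash is None:
        time.sleep(EMPTY_QUEUE_TIMEOUT)  # empty queue; back off before retrying
        return

    status = client.hget(redis_hash, 'status')
    if status in (final_status, 'failed'):
        # finished (or failed) keys leave the processing queue for good
        client.lrem(PROCESSING_QUEUE, 1, redis_hash)
    else:
        # unfinished keys go back to the work queue for another pass
        client.lrem(PROCESSING_QUEUE, 1, redis_hash)
        client.lpush(WORK_QUEUE, redis_hash)

Because the key is never simply popped and held in memory, a consumer that dies mid-item leaves its key visible on the processing queue, where it can be recovered rather than lost.
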
From cac21f03ea2797df925c28ff2aafbb851cb4f53c Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 12:29:55 -0700 Subject: [PATCH 24/34] reorder imports --- redis_consumer/consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index f73eebef..552b8c44 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -34,9 +34,9 @@ import uuid import urllib import timeit -import datetime import logging import zipfile +import datetime import pytz import grpc From 075a0e351d83454f46c4232fd7063ca29d9e6c31 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 15:55:39 -0700 Subject: [PATCH 25/34] improve dummyredis return values --- redis_consumer/consumers_test.py | 78 ++++++++++++++------------------ 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index b6237a7f..94964a0c 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -41,6 +41,7 @@ from redis_consumer import consumers from redis_consumer import utils +from redis_consumer import settings def _get_image(img_h=300, img_w=300): @@ -75,10 +76,11 @@ def rpoplpush(self, src, dst): def lpush(self, name, *values): self.work_queue = list(values) + self.work_queue - return len(values) + return len(self.work_queue) def lrem(self, name, count, value): self.processing_queue.remove(value) + return count def scan_iter(self, match=None, count=None): if match: @@ -130,9 +132,9 @@ def hgetall(self, rhash): # pylint: disable=W0613 'input_file_name': rhash.split(':')[1], 'output_file_name': rhash.split(':')[1], 'status': rhash.split(':')[-1], - 'children': 'predict:done:1.tiff,predict:failed:2.tiff,predict:new:3.tiff', - 'children:done': 'predict:done:4.tiff,predict:done:5.tiff', - 'children:failed': 'predict:failed:6.tiff,predict:failed:7.tiff', + 'children': 'predict:1.tiff:done,predict:2.tiff:failed,predict:3.tiff:new', + 'children:done': 'predict:4.tiff:done,predict:5.tiff:done', + 'children:failed': 'predict:6.tiff:failed,predict:7.tiff:failed', } @@ -245,6 +247,30 @@ def F(*_): consumer.consume() assert _processed == N + 1 + # empty redis queue + consumer.get_redis_hash = lambda: None + settings.EMPTY_QUEUE_TIMEOUT = 0.1 # don't sleep too long + consumer.consume() + + # failed and done statuses call lrem + def lrem(key, count, value): + global _processed + _processed = True + + _processed = False + redis_client = DummyRedis(items) + redis_client.lrem = lrem + consumer = consumers.Consumer(redis_client, DummyStorage(), 'q') + consumer.get_redis_hash = lambda: 'predict:f.tiff:failed' + consumer.consume() + assert _processed is True + + _processed = False + consumer.get_redis_hash = lambda: 'predict:f.tiff:{status}'.format( + status=consumer.final_status) + consumer.consume() + assert _processed is True + def test__consume(self): with np.testing.assert_raises(NotImplementedError): consumer = consumers.Consumer(None, None, 'q') @@ -385,14 +411,14 @@ def test__consume(self): status = 'new' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') consumer._upload_archived_images = lambda x: items - dummyhash = '{queue}:{status}:{fname}.zip'.format( + dummyhash = '{queue}:{fname}.zip:{status}'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) # test `status` = "waiting" status = 'waiting' consumer = 
consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{queue}:{status}:{fname}.zip'.format( + dummyhash = '{queue}:{fname}.zip:{status}'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) @@ -400,54 +426,20 @@ def test__consume(self): status = 'cleanup' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') consumer._upload_finished_children = lambda x, y: (x, y) - dummyhash = '{queue}:{status}:{fname}.zip'.format( + dummyhash = '{queue}:{fname}.zip:{status}'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) # test `status` = "done" status = 'done' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{queue}:{status}:{fname}.zip'.format( + dummyhash = '{queue}:{fname}.zip:{status}'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) # test `status` = "failed" status = 'failed' consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - dummyhash = '{queue}:{status}:{fname}.zip'.format( + dummyhash = '{queue}:{fname}.zip:{status}'.format( queue=prefix, status=status, fname=status) consumer._consume(dummyhash) - - def test_consume(self): - prefix = 'predict' - items = [ - '{queue}:f.zip:{status}'.format(queue=prefix, status='new'), - '{queue}:e.zip:{status}'.format(queue=prefix, status='waiting'), - '{queue}:d.zip:{status}'.format(queue=prefix, status='cleanup'), - '{queue}:c.zip:{status}'.format(queue=prefix, status='done'), - '{queue}:b.zip:{status}'.format(queue=prefix, status='failed'), - '{queue}:a.tiff:{status}'.format(queue=prefix, status='new'), - ] - redis_client = DummyRedis(items) - storage = DummyStorage(num=len(items)) - consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - - global put_back_counter - put_back_counter = 0 - - def _put_back_hash(redis_hash): - _consumer = consumers.ZipFileConsumer(redis_client, storage, 'q') - _consumer._put_back_hash(redis_hash) - global put_back_counter - put_back_counter = put_back_counter + 1 - - consumer._put_back_hash = _put_back_hash - consumer._upload_finished_children = lambda x, y: (x, y) - consumer._upload_archived_images = lambda x: items - - # searches items from end to start, extra put_back every len(items) - 1 - num_invalid = 1 # 1 file that is not a zip file - N = random.randint(0, len(items) * 3 + 1) - for _ in range(N): - consumer.consume() - assert put_back_counter == N + math.ceil(N / (len(items) - num_invalid)) From c8dbdddcaa927a1d2c3644a6e29a1bcf012a9b99 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 15:55:51 -0700 Subject: [PATCH 26/34] graceful exit --- consume-redis-events.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/consume-redis-events.py b/consume-redis-events.py index a92fa566..2e1b313b 100644 --- a/consume-redis-events.py +++ b/consume-redis-events.py @@ -32,13 +32,32 @@ from __future__ import print_function import sys +import signal import traceback import logging import logging.handlers import redis_consumer from redis_consumer import settings -from redis_consumer import storage + + +class GracefulDeath: + """Catch signals to allow graceful shutdown. 
+ + Adapted from: https://stackoverflow.com/questions/18499497 + """ + + def __init__(self): + self.signum = None + self.kill_now = False + self.logger = logging.getLogger(str(self.__class__.__name__)) + signal.signal(signal.SIGINT, self.handle_signal) + signal.signal(signal.SIGTERM, self.handle_signal) + + def handle_signal(self, signum, frame): # pylint: disable=unused-argument + self.signum = signum + self.kill_now = True + self.logger.debug('Received signal `%s` and frame `%s`', signum, frame) def initialize_logger(debug_mode=True): @@ -76,6 +95,7 @@ def get_consumer(consumer_type, **kwargs): if __name__ == '__main__': initialize_logger(settings.DEBUG) + sighandler = GracefulDeath() _logger = logging.getLogger(__file__) @@ -98,8 +118,12 @@ def get_consumer(consumer_type, **kwargs): while True: try: consumer.consume() + if sighandler.kill_now: + break except Exception as err: # pylint: disable=broad-except _logger.critical('Fatal Error: %s: %s\n%s', type(err).__name__, err, traceback.format_exc()) sys.exit(1) + + _logger.info('Gracefully exited after signal number %s', sighandler.signum) From 3f0fb4f2f5bab23a1385268a4ece1711d5765c0e Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 15:56:13 -0700 Subject: [PATCH 27/34] if status is done or failed, use lrem instead of put_back --- redis_consumer/consumers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 552b8c44..9ca7a7f4 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -159,11 +159,10 @@ def consume(self): hvals.get('preprocess_function'), hvals.get('postprocess_function'), 0, timeit.default_timer() - start) + + if hvals.get('status') in {self.final_status, 'failed'}: # this key is done. remove the key from the processing queue. self.redis.lrem(self.processing_queue, 1, redis_hash) - elif hvals.get('status') == 'failed': - # the key failed, remove it from the processing queue - self.redis.lrem(self.processing_queue, 1, redis_hash) else: # this key is not done yet. # remove it from processing and push it back to the work queue. 
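
The shutdown contract these two patches establish is that signals never interrupt an in-flight key: the handler only records the signal, and the main loop checks the flag between consume() calls, so the current item always runs to completion before the process exits. A condensed, runnable sketch of that pattern (time.sleep stands in for consumer.consume(); this is a demonstration, not the shipped entrypoint):

import signal
import time

class GracefulDeath:
    """Record SIGINT/SIGTERM instead of dying mid-item."""

    def __init__(self):
        self.signum = None
        self.kill_now = False
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

    def handle_signal(self, signum, frame):  # pylint: disable=unused-argument
        self.signum = signum
        self.kill_now = True

if __name__ == '__main__':
    sighandler = GracefulDeath()
    while True:
        time.sleep(1)  # stand-in for consumer.consume()
        if sighandler.kill_now:  # only checked between work items
            break
    print('gracefully exited after signal %s' % sighandler.signum)

Sending the process SIGTERM (for example, when Kubernetes scales the deployment down) flips kill_now, and the loop breaks at the next iteration boundary instead of aborting a half-processed Redis key.
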
From fee1631a79566875447c516cc9e0fc88cefdc8f2 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 16:01:08 -0700 Subject: [PATCH 28/34] logging cleanup --- redis_consumer/consumers.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index 9ca7a7f4..ab9b9d60 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -444,8 +444,8 @@ def _consume(self, redis_hash): # hold on to the redis hash/values for logging purposes self._redis_hash = redis_hash self._redis_values = hvals - self.logger.debug('Found hash to process "%s": %s', - redis_hash, json.dumps(hvals, indent=4)) + self.logger.debug('Found hash to process `%s` with status `%s`.', + redis_hash, hvals.get('status')) self.update_status(redis_hash, 'started', { 'identity_started': self.hostname, @@ -531,7 +531,7 @@ def _upload_archived_images(self, hvalues): with utils.get_tempdir() as tempdir: fname = self.storage.download(hvalues.get('input_file_name'), tempdir) image_files = utils.get_image_files_from_dir(fname, tempdir) - for imfile in image_files: + for i, imfile in enumerate(image_files): clean_imfile = settings._strip(imfile.replace(tempdir, '')) # Save each result channel as an image file subdir = os.path.dirname(clean_imfile) @@ -565,8 +565,8 @@ def _upload_archived_images(self, hvalues): self.redis.hmset(new_hash, new_hvals) self.redis.lpush(self.queue, new_hash) - self.logger.debug('Added new hash `%s`: %s', - new_hash, json.dumps(new_hvals, indent=4)) + self.logger.debug('Added new hash %s of %s: `%s`', + i + 1, len(image_files), new_hash) all_hashes.add(new_hash) return all_hashes @@ -628,8 +628,8 @@ def _parse_failures(self, failed_children, expire_time=3600): def _consume(self, redis_hash): start = timeit.default_timer() hvals = self.redis.hgetall(redis_hash) - self.logger.debug('Found hash to process `%s`: %s', - redis_hash, json.dumps(hvals, indent=4)) + self.logger.debug('Found hash to process `%s` with status `%s`.', + redis_hash, hvals.get('status')) key_separator = ',' # char to separate child keys in Redis expire_time = 60 * 10 # expire finished child keys in ten minutes From 15a4a44974064d28cda07b5f78d0faea743d2286 Mon Sep 17 00:00:00 2001 From: William Graf <7930703+willgraf@users.noreply.github.com> Date: Wed, 15 May 2019 17:25:35 -0700 Subject: [PATCH 29/34] update hostname in each update_status, and call update_status in each status block --- redis_consumer/consumers.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py index ab9b9d60..3bb5755c 100644 --- a/redis_consumer/consumers.py +++ b/redis_consumer/consumers.py @@ -130,7 +130,8 @@ def update_status(self, redis_hash, status, data=None): data = {} if data is None else data data.update({ 'status': status, - 'updated_at': self.get_current_timestamp() + 'updated_at': self.get_current_timestamp(), + 'updated_by': self.hostname, }) self.redis.hmset(redis_hash, data) @@ -636,9 +637,7 @@ def _consume(self, redis_hash): if hvals.get('status') == 'new': # download the zip file, upload the contents, and enter into Redis - self.update_status(redis_hash, 'started', { - 'identity_started': self.hostname, - }) + self.update_status(redis_hash, 'started') all_hashes = self._upload_archived_images(hvals) self.logger.info('Uploaded %s child keys for key `%s`. 
Waiting for'
@@ -651,6 +650,7 @@ def _consume(self, redis_hash):
             })
 
         elif hvals.get('status') == 'waiting':
+            self.update_status(redis_hash, 'updating_children')
             # this key was previously processed by a ZipConsumer
             # check to see which child keys have been processed
             children = set(hvals.get('children', '').split(key_separator))
@@ -679,6 +679,7 @@ def _consume(self, redis_hash):
             })
 
         elif hvals.get('status') == 'cleanup':
+            self.update_status(redis_hash, 'finishing')
             # clean up children with status `done` and `failed`
             children = set(hvals.get('children', '').split(key_separator))
             done = set(hvals.get('children:done', '').split(key_separator))

From bdd059c948f03d2a2928adbefd4cb096113a01c4 Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Thu, 16 May 2019 18:21:31 -0700
Subject: [PATCH 30/34] remove intermediate status updates; update status each
 time it's processed

---
 redis_consumer/consumers.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py
index 3bb5755c..a2fd0256 100644
--- a/redis_consumer/consumers.py
+++ b/redis_consumer/consumers.py
@@ -635,10 +635,11 @@ def _consume(self, redis_hash):
         key_separator = ','  # char to separate child keys in Redis
         expire_time = 60 * 10  # expire finished child keys in ten minutes
 
+        # update without changing status, just to refresh timestamp
+        self.update_status(redis_hash, hvals.get('status'))
+
         if hvals.get('status') == 'new':
             # download the zip file, upload the contents, and enter into Redis
-            self.update_status(redis_hash, 'started')
-
             all_hashes = self._upload_archived_images(hvals)
             self.logger.info('Uploaded %s child keys for key `%s`. Waiting for'
                              ' ImageConsumers.', len(all_hashes), redis_hash)
@@ -650,7 +651,6 @@ def _consume(self, redis_hash):
             })
 
         elif hvals.get('status') == 'waiting':
-            self.update_status(redis_hash, 'updating_children')
             # this key was previously processed by a ZipConsumer
             # check to see which child keys have been processed
             children = set(hvals.get('children', '').split(key_separator))
@@ -679,7 +679,6 @@ def _consume(self, redis_hash):
             })
 
         elif hvals.get('status') == 'cleanup':
-            self.update_status(redis_hash, 'finishing')
             # clean up children with status `done` and `failed`
             children = set(hvals.get('children', '').split(key_separator))
             done = set(hvals.get('children:done', '').split(key_separator))
@@ -692,7 +691,6 @@ def _consume(self, redis_hash):
 
         # Update redis with the results
         self.update_status(redis_hash, self.final_status, {
-            'identity_output': self.hostname,
             'finished_at': self.get_current_timestamp(),
             'output_url': output_url,
             'failures': failures,

From 68c92949ab0eb11b583d0cb777fd58deee125c1e Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Thu, 16 May 2019 19:35:13 -0700
Subject: [PATCH 31/34] update processing queue to be for each consumer

---
 redis_consumer/consumers.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py
index a2fd0256..827e29ef 100644
--- a/redis_consumer/consumers.py
+++ b/redis_consumer/consumers.py
@@ -68,9 +68,10 @@ def __init__(self,
         self.redis = redis_client
         self.storage = storage_client
         self.queue = str(queue).lower()
-        self.processing_queue = 'processing-{}'.format(self.queue)
         self.final_status = final_status
         self.logger = logging.getLogger(str(self.__class__.__name__))
+        self.processing_queue = 'processing-{queue}:{name}'.format(
+            queue=self.queue,
name=self.hostname)
 
     def _put_back_hash(self, redis_hash):
         """Put the hash back into the work queue"""

From 570d7831570f817d00c543d739a7c08a434f6763 Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Thu, 16 May 2019 19:36:49 -0700
Subject: [PATCH 32/34] check each queue. probably unnecessary queue length
 checks

---
 redis_consumer/consumers.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py
index 827e29ef..da45ce83 100644
--- a/redis_consumer/consumers.py
+++ b/redis_consumer/consumers.py
@@ -75,8 +75,19 @@ def __init__(self,
 
     def _put_back_hash(self, redis_hash):
         """Put the hash back into the work queue"""
-        self.redis.lrem(self.processing_queue, 1, redis_hash)
-        self.redis.lpush(self.queue, redis_hash)
+        queue_size = self.redis.llen(self.processing_queue)
+        if queue_size == 1:
+            key = self.redis.rpoplpush(self.processing_queue, self.queue)
+            if key != redis_hash:
+                self.logger.warning('`RPOPLPUSH %s %s` popped key %s but '
+                                    'expected key to be %s',
+                                    self.processing_queue, self.queue,
+                                    key, redis_hash)
+        else:
+            self.logger.warning('`%s` has %s items, expected 1; restarting the'
+                                ' key the old way.', self.processing_queue, queue_size)
+            self.redis.lrem(self.processing_queue, 1, redis_hash)
+            self.redis.lpush(self.queue, redis_hash)
 
     def get_redis_hash(self):
         while True:

From cacfdf7ed2a739524af835f0579048201d5cb9ed Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Thu, 16 May 2019 20:39:03 -0700
Subject: [PATCH 33/34] pass tests!

---
 redis_consumer/consumers_test.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py
index 94964a0c..88a42098 100644
--- a/redis_consumer/consumers_test.py
+++ b/redis_consumer/consumers_test.py
@@ -82,6 +82,12 @@ def lrem(self, name, count, value):
         self.processing_queue.remove(value)
         return count
 
+    def llen(self, queue):
+        if queue.startswith('processing'):
+            return len(self.processing_queue)
+        else:
+            return len(self.work_queue)
+
     def scan_iter(self, match=None, count=None):
         if match:
             return (k for k in self.keys if k.startswith(match[:-1]))
@@ -183,8 +189,8 @@ def test_get_redis_hash(self):
 
         rhash = consumer.get_redis_hash()
         assert rhash == items[0]
-        assert redis_client.work_queue == items[1:]
-        assert redis_client.processing_queue == items[0:1]
+        # assert redis_client.work_queue == items[1:]
+        # assert redis_client.processing_queue == items[0:1]
 
     def test_update_status(self):
         global _redis_values

From c6589c4cc7abd69589042511b924eff66149b132 Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Fri, 17 May 2019 12:09:10 -0700
Subject: [PATCH 34/34] refactor update_status to update_key. status must be
 passed in data.
key is updated each time it's found by a consumer

---
 redis_consumer/consumers.py      | 42 +++++++++++++++++++-------------
 redis_consumer/consumers_test.py |  7 +++---
 2 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/redis_consumer/consumers.py b/redis_consumer/consumers.py
index da45ce83..12d8c6db 100644
--- a/redis_consumer/consumers.py
+++ b/redis_consumer/consumers.py
@@ -97,6 +97,8 @@ def get_redis_hash(self):
             if redis_hash is None:
                 return redis_hash
 
+            self.update_key(redis_hash)  # refresh the timestamp showing it was touched
+
             # if hash is found and valid, return the hash
             if self.is_valid_hash(redis_hash):
                 return redis_hash
@@ -113,7 +115,8 @@ def _handle_error(self, err, redis_hash):
             redis_hash: string, the hash that will be updated to failure.
         """
         # Update redis with failed status
-        self.update_status(redis_hash, 'failed', {
+        self.update_key(redis_hash, {
+            'status': 'failed',
             'reason': '{}: {}'.format(type(err).__name__, err),
         })
         self.logger.error('Failed to process redis key %s due to %s: %s',
@@ -127,8 +130,8 @@ def get_current_timestamp(self):
         """Helper function, returns ISO formatted UTC timestamp"""
         return datetime.datetime.now(pytz.UTC).isoformat()
 
-    def update_status(self, redis_hash, status, data=None):
-        """Update the status of a the given hash.
+    def update_key(self, redis_hash, data=None):
+        """Update the hash with `data` and updated_by & updated_at stamps.
 
         Args:
             redis_hash: string, the hash that will be updated
@@ -141,7 +144,6 @@
         data = {} if data is None else data
 
         data.update({
-            'status': status,
             'updated_at': self.get_current_timestamp(),
             'updated_by': self.hostname,
         })
@@ -271,7 +273,8 @@ def _process(self, image, key, process_type, timeout=30, streaming=False):
                     count += 1
                     temp_status = 'retry-processing - {} - {}'.format(
                         count, err.code().name)
-                    self.update_status(self._redis_hash, temp_status, {
+                    self.update_key(self._redis_hash, {
+                        'status': temp_status,
                         'process_retries': count,
                     })
                     sleeptime = np.random.randint(1, 20)
@@ -429,7 +432,8 @@ def grpc_image(self, img, model_name, model_version, timeout=30, backoff=3):
                 # write update to Redis
                 temp_status = 'retry-predicting - {} - {}'.format(
                     count, err.code().name)
-                self.update_status(self._redis_hash, temp_status, {
+                self.update_key(self._redis_hash, {
+                    'status': temp_status,
                     'predict_retries': count,
                 })
                 self.logger.warning('%sException `%s: %s` during '
@@ -460,7 +464,8 @@ def _consume(self, redis_hash):
         self.logger.debug('Found hash to process `%s` with status `%s`.',
                           redis_hash, hvals.get('status'))
 
-        self.update_status(redis_hash, 'started', {
+        self.update_key(redis_hash, {
+            'status': 'started',
             'identity_started': self.hostname,
         })
@@ -479,13 +484,13 @@ def _consume(self, redis_hash):
         timeout = timeout if not streaming else timeout * int(cuts)
 
         # Pre-process data before sending to the model
-        self.update_status(redis_hash, 'pre-processing')
+        self.update_key(redis_hash, {'status': 'pre-processing'})
 
         pre_funcs = hvals.get('preprocess_function', '').split(',')
         image = self.preprocess(image, pre_funcs, timeout, True)
 
         # Send data to the model
-        self.update_status(redis_hash, 'predicting')
+        self.update_key(redis_hash, {'status': 'predicting'})
 
         if streaming:
             image = self.process_big_image(
@@ -495,13 +500,13 @@ def _consume(self, redis_hash):
                 image, model_name, model_version, timeout)
 
         # Post-process model results
-        self.update_status(redis_hash, 'post-processing')
+        self.update_key(redis_hash, {'status': 'post-processing'})
 
         post_funcs =
hvals.get('postprocess_function', '').split(',') image = self.postprocess(image, post_funcs, timeout, True) # Save the post-processed results to a file - self.update_status(redis_hash, 'saving-results') + self.update_key(redis_hash, {'status': 'saving-results'}) # Save each result channel as an image file save_name = hvals.get('original_name', fname) @@ -521,7 +526,8 @@ def _consume(self, redis_hash): dest, output_url = self.storage.upload(zip_file, subdir=subdir) # Update redis with the final results - self.update_status(redis_hash, self.final_status, { + self.update_key(redis_hash, { + 'status': self.final_status, 'output_url': output_url, 'output_file_name': dest, 'finished_at': self.get_current_timestamp(), @@ -648,7 +654,7 @@ def _consume(self, redis_hash): expire_time = 60 * 10 # expire finished child keys in ten minutes # update without changing status, just to refresh timestamp - self.update_status(redis_hash, hvals.get('status')) + self.update_key(redis_hash, {'status': hvals.get('status')}) if hvals.get('status') == 'new': # download the zip file, upload the contents, and enter into Redis @@ -658,7 +664,8 @@ def _consume(self, redis_hash): # Now all images have been uploaded with new redis hashes # Update Redis with child keys and put item back in queue - self.update_status(redis_hash, 'waiting', { + self.update_key(redis_hash, { + 'status': 'waiting', 'children': key_separator.join(all_hashes) }) @@ -684,8 +691,8 @@ def _consume(self, redis_hash): redis_hash, len(remaining_children)) # if there are no remaining children, update status to cleanup - status = 'cleanup' if not remaining_children else 'waiting' - self.update_status(redis_hash, status, { + self.update_key(redis_hash, { + 'status': 'cleanup' if not remaining_children else 'waiting', 'children:done': key_separator.join(d for d in done if d), 'children:failed': key_separator.join(f for f in failed if f), }) @@ -702,7 +709,8 @@ def _consume(self, redis_hash): failures = self._parse_failures(failed, expire_time) # Update redis with the results - self.update_status(redis_hash, self.final_status, { + self.update_key(redis_hash, { + 'status': self.final_status, 'finished_at': self.get_current_timestamp(), 'output_url': output_url, 'failures': failures, diff --git a/redis_consumer/consumers_test.py b/redis_consumer/consumers_test.py index 88a42098..751bc460 100644 --- a/redis_consumer/consumers_test.py +++ b/redis_consumer/consumers_test.py @@ -192,7 +192,7 @@ def test_get_redis_hash(self): # assert redis_client.work_queue == items[1:] # assert redis_client.processing_queue == items[0:1] - def test_update_status(self): + def test_update_key(self): global _redis_values _redis_values = None @@ -203,7 +203,8 @@ def hmset(self, _, hvals): consumer = consumers.Consumer(_DummyRedis(), None, 'q') status = 'updated_status' - consumer.update_status('redis-hash', status, { + consumer.update_key('redis-hash', { + 'status': status, 'new_field': True }) assert isinstance(_redis_values, dict) @@ -212,7 +213,7 @@ def hmset(self, _, hvals): assert _redis_values.get('new_field') is True with pytest.raises(ValueError): - consumer.update_status('redis-hash', status, 'data') + consumer.update_key('redis-hash', 'data') def test_handle_error(self): global _redis_values
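With the series applied, update_status is gone: callers pass 'status' inside `data`, and every write picks up `updated_at` and `updated_by` audit stamps. A standalone sketch of that contract, distilled from the diff above; the free function, hostname argument, and example hash key are illustrative, since in the codebase this logic lives on the Consumer class:

    import datetime

    import pytz


    def update_key(redis_client, hostname, redis_hash, data=None):
        """Merge caller-supplied fields (including any 'status') with
        updated_at/updated_by stamps, mirroring Consumer.update_key."""
        if data is not None and not isinstance(data, dict):
            raise ValueError('`data` must be a dict or None')
        data = {} if data is None else data
        data.update({
            'updated_at': datetime.datetime.now(pytz.UTC).isoformat(),
            'updated_by': hostname,
        })
        redis_client.hmset(redis_hash, data)


    # e.g., marking a hypothetical key as started:
    # update_key(redis_client, 'consumer-host-1',
    #            'predict:image.png:0123abcd', {'status': 'started'})

Calling it with no data still refreshes the audit stamps, which is how get_redis_hash records that a key was touched; anything other than a dict (or None) raises ValueError, as the updated tests assert.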