Skip to content

Commit

Permalink
Refactor update_status to update_key; status must now be passed in data.
Browse files Browse the repository at this point in the history
The key is updated each time it's found by a consumer.
  • Loading branch information
willgraf committed May 17, 2019
1 parent cacfdf7 commit c6589c4
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
42 changes: 25 additions & 17 deletions redis_consumer/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def get_redis_hash(self):
if redis_hash is None:
return redis_hash

self.update_key(redis_hash) # update timestamp that it was touched

# if hash is found and valid, return the hash
if self.is_valid_hash(redis_hash):
return redis_hash
Expand All @@ -113,7 +115,8 @@ def _handle_error(self, err, redis_hash):
redis_hash: string, the hash that will be updated to failure.
"""
# Update redis with failed status
self.update_status(redis_hash, 'failed', {
self.update_key(redis_hash, {
'status': 'failed',
'reason': '{}: {}'.format(type(err).__name__, err),
})
self.logger.error('Failed to process redis key %s due to %s: %s',
Expand All @@ -127,8 +130,8 @@ def get_current_timestamp(self):
"""Helper function, returns ISO formatted UTC timestamp"""
return datetime.datetime.now(pytz.UTC).isoformat()

def update_status(self, redis_hash, status, data=None):
"""Update the status of a the given hash.
def update_key(self, redis_hash, data=None):
"""Update the hash with `data` and updated_by & updated_at stamps.
Args:
redis_hash: string, the hash that will be updated
Expand All @@ -141,7 +144,6 @@ def update_status(self, redis_hash, status, data=None):

data = {} if data is None else data
data.update({
'status': status,
'updated_at': self.get_current_timestamp(),
'updated_by': self.hostname,
})
Expand Down Expand Up @@ -271,7 +273,8 @@ def _process(self, image, key, process_type, timeout=30, streaming=False):
count += 1
temp_status = 'retry-processing - {} - {}'.format(
count, err.code().name)
self.update_status(self._redis_hash, temp_status, {
self.update_key(self._redis_hash, {
'status': temp_status,
'process_retries': count,
})
sleeptime = np.random.randint(1, 20)
Expand Down Expand Up @@ -429,7 +432,8 @@ def grpc_image(self, img, model_name, model_version, timeout=30, backoff=3):
# write update to Redis
temp_status = 'retry-predicting - {} - {}'.format(
count, err.code().name)
self.update_status(self._redis_hash, temp_status, {
self.update_key(self._redis_hash, {
'status': temp_status,
'predict_retries': count,
})
self.logger.warning('%sException `%s: %s` during '
Expand Down Expand Up @@ -460,7 +464,8 @@ def _consume(self, redis_hash):
self.logger.debug('Found hash to process `%s` with status `%s`.',
redis_hash, hvals.get('status'))

self.update_status(redis_hash, 'started', {
self.update_key(redis_hash, {
'status': 'started',
'identity_started': self.hostname,
})

Expand All @@ -479,13 +484,13 @@ def _consume(self, redis_hash):
timeout = timeout if not streaming else timeout * int(cuts)

# Pre-process data before sending to the model
self.update_status(redis_hash, 'pre-processing')
self.update_key(redis_hash, {'status': 'pre-processing'})

pre_funcs = hvals.get('preprocess_function', '').split(',')
image = self.preprocess(image, pre_funcs, timeout, True)

# Send data to the model
self.update_status(redis_hash, 'predicting')
self.update_key(redis_hash, {'status': 'predicting'})

if streaming:
image = self.process_big_image(
Expand All @@ -495,13 +500,13 @@ def _consume(self, redis_hash):
image, model_name, model_version, timeout)

# Post-process model results
self.update_status(redis_hash, 'post-processing')
self.update_key(redis_hash, {'status': 'post-processing'})

post_funcs = hvals.get('postprocess_function', '').split(',')
image = self.postprocess(image, post_funcs, timeout, True)

# Save the post-processed results to a file
self.update_status(redis_hash, 'saving-results')
self.update_key(redis_hash, {'status': 'saving-results'})

# Save each result channel as an image file
save_name = hvals.get('original_name', fname)
Expand All @@ -521,7 +526,8 @@ def _consume(self, redis_hash):
dest, output_url = self.storage.upload(zip_file, subdir=subdir)

# Update redis with the final results
self.update_status(redis_hash, self.final_status, {
self.update_key(redis_hash, {
'status': self.final_status,
'output_url': output_url,
'output_file_name': dest,
'finished_at': self.get_current_timestamp(),
Expand Down Expand Up @@ -648,7 +654,7 @@ def _consume(self, redis_hash):
expire_time = 60 * 10 # expire finished child keys in ten minutes

# update without changing status, just to refresh timestamp
self.update_status(redis_hash, hvals.get('status'))
self.update_key(redis_hash, {'status': hvals.get('status')})

if hvals.get('status') == 'new':
# download the zip file, upload the contents, and enter into Redis
Expand All @@ -658,7 +664,8 @@ def _consume(self, redis_hash):

# Now all images have been uploaded with new redis hashes
# Update Redis with child keys and put item back in queue
self.update_status(redis_hash, 'waiting', {
self.update_key(redis_hash, {
'status': 'waiting',
'children': key_separator.join(all_hashes)
})

Expand All @@ -684,8 +691,8 @@ def _consume(self, redis_hash):
redis_hash, len(remaining_children))

# if there are no remaining children, update status to cleanup
status = 'cleanup' if not remaining_children else 'waiting'
self.update_status(redis_hash, status, {
self.update_key(redis_hash, {
'status': 'cleanup' if not remaining_children else 'waiting',
'children:done': key_separator.join(d for d in done if d),
'children:failed': key_separator.join(f for f in failed if f),
})
Expand All @@ -702,7 +709,8 @@ def _consume(self, redis_hash):
failures = self._parse_failures(failed, expire_time)

# Update redis with the results
self.update_status(redis_hash, self.final_status, {
self.update_key(redis_hash, {
'status': self.final_status,
'finished_at': self.get_current_timestamp(),
'output_url': output_url,
'failures': failures,
Expand Down
7 changes: 4 additions & 3 deletions redis_consumer/consumers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def test_get_redis_hash(self):
# assert redis_client.work_queue == items[1:]
# assert redis_client.processing_queue == items[0:1]

def test_update_status(self):
def test_update_key(self):
global _redis_values
_redis_values = None

Expand All @@ -203,7 +203,8 @@ def hmset(self, _, hvals):

consumer = consumers.Consumer(_DummyRedis(), None, 'q')
status = 'updated_status'
consumer.update_status('redis-hash', status, {
consumer.update_key('redis-hash', {
'status': status,
'new_field': True
})
assert isinstance(_redis_values, dict)
Expand All @@ -212,7 +213,7 @@ def hmset(self, _, hvals):
assert _redis_values.get('new_field') is True

with pytest.raises(ValueError):
consumer.update_status('redis-hash', status, 'data')
consumer.update_key('redis-hash', 'data')

def test_handle_error(self):
global _redis_values
Expand Down

0 comments on commit c6589c4

Please sign in to comment.