Update ZipFileConsumer to expire hashes and track failures (#37)
* expire finished and failed hashes

* track failures and failure reasons, URL-encoding them as 'failures' in redis

* update default names in settings

* add expire to dummy redis
willgraf committed May 8, 2019
1 parent 541a814 commit 80b6aa3
Showing 3 changed files with 22 additions and 13 deletions.
28 changes: 17 additions & 11 deletions redis_consumer/consumers.py
@@ -28,12 +28,13 @@
 from __future__ import division
 from __future__ import print_function
 
-import uuid
-import timeit
-import datetime
 import os
 import json
 import time
+import uuid
+import urllib
+import timeit
+import datetime
 import logging
 import zipfile
 
@@ -569,8 +570,11 @@ def _consume(self, redis_hash):
 
         with utils.get_tempdir() as tempdir:
             finished_hashes = set()
-            failed_hashes = set()
+            failed_hashes = dict()
             saved_files = set()
+
+            expire_time = 60 * 10  # ten minutes
+
             # ping redis until all the sets are finished
             while all_hashes.symmetric_difference(finished_hashes):
                 for h in all_hashes:
@@ -584,8 +588,9 @@
                         # one of the hashes failed to process
                         self.logger.error('Failed to process hash `%s`: %s',
                                           h, reason)
-                        failed_hashes.add(h)
+                        failed_hashes[h] = reason
                         finished_hashes.add(h)
+                        self.redis.expire(h, expire_time)
 
                     elif status == self.final_status:
                         # one of our hashes is done!
@@ -601,28 +606,29 @@
                         for imfile in image_files:
                             saved_files.add(imfile)
                         finished_hashes.add(h)
+                        self.redis.expire(h, expire_time)
 
             if failed_hashes:
-                self.logger.warning('Failed to process %s hashes.',
-                                    len(failed_hashes))
+                self.logger.warning('Failed to process hashes: %s',
+                                    json.dumps(failed_hashes, indent=4))
 
             saved_files = list(saved_files)
             self.logger.info(saved_files)
             zip_file = utils.zip_files(saved_files, tempdir)
 
             # Upload the zip file to cloud storage bucket
             uploaded_file_path, output_url = self.storage.upload(zip_file)
             self.logger.debug('Uploaded output to: `%s`', output_url)
 
+            # check python2 vs python3
+            url = urllib.parse.urlencode if hasattr(urllib, 'parse') else urllib.urlencode
+
             # Update redis with the results
             self.update_status(redis_hash, self.final_status, {
                 'identity_output': self.hostname,
                 'finished_at': self.get_current_timestamp(),
                 'output_url': output_url,
+                'failures': url(failed_hashes),
                 'output_file_name': uploaded_file_path
             })
             self.logger.info('Processed all %s images of zipfile `%s` in %s',
                              len(all_hashes), hvals['input_file_name'],
                              timeit.default_timer() - start)
-
-            # TODO: expire `finished_hashes`?
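Both the failed and finished branches now call self.redis.expire(h, expire_time), so each per-image hash is evicted ten minutes after it settles instead of lingering in Redis indefinitely, which also resolves the old TODO. A minimal sketch of the semantics, assuming redis-py and a local Redis server; the key name and field values are hypothetical:

    import redis

    client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)

    expire_time = 60 * 10  # ten minutes, matching the consumer's default

    # a hypothetical per-image hash that has finished processing
    client.hmset('predict_example_done', {'status': 'done', 'output_file_name': 'image_0.png'})
    client.expire('predict_example_done', expire_time)  # True once the TTL is set

    print(client.ttl('predict_example_done'))  # ~600; Redis deletes the key when it reaches 0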
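A Redis hash field can only hold a flat string, so the failure map is serialized with urlencode before being written to the 'failures' field. The hasattr(urllib, 'parse') check picks urllib.parse.urlencode on Python 3 and falls back to urllib.urlencode, which lives on the top-level module in Python 2. A minimal round-trip sketch on Python 3, with hypothetical hash names and reasons:

    from urllib.parse import urlencode, parse_qs

    failed_hashes = {'predict_abc': 'out of memory', 'predict_def': 'model timeout'}

    encoded = urlencode(failed_hashes)
    print(encoded)            # predict_abc=out+of+memory&predict_def=model+timeout
    print(parse_qs(encoded))  # {'predict_abc': ['out of memory'], 'predict_def': ['model timeout']}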
3 changes: 3 additions & 0 deletions redis_consumer/consumers_test.py
@@ -97,6 +97,9 @@ def expected_keys(self, suffix=None):
     def hmset(self, rhash, hvals):  # pylint: disable=W0613
         return hvals
 
+    def expire(self, name, time):  # pylint: disable=W0613
+        return 1
+
     def hget(self, rhash, field):
         if field == 'status':
             return rhash.split('_')[1]
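The new stub mirrors redis-py, whose expire reports success truthily (Redis itself replies 1 when the timeout is set), so consumer code that expires hashes runs unchanged against the fake. A minimal sketch of a test leaning on it; the fixture wiring is hypothetical and should follow however the surrounding suite constructs its DummyRedis:

    def test_expire_stub(redis_client):
        # redis_client stands in for a DummyRedis instance
        assert redis_client.expire('some-hash', 600) == 1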
4 changes: 2 additions & 2 deletions redis_consumer/settings.py
@@ -51,13 +51,13 @@
 REDIS_PORT = config('REDIS_PORT', default=6379, cast=int)
 
 # tensorflow-serving client connection
-TF_HOST = config('TF_HOST', default='tf-serving-service')
+TF_HOST = config('TF_HOST', default='tf-serving')
 TF_PORT = config('TF_PORT', default=8500, cast=int)
 TF_TENSOR_NAME = config('TF_TENSOR_NAME', default='image')
 TF_TENSOR_DTYPE = config('TF_TENSOR_DTYPE', default='DT_FLOAT')
 
 # data-processing client connection
-DP_HOST = config('DP_HOST', default='data-processing-service')
+DP_HOST = config('DP_HOST', default='data-processing')
 DP_PORT = config('DP_PORT', default=8080, cast=int)
 
 # gRPC API timeout in seconds (scales with `cuts`)
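The default hostnames drop the '-service' suffix, presumably to match the deployed service names. Each config call reads the named environment variable first and only falls back to the default when it is unset. A minimal sketch, assuming these settings use python-decouple's config; the override value is hypothetical:

    import os
    from decouple import config

    os.environ['TF_HOST'] = 'tf-serving.example.svc'  # hypothetical override

    TF_HOST = config('TF_HOST', default='tf-serving')
    print(TF_HOST)  # 'tf-serving.example.svc'; with TF_HOST unset, it prints 'tf-serving'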
