
Commit

Merge e4fdc4a into 1525465
willgraf committed Apr 10, 2019
2 parents 1525465 + e4fdc4a commit ac8f8b9
Showing 3 changed files with 91 additions and 59 deletions.
88 changes: 59 additions & 29 deletions redis_consumer/consumers.py
@@ -119,8 +119,8 @@ def _redis_type(self, redis_key):
break
except redis.exceptions.ConnectionError as err:
self.logger.warning('Encountered %s: %s when calling '
'redis.type(). Retrying in %s seconds.',
type(err).__name__, err,
'`TYPE %s`. Retrying in %s seconds.',
type(err).__name__, err, redis_key,
self._redis_retry_timeout)
time.sleep(self._redis_retry_timeout)
return response
@@ -168,8 +168,8 @@ def hset(self, rhash, key, value):
break
except redis.exceptions.ConnectionError as err:
self.logger.warning('Encountered %s: %s when calling '
'HSET. Retrying in %s seconds.',
type(err).__name__, err,
'`HSET %s %s %s`. Retrying in %s seconds.',
type(err).__name__, err, rhash, key, value,
self._redis_retry_timeout)
time.sleep(self._redis_retry_timeout)
return response
@@ -184,8 +184,8 @@ def hget(self, rhash, key):
break
except redis.exceptions.ConnectionError as err:
self.logger.warning('Encountered %s: %s when calling '
'redis.hget(). Retrying in %s seconds.',
type(err).__name__, err,
'`HGET %s %s`. Retrying in %s seconds.',
type(err).__name__, err, rhash, key,
self._redis_retry_timeout)
time.sleep(self._redis_retry_timeout)
return response
@@ -200,8 +200,8 @@ def hmset(self, rhash, data):
break
except redis.exceptions.ConnectionError as err:
self.logger.warning('Encountered %s: %s when calling '
'redis.hmset(). Retrying in %s seconds.',
type(err).__name__, err,
'`HMSET %s %s`. Retrying in %s seconds.',
type(err).__name__, err, rhash, data,
self._redis_retry_timeout)
time.sleep(self._redis_retry_timeout)
return response
@@ -216,8 +216,8 @@ def hgetall(self, rhash):
break
except redis.exceptions.ConnectionError as err:
self.logger.warning('Encountered %s: %s when calling '
'redis.hgetall(). Retrying in %s seconds.',
type(err).__name__, err,
'`HGETALL %s`. Retrying in %s seconds.',
type(err).__name__, err, rhash,
self._redis_retry_timeout)
time.sleep(self._redis_retry_timeout)
return response
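
All of the Redis wrappers above share the same retry loop; these hunks only change the warning so it names the actual command and its arguments. A minimal sketch of that pattern, where redis_client, logger, and retry_timeout are assumed stand-ins for the consumer's own attributes:

import time

import redis


def hget_with_retry(redis_client, logger, rhash, key, retry_timeout=3):
    """Call HGET, retrying on connection errors (sketch only)."""
    while True:
        try:
            return redis_client.hget(rhash, key)
        except redis.exceptions.ConnectionError as err:
            # the warning now spells out the command and its arguments
            logger.warning('Encountered %s: %s when calling `HGET %s %s`. '
                           'Retrying in %s seconds.',
                           type(err).__name__, err, rhash, key,
                           retry_timeout)
            time.sleep(retry_timeout)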
@@ -237,8 +237,15 @@ def consume(self, status=None, prefix=None):
try:
start = timeit.default_timer()
self._consume(redis_hash)
self.logger.debug('Consumed key %s in %s seconds.',
redis_hash, timeit.default_timer() - start)
hvals = self.hgetall(redis_hash)
self.logger.debug('Consumed key %s (model %s:%s, '
'preprocessing: %s, postprocessing: %s) '
'(%s retries) in %s seconds.',
redis_hash, hvals.get('model_name'),
hvals.get('model_version'),
hvals.get('preprocess_function'),
hvals.get('postprocess_function'),
0, timeit.default_timer() - start)
except Exception as err: # pylint: disable=broad-except
self._handle_error(err, redis_hash)

@@ -306,9 +313,15 @@ def _process(self, image, key, process_type, timeout=30, streaming=False):
else:
results = client.process(req_data, timeout)

self.logger.debug('Finished %s %s-processing (%s retries) in '
'%s seconds.', key, process_type, count,
timeit.default_timer() - start)
self.logger.debug('%s-processed key %s (model %s:%s, '
'preprocessing: %s, postprocessing: %s)'
' (%s retries) in %s seconds.',
process_type.capitalize(), self._redis_hash,
self._redis_values.get('model_name'),
self._redis_values.get('model_version'),
self._redis_values.get('preprocess_function'),
self._redis_values.get('postprocess_function'),
count, timeit.default_timer() - start)

results = results['results']
# Again, squeeze out batch dimension if unnecessary
@@ -322,24 +335,28 @@ def _process(self, image, key, process_type, timeout=30, streaming=False):
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.UNAVAILABLE
}
if err.code() in retry_statuses: # pylint: disable=E1101
# pylint: disable=E1101
if err.code() in retry_statuses:
count += 1
# write update to Redis
processing_retry_time = time.time() * 1000
self.hmset(self._redis_hash, {
'number_of_processing_retries': count,
'status': 'processing -- RETRY:{} -- {}'.format(
count, err.code().name), # pylint: disable=E1101
'status': '{} {}-processing -- RETRY:{} -- {}'.format(
key, process_type, count,
err.code().name),
'timestamp_processing_retry': processing_retry_time,
'identity_processing_retry': self.hostname,
'timestamp_last_status_update': processing_retry_time
})
self.logger.warning(err.details()) # pylint: disable=E1101
self.logger.warning('%s during %s %s-processing request: '
'%s', type(err).__name__, key,
process_type, err)
sleeptime = np.random.randint(24, 44)
sleeptime = 1 + sleeptime * int(streaming)
self.logger.warning('%sException `%s: %s` during %s '
'%s-processing request. Waiting %s '
'seconds before retrying.',
type(err).__name__, err.code().name,
err.details(), key, process_type,
sleeptime)
self.logger.debug('Waiting for %s seconds before retrying',
sleeptime)
time.sleep(sleeptime) # sleep before retry
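
The retry branch above now records which key and which processing step is being retried, both in the warning and in the hash's status field. A condensed sketch of that branch, assuming update_hash and logger stand in for the consumer's hmset wrapper and its logger:

import time

import grpc

RETRY_STATUSES = {
    grpc.StatusCode.DEADLINE_EXCEEDED,
    grpc.StatusCode.UNAVAILABLE,
}


def record_processing_retry(err, redis_hash, key, process_type, count,
                            update_hash, logger, hostname, sleeptime):
    """Log a retryable gRPC failure and mirror it into Redis (sketch only)."""
    if err.code() not in RETRY_STATUSES:
        raise err  # anything else is not retryable and propagates

    count += 1
    now_ms = time.time() * 1000
    update_hash(redis_hash, {
        'number_of_processing_retries': count,
        # the status string now names the key and the processing step
        'status': '{} {}-processing -- RETRY:{} -- {}'.format(
            key, process_type, count, err.code().name),
        'timestamp_processing_retry': now_ms,
        'identity_processing_retry': hostname,
        'timestamp_last_status_update': now_ms,
    })
    logger.warning('%sException `%s: %s` during %s %s-processing request. '
                   'Waiting %s seconds before retrying.',
                   type(err).__name__, err.code().name, err.details(),
                   key, process_type, sleeptime)
    time.sleep(sleeptime)
    return count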
@@ -468,28 +485,39 @@ def grpc_image(self, img, model_name, model_version, timeout=30, backoff=3):
prediction = client.predict(req_data, request_timeout=timeout)
retrying = False
results = prediction['prediction']
self.logger.debug('Segmented image with model %s:%s '
'(%s retries) in %s seconds.',
model_name, model_version, count,
timeit.default_timer() - start)
self.logger.debug('Segmented key %s (model %s:%s, '
'preprocessing: %s, postprocessing: %s)'
' (%s retries) in %s seconds.',
self._redis_hash, model_name, model_version,
self._redis_values.get('preprocess_function'),
self._redis_values.get('postprocess_function'),
count, timeit.default_timer() - start)
return results
except grpc.RpcError as err:
# pylint: disable=E1101
retry_statuses = {
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.UNAVAILABLE
}
if err.code() in retry_statuses: # pylint: disable=E1101
if err.code() in retry_statuses:
count += 1
# write update to Redis
processing_retry_time = time.time() * 1000
self.hmset(self._redis_hash, {
'number_of_processing_retries': count,
'status': 'processing -- RETRY:{} -- {}'.format(
count, err.code().name), # pylint: disable=E1101
count, err.code().name),
'timestamp_processing_retry': processing_retry_time,
'identity_processing_retry': self.hostname,
'timestamp_last_status_update': processing_retry_time
})

self.logger.warning('%sException `%s: %s` during '
'PredictClient request to model %s:%s.'
'Waiting %s seconds before retrying.',
type(err).__name__, err.code().name,
err.details(), model_name,
model_version, backoff)
self.logger.warning('Encountered %s during PredictClient '
'request to model %s:%s: %s.',
type(err).__name__, model_name,
@@ -510,8 +538,10 @@ def grpc_image(self, img, model_name, model_version, timeout=30, backoff=3):
raise err

def _consume(self, redis_hash):
self._redis_hash = redis_hash
hvals = self.hgetall(redis_hash)
# hold on to the redis hash/values for logging purposes
self._redis_hash = redis_hash
self._redis_values = hvals
self.logger.debug('Found hash to process "%s": %s',
redis_hash, json.dumps(hvals, indent=4))

53 changes: 27 additions & 26 deletions redis_consumer/grpc_clients.py
@@ -97,13 +97,13 @@ def predict(self, request_data, request_timeout=10):

t = timeit.default_timer()
stub = PredictionServiceStub(channel)
self.logger.debug('Creating PredictionServiceStub took: %s',
self.logger.debug('Created TensorFlowServingServiceStub in %s seconds.',
timeit.default_timer() - t)

t = timeit.default_timer()
request = PredictRequest()
self.logger.debug('Creating PredictRequest object took: %s',
timeit.default_timer() - t)
self.logger.debug('Created TensorFlowServingRequest object in %s '
'seconds.', timeit.default_timer() - t)

request.model_spec.name = self.model_name # pylint: disable=E1101

@@ -117,22 +117,23 @@ def predict(self, request_data, request_timeout=10):
# pylint: disable=E1101
request.inputs[d['in_tensor_name']].CopyFrom(tensor_proto)

self.logger.debug('Making tensor protos took: %s',
self.logger.debug('Made tensor protos in %s seconds.',
timeit.default_timer() - t)

try:
t = timeit.default_timer()
predict_response = stub.Predict(request, timeout=request_timeout)
self.logger.debug('Actual PredictRequest took: %s seconds.',
timeit.default_timer() - t)
self.logger.debug('gRPC TensorFlowServingRequest finished in %s '
'seconds.', timeit.default_timer() - t)

t = timeit.default_timer()
predict_response_dict = grpc_response_to_dict(predict_response)
self.logger.debug('Converted PredictResponse to dict in %s seconds.',
timeit.default_timer() - t)
self.logger.debug('gRPC TensorFlowServingProtobufConversion took '
'%s seconds.', timeit.default_timer() - t)

keys = [k for k in predict_response_dict]
self.logger.info('Got PredictResponse with keys: %s ', keys)
self.logger.info('Got TensorFlowServingResponse with keys: %s ',
keys)

return predict_response_dict
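
Each step of the client is timed the same way: read timeit.default_timer() before the call and subtract afterwards when logging. A small context-manager sketch of that pattern (the log_timing name is hypothetical, not part of this change):

import contextlib
import logging
import timeit

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def log_timing(message, *args):
    """Log how many seconds the wrapped block took (sketch only)."""
    start = timeit.default_timer()
    try:
        yield
    finally:
        # the elapsed time fills the final %s of the message
        logger.debug(message, *args, timeit.default_timer() - start)


# for example:
# with log_timing('Created TensorFlowServingServiceStub in %s seconds.'):
#     stub = PredictionServiceStub(channel)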

@@ -167,12 +168,12 @@ def process(self, request_data, request_timeout=10):

t = timeit.default_timer()
stub = ProcessingServiceStub(channel)
self.logger.debug('Creating ProcessingServiceStub took %s seconds.',
timeit.default_timer() - t)
self.logger.debug('Created DataProcessingProcessingServiceStub in %s '
'seconds.', timeit.default_timer() - t)

t = timeit.default_timer()
request = ProcessRequest()
self.logger.debug('Creating ProcessRequest object took: %s',
self.logger.debug('Created DataProcessingRequest object in %s seconds.',
timeit.default_timer() - t)

# pylint: disable=E1101
@@ -186,19 +187,19 @@ def process(self, request_data, request_timeout=10):
# pylint: disable=E1101
request.inputs[d['in_tensor_name']].CopyFrom(tensor_proto)

self.logger.debug('Making tensor protos took: %s',
self.logger.debug('Made tensor protos in %s seconds.',
timeit.default_timer() - t)

try:
t = timeit.default_timer()
response = stub.Process(request, timeout=request_timeout)
self.logger.debug('Actual ProcessRequest took: %s seconds.',
timeit.default_timer() - t)
self.logger.debug('gRPC DataProcessingRequest finished in %s '
'seconds.', timeit.default_timer() - t)

t = timeit.default_timer()
response_dict = grpc_response_to_dict(response)
self.logger.debug('Converted ProcessResponse to dict in %s seconds.',
timeit.default_timer() - t)
self.logger.debug('gRPC DataProcessingProtobufConversion took %s '
'seconds.', timeit.default_timer() - t)

keys = [k for k in response_dict]
self.logger.debug('Got processing_response with keys: %s', keys)
@@ -222,7 +223,7 @@ def stream_process(self, request_data, request_timeout=10):

t = timeit.default_timer()
stub = ProcessingServiceStub(channel)
self.logger.debug('Creating stub took %s seconds.',
self.logger.debug('Created stub in %s seconds.',
timeit.default_timer() - t)
chunk_size = 64 * 1024 # 64 kB is recommended payload size

@@ -259,18 +260,18 @@ def request_iterator(image):
processed_bytes.append(response.outputs['data'])

npbytes = b''.join(processed_bytes)
self.logger.info('Got response stream of %s bytes in %s seconds.',
len(npbytes), timeit.default_timer() - t)
# Got response stream of %s bytes in %s seconds.
self.logger.info('gRPC DataProcessingStreamRequest of %s bytes '
'finished in %s seconds.', len(npbytes),
timeit.default_timer() - t)

t = timeit.default_timer()
processed_image = np.frombuffer(npbytes, dtype=dtype)
self.logger.info('Loaded bytes into numpy array of shape %s in %s'
' seconds.', processed_image.shape,
timeit.default_timer() - t)

results = processed_image.reshape(shape)
self.logger.info('Reshaped array into shape %s',
results.shape)
self.logger.info('gRPC DataProcessingStreamConversion from %s bytes'
' to a numpy array of shape %s in %s seconds.',
len(npbytes), results.shape,
timeit.default_timer() - t)

return {'results': results}
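
For the streaming path, the processed bytes arrive in chunks that must be reassembled before they can be reshaped back into the original image dimensions; the new log line reports the total byte count and the final shape together. A rough sketch of that reassembly, with responses, dtype, and shape assumed to come from the surrounding method:

import numpy as np


def rebuild_streamed_array(responses, dtype, shape):
    """Join streamed response chunks back into a numpy array (sketch only)."""
    processed_bytes = [response.outputs['data'] for response in responses]
    npbytes = b''.join(processed_bytes)
    # load the raw bytes, then restore the original image shape
    return np.frombuffer(npbytes, dtype=dtype).reshape(shape)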

9 changes: 5 additions & 4 deletions redis_consumer/storage.py
@@ -211,8 +211,9 @@ def download(self, filepath, download_dir=None):
start = timeit.default_timer()
blob = self._client.get_bucket(self.bucket).blob(filepath)
blob.download_to_filename(dest)
self.logger.debug('Downloaded %s in %s seconds.',
dest, timeit.default_timer() - start)
self.logger.debug('Downloaded %s from bucket %s in %s seconds.',
dest, self.bucket,
timeit.default_timer() - start)
return dest
except google_exceptions.TooManyRequests as err:
self.logger.warning('Encountered %s: %s. Backing off for %s '
@@ -302,8 +303,8 @@ def download(self, filepath, download_dir=None):
self.logger.debug('Downloading %s to %s.', filepath, dest)
try:
self._client.download_file(self.bucket, filepath, dest)
self.logger.debug('Downloaded %s in %s seconds.',
dest, timeit.default_timer() - start)
self.logger.debug('Downloaded %s from bucket %s in %s seconds.',
dest, self.bucket, timeit.default_timer() - start)
return dest
except Exception as err:
self.logger.error('Encountered %s: %s while downloading %s.',
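
Both storage backends now name the bucket in their download logs. A minimal sketch of the timed download for the boto3-style client, where s3_client and bucket are assumed stand-ins for the storage object's attributes:

import logging
import timeit

logger = logging.getLogger(__name__)


def timed_download(s3_client, bucket, filepath, dest):
    """Download a file and log the bucket and elapsed time (sketch only)."""
    start = timeit.default_timer()
    s3_client.download_file(bucket, filepath, dest)
    logger.debug('Downloaded %s from bucket %s in %s seconds.',
                 dest, bucket, timeit.default_timer() - start)
    return dest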
