Reads events from Redis, downloads image data from the cloud, and sends the data to TensorFlow Serving via gRPC. The prediction is post-processed, zipped up, and uploaded to the cloud.
This repository is part of the DeepCell Kiosk. More information about the Kiosk project is available through Read the Docs and our FAQ page.
Custom consumers can be used to implement custom model pipelines. This documentation is a continuation of a tutorial on building a custom job pipeline.
Consumers consume redis events. Each type of redis event is put into a separate queue (e.g. `predict`, `track`), and each consumer type will pop items to consume off that queue.
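The one-queue-per-event-type model described above can be sketched in plain Python. This is a minimal in-memory illustration, assuming each queue behaves like a first-in, first-out Redis list that producers push job hashes onto and consumers pop from. The `InMemoryQueues` class and the `predict:...`/`track:...` hash names are hypothetical stand-ins, not the consumer's actual Redis calls.

```python
from collections import defaultdict, deque


class InMemoryQueues:
    """Hypothetical stand-in for Redis lists: one FIFO queue per event type."""

    def __init__(self):
        self._queues = defaultdict(deque)

    def push(self, queue_name, redis_hash):
        # Producers append new job hashes to the tail of the named queue.
        self._queues[queue_name].append(redis_hash)

    def pop(self, queue_name):
        # Consumers take the oldest job from the head, or None if it is empty.
        queue = self._queues[queue_name]
        return queue.popleft() if queue else None


queues = InMemoryQueues()
queues.push('predict', 'predict:image-1.tif')
queues.push('track', 'track:movie-1.trk')
queues.push('predict', 'predict:image-2.tif')

# A "predict" consumer only ever sees items from the "predict" queue.
print(queues.pop('predict'))  # predict:image-1.tif
print(queues.pop('predict'))  # predict:image-2.tif
print(queues.pop('predict'))  # None
```

Keeping event types in separate queues means a slow consumer type (for example tracking) cannot block jobs of another type.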
Each redis event should have the following fields:
- `model_name`: The name of the model that will be retrieved by TensorFlow Serving from `gs://<bucket-name>/models`.
- `model_version`: The version number of the model in TensorFlow Serving.
- `input_file_name`: The path to the data file in a cloud bucket.
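A consumer can check that a job hash carries these three fields before doing any work. The sketch below assumes the hash has already been read into a Python dict of strings (for example with redis-py's `hgetall` on a client created with `decode_responses=True`). The `missing_fields` helper and the example values are hypothetical.

```python
REQUIRED_FIELDS = ('model_name', 'model_version', 'input_file_name')


def missing_fields(hvals):
    """Return the required event fields that are absent or empty in `hvals`."""
    return [field for field in REQUIRED_FIELDS if not hvals.get(field)]


# Hypothetical event; the model name and file path are illustrative only.
event = {
    'model_name': 'NuclearSegmentation',
    'model_version': '0',
    'input_file_name': 'uploads/image.png',
}

print(missing_fields(event))                # []
print(missing_fields({'model_name': 'x'}))  # ['model_version', 'input_file_name']
```

Note that `model_version` is stored as the string `'0'`, which is non-empty and therefore counts as present.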
If the consumer will send data to a TensorFlow Serving model, it should inherit from `redis_consumer.consumers.TensorFlowServingConsumer` (docs), which provides the methods `_get_predict_client()` and `grpc_image()` for sending data to a specific model. The new consumer must also implement the `_consume()` method, which performs the bulk of the work: it fetches the job data from Redis, downloads the data file from the bucket, processes the data with a model, and uploads the results back to the bucket. See below for a basic implementation of `_consume()`:
import os

from redis_consumer import utils


def _consume(self, redis_hash):
    # get all redis data for the given hash
    hvals = self.redis.hgetall(redis_hash)
    with utils.get_tempdir() as tempdir:
        # download the image file
        fname = self.storage.download(hvals.get('input_file_name'), tempdir)
        # load image file as data
        image = utils.get_image(fname)
        # preprocess data if necessary
        # send the data to the model
        results = self.grpc_image(image,
                                  hvals.get('model_name'),
                                  hvals.get('model_version'))
        # postprocess results if necessary
        # save the results as an image named after the input file,
        # directly in the temporary directory (no sub-directory)
        name = os.path.splitext(os.path.basename(fname))[0]
        outpaths = utils.save_numpy_array(results, name=name,
                                          subdir='', output_dir=tempdir)
        # zip up the file
        zip_file = utils.zip_files(outpaths, tempdir)
        # upload the zip file to the cloud bucket
        dest, output_url = self.storage.upload(zip_file)
        # save the results to the redis hash
        self.update_key(redis_hash, {
            'status': self.final_status,
            'output_url': output_url,
            'output_file_name': dest
        })
    # return the final status
    return self.final_status
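To follow the order of operations in `_consume()` without a Redis server, a cloud bucket, or TensorFlow Serving, the same flow can be exercised against stand-ins. Everything below is hypothetical: `FakeRedis`, `FakeStorage`, `SketchConsumer`, the doubling "model", the `done` status, and the `example.invalid` URL are placeholders for illustration, and the save and zip steps use the standard library (`tempfile`, `zipfile`) rather than `redis_consumer.utils`.

```python
import os
import tempfile
import zipfile


class FakeRedis:
    """Hypothetical stand-in that stores each Redis hash as a dict."""

    def __init__(self):
        self.hashes = {}

    def hgetall(self, key):
        return dict(self.hashes.get(key, {}))

    def hset_many(self, key, mapping):
        self.hashes.setdefault(key, {}).update(mapping)


class FakeStorage:
    """Hypothetical bucket: downloads write fixed data, uploads record zip contents."""

    def __init__(self):
        self.uploaded = {}

    def download(self, path, dest_dir):
        local = os.path.join(dest_dir, os.path.basename(path))
        with open(local, 'w') as f:
            f.write('1 2 3')
        return local

    def upload(self, path):
        dest = 'output/' + os.path.basename(path)
        with zipfile.ZipFile(path) as zf:
            # Record the contents now, since the temp directory is deleted later.
            self.uploaded[dest] = {n: zf.read(n).decode() for n in zf.namelist()}
        return dest, 'https://example.invalid/' + dest


class SketchConsumer:
    final_status = 'done'

    def __init__(self, redis, storage):
        self.redis = redis
        self.storage = storage

    def grpc_image(self, image, model_name, model_version):
        # Placeholder "model": doubles every value instead of calling TF Serving.
        return [2 * x for x in image]

    def update_key(self, redis_hash, data):
        self.redis.hset_many(redis_hash, data)

    def _consume(self, redis_hash):
        hvals = self.redis.hgetall(redis_hash)
        with tempfile.TemporaryDirectory() as tempdir:
            fname = self.storage.download(hvals.get('input_file_name'), tempdir)
            with open(fname) as f:
                image = [int(v) for v in f.read().split()]
            results = self.grpc_image(image, hvals.get('model_name'),
                                      hvals.get('model_version'))
            # Save the results, then zip them up for upload.
            outpath = os.path.join(tempdir, 'results.txt')
            with open(outpath, 'w') as f:
                f.write(' '.join(str(r) for r in results))
            zip_path = os.path.join(tempdir, 'results.zip')
            with zipfile.ZipFile(zip_path, 'w') as zf:
                zf.write(outpath, arcname='results.txt')
            dest, output_url = self.storage.upload(zip_path)
            self.update_key(redis_hash, {
                'status': self.final_status,
                'output_url': output_url,
                'output_file_name': dest,
            })
        return self.final_status


fake_redis = FakeRedis()
fake_redis.hset_many('predict:job-1', {'model_name': 'demo', 'model_version': '0',
                                       'input_file_name': 'uploads/image.txt'})
consumer = SketchConsumer(fake_redis, FakeStorage())
print(consumer._consume('predict:job-1'))                      # done
print(fake_redis.hashes['predict:job-1']['output_file_name'])  # output/results.zip
```

As in the reference implementation, the upload happens inside the temporary-directory context so the zip file still exists when it is sent.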
Finally, the new consumer needs to be registered in the script `consume-redis-events.py` by modifying the function `get_consumer()` shown below. Add a new `if` statement for the new consumer type (the lower-cased `consumer_type` value, set by the `CONSUMER_TYPE` environment variable) that returns the corresponding consumer.
def get_consumer(consumer_type, **kwargs):
    logging.debug('Getting `%s` consumer with args %s.', consumer_type, kwargs)
    ct = str(consumer_type).lower()
    if ct == 'image':
        return redis_consumer.consumers.ImageFileConsumer(**kwargs)
    if ct == 'zip':
        return redis_consumer.consumers.ZipFileConsumer(**kwargs)
    if ct == 'tracking':
        return redis_consumer.consumers.TrackingConsumer(**kwargs)
    raise ValueError('Invalid `consumer_type`: "{}"'.format(consumer_type))
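A new branch slots in before the final `raise`. The sketch below assumes a hypothetical `CustomConsumer` class registered under a hypothetical `custom` consumer type. So that it runs without the `redis_consumer` package installed, a `SimpleNamespace` with placeholder classes stands in for `redis_consumer.consumers`; only the `image` branch of the original function is kept for brevity.

```python
import logging
from types import SimpleNamespace


class _StubConsumer:
    """Placeholder base that just records its keyword arguments."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class ImageFileConsumer(_StubConsumer):
    pass


class CustomConsumer(_StubConsumer):
    """Hypothetical new consumer; a real one would subclass
    redis_consumer.consumers.TensorFlowServingConsumer."""


# Stand-in for the real package so the example runs on its own.
redis_consumer = SimpleNamespace(consumers=SimpleNamespace(
    ImageFileConsumer=ImageFileConsumer,
    CustomConsumer=CustomConsumer,
))


def get_consumer(consumer_type, **kwargs):
    logging.debug('Getting `%s` consumer with args %s.', consumer_type, kwargs)
    ct = str(consumer_type).lower()
    if ct == 'image':
        return redis_consumer.consumers.ImageFileConsumer(**kwargs)
    # New branch: compare against the lower-cased CONSUMER_TYPE value.
    if ct == 'custom':
        return redis_consumer.consumers.CustomConsumer(**kwargs)
    raise ValueError('Invalid `consumer_type`: "{}"'.format(consumer_type))


consumer = get_consumer('Custom', queue='custom')
print(type(consumer).__name__)  # CustomConsumer
```

Because `consumer_type` is lower-cased before comparison, the new branch must compare against a lower-case string.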
For guidance on how to complete the deployment of a custom consumer, please return to Tutorial: Custom Job.
The consumer is configured using environment variables. Please find a table of all environment variables and their descriptions below.
Name | Description | Default Value |
---|---|---|
`QUEUE` | REQUIRED: The Redis job queue to check for items to consume. | `"predict"` |
`CONSUMER_TYPE` | REQUIRED: The type of consumer to run, used in `consume-redis-events.py`. | `"image"` |
`CLOUD_PROVIDER` | REQUIRED: The cloud provider, one of `"aws"` or `"gke"`. | `"gke"` |
`GCLOUD_STORAGE_BUCKET` | REQUIRED: The name of the storage bucket used to download and upload files. | `"default-bucket"` |
`INTERVAL` | How frequently the consumer checks the Redis queue for items, in seconds. | `5` |
`REDIS_HOST` | The IP address or hostname of Redis. | `"redis-master"` |
`REDIS_PORT` | The port used to connect to Redis. | `6379` |
`REDIS_TIMEOUT` | Timeout for each Redis request, in seconds. | `3` |
`EMPTY_QUEUE_TIMEOUT` | Time to wait after finding an empty queue, in seconds. | `5` |
`EXPIRE_TIME` | Expire Redis items this many seconds after completion. | `3600` |
`METADATA_EXPIRE_TIME` | Expire cached model metadata after this many seconds. | `30` |
`TF_HOST` | The IP address or hostname of TensorFlow Serving. | `"tf-serving"` |
`TF_PORT` | The port used to connect to TensorFlow Serving. | `8500` |
`TF_TENSOR_NAME` | Name of the input tensor for the exported model. | `"image"` |
`GRPC_TIMEOUT` | Timeout for gRPC API requests, in seconds. | `30` |
`GRPC_BACKOFF` | Time to wait before retrying a gRPC API request. | `3` |
`MAX_RETRY` | Maximum number of retries for a failed TensorFlow Serving request. | `5` |
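Because every setting is an environment variable with a default, loading the configuration amounts to a lookup plus a type conversion. The sketch below uses only `os.environ` and covers a subset of the variables in the table above; the `load_settings` function and `SETTINGS_SPEC` list are hypothetical and are not the consumer's own settings module, which may parse values differently.

```python
import os

# (name, default, type) for a subset of the documented variables.
SETTINGS_SPEC = [
    ('QUEUE', 'predict', str),
    ('CONSUMER_TYPE', 'image', str),
    ('CLOUD_PROVIDER', 'gke', str),
    ('INTERVAL', 5, int),
    ('REDIS_HOST', 'redis-master', str),
    ('REDIS_PORT', 6379, int),
    ('TF_HOST', 'tf-serving', str),
    ('TF_PORT', 8500, int),
    ('GRPC_TIMEOUT', 30, int),
    ('MAX_RETRY', 5, int),
]


def load_settings(environ=None):
    """Read each variable from `environ`, falling back to its default."""
    environ = os.environ if environ is None else environ
    settings = {}
    for name, default, cast in SETTINGS_SPEC:
        raw = environ.get(name)
        # Environment values are always strings, so convert numeric settings.
        settings[name] = default if raw is None else cast(raw)
    return settings


settings = load_settings({'QUEUE': 'track', 'INTERVAL': '10'})
print(settings['QUEUE'], settings['INTERVAL'], settings['TF_PORT'])  # track 10 8500
```

Passing an explicit mapping instead of the real environment makes the loader easy to test.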
We welcome contributions to the kiosk-console and its associated projects. If you are interested, please refer to our Developer Documentation, Code of Conduct and Contributing Guidelines.
This software is licensed under a modified Apache-2.0 license. See LICENSE for full details.
Copyright © 2018-2020 The Van Valen Lab at the California Institute of Technology (Caltech), with support from the Paul Allen Family Foundation, Google, & National Institutes of Health (NIH) under Grant U24CA224309-01. All rights reserved.