Merge branch 'master' into panopticnet
willgraf committed Feb 28, 2020
2 parents 6aca80d + 4992765 commit c7c407a
Showing 17 changed files with 471 additions and 40 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -3,6 +3,10 @@ output/
download/
logs/

# Doc files
docs/source/_sidebar.rst.inc
docs/source/gensidebar.py

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
8 changes: 5 additions & 3 deletions .travis.yml
@@ -15,11 +15,13 @@ python:
cache: pip

install:
- travis_retry pip install -r requirements.txt
- travis_retry pip install pytest pytest-cov==2.5.1 pytest-pep8 coveralls
- travis_retry pip install -r docs/rtd-requirements.txt

script:
- PYTHONPATH=$PWD:$PYTHONPATH pytest --cov=redis_consumer --pep8
- PYTHONPATH=$PWD:$PYTHONPATH sphinx-build -nT -b dummy ./docs/source build/html

jobs:
include:
129 changes: 94 additions & 35 deletions README.md
@@ -2,59 +2,118 @@

[![Build Status](https://travis-ci.org/vanvalenlab/kiosk-redis-consumer.svg?branch=master)](https://travis-ci.org/vanvalenlab/kiosk-redis-consumer)
[![Coverage Status](https://coveralls.io/repos/github/vanvalenlab/kiosk-redis-consumer/badge.svg?branch=master)](https://coveralls.io/github/vanvalenlab/kiosk-redis-consumer?branch=master)
[![Documentation Status](https://readthedocs.org/projects/kiosk-redis-consumer/badge/?version=master)](https://deepcell-kiosk.readthedocs.io/projects/kiosk-redis-consumer/en/latest/?badge=master)

Reads events in Redis, downloads image data from the cloud, and sends the data to TensorFlow Serving via gRPC. The prediction is post-processed, zipped, and uploaded to the cloud.

This repository is part of the [DeepCell Kiosk](https://github.com/vanvalenlab/kiosk). More information about the Kiosk project is available through [Read the Docs](https://deepcell-kiosk.readthedocs.io/en/master) and our [FAQ](http://www.deepcell.org/faq) page.

## Custom Consumers

Custom consumers can be used to implement custom model pipelines. This documentation is a continuation of a [tutorial](https://deepcell-kiosk.readthedocs.io/en/master/CUSTOM-JOB.html) on building a custom job pipeline.

Consumers consume Redis events. Each type of Redis event is put into a separate queue (e.g. `predict`, `track`), and each consumer type pops items to consume off that queue.

Each Redis event should have the following fields (a sketch of creating such an event follows the list):

- `model_name` - The name of the model that will be retrieved by TensorFlow Serving from `gs://<bucket-name>/models`
- `model_version` - The version number of the model in TensorFlow Serving
- `input_file_name` - The path to the data file in a cloud bucket.
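
For reference, a job satisfying these fields can be created directly with `redis-py` (pinned in this repository's requirements). This is a minimal sketch: the hash key, field values, and the use of `lpush` to enqueue the hash are illustrative assumptions, not the exact behavior of the Kiosk frontend.

```python
import redis

# connect to Redis (host and port defaults match the configuration table below)
client = redis.StrictRedis(host='redis-master', port=6379)

# a hypothetical job hash containing the three required fields
client.hmset('predict:job-example', {
    'model_name': 'panoptic',
    'model_version': '1',
    'input_file_name': 'uploads/example_image.tif',
})

# enqueue the hash key so a consumer watching the `predict` queue can pop it
client.lpush('predict', 'predict:job-example')
```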

If the consumer will send data to a TensorFlow Serving model, it should inherit from `redis_consumer.consumers.TensorFlowServingConsumer` ([docs](https://deepcell-kiosk.readthedocs.io/projects/kiosk-redis-consumer/en/master/redis_consumer.consumers.html)), which provides the methods `_get_predict_client()` and `grpc_image()` for sending data to a specific model. The new consumer must also implement the `_consume()` method, which performs the bulk of the work: it fetches data from Redis, downloads the data file from the bucket, processes the data with a model, and uploads the results back to the bucket. See below for a basic implementation of `_consume()`:

```python
def _consume(self, redis_hash):
    # get all redis data for the given hash
    hvals = self.redis.hgetall(redis_hash)

    with utils.get_tempdir() as tempdir:
        # download the image file
        fname = self.storage.download(hvals.get('input_file_name'), tempdir)

        # load image file as data
        image = utils.get_image(fname)

        # preprocess data if necessary

        # send the data to the model
        results = self.grpc_image(image,
                                  hvals.get('model_name'),
                                  hvals.get('model_version'))

        # postprocess results if necessary

        # derive an output name and subdirectory for the saved results
        subdir = os.path.dirname(hvals.get('input_file_name'))
        name = os.path.splitext(os.path.basename(fname))[0]

        # save the results as an image
        outpaths = utils.save_numpy_array(results, name=name,
                                          subdir=subdir, output_dir=tempdir)

        # zip up the file
        zip_file = utils.zip_files(outpaths, tempdir)

        # upload the zip file to the cloud bucket
        dest, output_url = self.storage.upload(zip_file)

        # save the results to the redis hash
        self.update_key(redis_hash, {
            'status': self.final_status,
            'output_url': output_url,
            'output_file_name': dest
        })

        # return the final status
        return self.final_status
```
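
In practice, `_consume()` is implemented on a subclass of `TensorFlowServingConsumer`. A minimal skeleton is shown below; the class name is hypothetical.

```python
import redis_consumer


class MyCustomConsumer(redis_consumer.consumers.TensorFlowServingConsumer):
    """Hypothetical consumer for a custom model pipeline."""

    def _consume(self, redis_hash):
        # body as sketched above
        ...
```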

Finally, the new consumer must be registered in the script [`consume-redis-events.py`](https://github.com/vanvalenlab/kiosk-redis-consumer/blob/master/consume-redis-events.py) by modifying the function `get_consumer()`, shown below. Add a new `if` statement for the new queue type and return the corresponding consumer.

```python
def get_consumer(consumer_type, **kwargs):
    logging.debug('Getting `%s` consumer with args %s.', consumer_type, kwargs)
    ct = str(consumer_type).lower()
    if ct == 'image':
        return redis_consumer.consumers.ImageFileConsumer(**kwargs)
    if ct == 'zip':
        return redis_consumer.consumers.ZipFileConsumer(**kwargs)
    if ct == 'tracking':
        return redis_consumer.consumers.TrackingConsumer(**kwargs)
    raise ValueError('Invalid `consumer_type`: "{}"'.format(consumer_type))
```
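
For example, registering a hypothetical `SegmentationConsumer` for a new `segmentation` queue would add one more branch before the final `raise`:

```python
    # hypothetical branch added inside get_consumer()
    if ct == 'segmentation':
        return redis_consumer.consumers.SegmentationConsumer(**kwargs)
```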

For guidance on how to complete the deployment of a custom consumer, please return to [Tutorial: Custom Job](https://deepcell-kiosk.readthedocs.io/en/master/CUSTOM-JOB.html).

## Configuration

The consumer is configured using environment variables. Please find a table of all environment variables and their descriptions below.

| Name | Description | Default Value |
| :--- | :--- | :--- |
| `QUEUE` | **REQUIRED**: The Redis job queue to check for items to consume. | `"predict"` |
| `CONSUMER_TYPE` | **REQUIRED**: The type of consumer to run, used in `consume-redis-events.py`. | `"image"` |
| `CLOUD_PROVIDER` | **REQUIRED**: The cloud provider, one of `"aws"` or `"gke"`. | `"gke"` |
| `GCLOUD_STORAGE_BUCKET` | **REQUIRED**: The name of the storage bucket used to download and upload files. | `"default-bucket"` |
| `INTERVAL` | How frequently the consumer checks the Redis queue for items, in seconds. | `5` |
| `REDIS_HOST` | The IP address or hostname of Redis. | `"redis-master"` |
| `REDIS_PORT` | The port used to connect to Redis. | `6379` |
| `REDIS_TIMEOUT` | Timeout for each Redis request, in seconds. | `3` |
| `EMPTY_QUEUE_TIMEOUT` | Time to wait after finding an empty queue, in seconds. | `5` |
| `EXPIRE_TIME` | Expire Redis items this many seconds after completion. | `3600` |
| `TF_HOST` | The IP address or hostname of TensorFlow Serving. | `"tf-serving"` |
| `TF_PORT` | The port used to connect to TensorFlow Serving. | `8500` |
| `TF_TENSOR_NAME` | Name of input tensor for the exported model. | `"image"` |
| `TF_TENSOR_DTYPE` | The `dtype` used for the exported model. | `"DT_FLOAT"` |
| `GRPC_TIMEOUT` | Timeout for gRPC API requests, in seconds. | `30` |
| `GRPC_BACKOFF` | Time to wait before retrying a gRPC API request, in seconds. | `3` |
| `MAX_RETRY` | Maximum number of retries for a failed TensorFlow Serving request. | `5` |
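
As a sketch of how these settings are typically read, `python-decouple` (pinned in `docs/rtd-requirements.txt` below) resolves each variable from the environment with a fallback default. The exact set of variables in `redis_consumer/settings.py` may differ; the names and defaults here simply mirror the table above.

```python
from decouple import config

# each call reads the environment variable, falling back to the default
QUEUE = config('QUEUE', default='predict')
CONSUMER_TYPE = config('CONSUMER_TYPE', default='image')
REDIS_HOST = config('REDIS_HOST', default='redis-master')
REDIS_PORT = config('REDIS_PORT', default=6379, cast=int)
TF_HOST = config('TF_HOST', default='tf-serving')
TF_PORT = config('TF_PORT', default=8500, cast=int)
GRPC_TIMEOUT = config('GRPC_TIMEOUT', default=30, cast=int)
```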

## Contribute

We welcome contributions to the [kiosk](https://github.com/vanvalenlab/kiosk) and its associated projects. If you are interested, please refer to our [Developer Documentation](https://deepcell-kiosk.readthedocs.io/en/master/DEVELOPER.html), [Code of Conduct](https://github.com/vanvalenlab/kiosk/blob/master/CODE_OF_CONDUCT.md) and [Contributing Guidelines](https://github.com/vanvalenlab/kiosk/blob/master/CONTRIBUTING.md).

## License

This software is licensed under a modified [Apache 2.0 license](https://opensource.org/licenses/Apache-2.0). See [LICENSE](/LICENSE) for full details.

## Copyright

Copyright © 2018-2020 [The Van Valen Lab](http://www.vanvalen.caltech.edu/) at the California Institute of Technology (Caltech), with support from the Paul Allen Family Foundation, Google, & National Institutes of Health (NIH) under Grant U24CA224309-01.
All rights reserved.
20 changes: 20 additions & 0 deletions docs/Makefile
@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#

# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = KioskRedisConsumer
SOURCEDIR = source
BUILDDIR = build

# Put it first so that "make" without argument is like "make help".
help:
	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

.PHONY: help Makefile

# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
12 changes: 12 additions & 0 deletions docs/rtd-requirements.txt
@@ -0,0 +1,12 @@
m2r
mock==3.0.5
Sphinx>=1.8.5

boto3==1.9.195
google-cloud-storage>=1.16.1
python-decouple==3.1
pathlib==1.0.1
redis==3.2.1
grpcio==1.22.0
dict-to-protobuf==0.0.3.9
pytz==2019.1
11 changes: 11 additions & 0 deletions docs/source/API.rst
@@ -0,0 +1,11 @@
API
===

.. toctree::

   redis_consumer.consumers
   redis_consumer.grpc_clients
   redis_consumer.redis
   redis_consumer.storage
   redis_consumer.tracking
   redis_consumer.utils
1 change: 1 addition & 0 deletions docs/source/README.rst
@@ -0,0 +1 @@
.. mdinclude:: ../../README.md
