Commit

Merge b1ba2a9 into b73c25c
willgraf committed Feb 8, 2021
2 parents b73c25c + b1ba2a9, commit 8e109ea
Showing 93 changed files with 1,324 additions and 10,554 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -5,6 +5,7 @@ logs/
 protos/
 docs/
 build/
+.github/
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
11 changes: 2 additions & 9 deletions .env.example
@@ -9,17 +9,10 @@ REDIS_HOST=
 TF_PORT=
 TF_HOST=
 
-# Cloud selection
-CLOUD_PROVIDER=
-
-# DEBUG Logging
-DEBUG=
+# Storage bucket
+STORAGE_BUCKET=
 
 # AWS Credentials
 AWS_REGION=
-AWS_S3_BUCKET=
 AWS_ACCESS_KEY_ID=
 AWS_SECRET_ACCESS_KEY=
-
-# Google variables
-GKE_BUCKET=
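This change collapses the provider-specific variables (`CLOUD_PROVIDER`, `AWS_S3_BUCKET`, `GKE_BUCKET`) into a single `STORAGE_BUCKET`; judging by the README change below, the bucket protocol appears to select the storage backend. A minimal sketch of the new storage section, with a hypothetical bucket name:

# s3:// for AWS, gs:// for Google Cloud (assumed from the bucket-protocol scheme)
STORAGE_BUCKET=s3://my-kiosk-bucket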
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
@@ -12,7 +12,7 @@ jobs:
 
     strategy:
       matrix:
-        python-version: [2.7, 3.5, 3.6, 3.7, 3.8]
+        python-version: [3.5, 3.6, 3.7, 3.8]
 
     steps:
       - uses: actions/checkout@v2
@@ -37,6 +37,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install -r requirements.txt
+          pip install --no-deps -r requirements-no-deps.txt
           pip install -r requirements-test.txt
       - name: PyTest
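`pip install --no-deps` installs the listed packages without resolving or installing their dependencies, which keeps the pins in `requirements-no-deps.txt` from pulling extra or conflicting packages into the environment. Reproducing the CI install locally would look roughly like:

# mirror the CI dependency install (local sketch)
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install --no-deps -r requirements-no-deps.txt
pip install -r requirements-test.txt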
11 changes: 8 additions & 3 deletions Dockerfile
@@ -23,13 +23,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ============================================================================
-FROM python:3.6
+FROM python:3.6-slim-buster
 
 WORKDIR /usr/src/app
 
-COPY requirements.txt .
+RUN apt-get update && apt-get install -y \
+    build-essential libglib2.0-0 && \
+    rm -rf /var/lib/apt/lists/*
 
-RUN pip install --no-cache-dir -r requirements.txt
+COPY requirements.txt requirements-no-deps.txt ./
+
+RUN pip install --no-cache-dir -r requirements.txt && \
+    pip install --no-cache-dir --no-deps -r requirements-no-deps.txt
 
 COPY . .
 
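The slim base image ships without compilers or many shared libraries, hence the explicit `build-essential` (to build any wheels compiled from source) and `libglib2.0-0` (a common native runtime dependency). A hypothetical local build and run, with the image tag and `.env` file as assumptions:

docker build -t kiosk-redis-consumer .
docker run --env-file .env kiosk-redis-consumer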
29 changes: 9 additions & 20 deletions README.md
@@ -17,7 +17,6 @@ Consumers consume Redis events. Each type of Redis event is put into a queue (e.
 Consumers call the `_consume` method to consume each item they find in the queue.
 This method must be implemented for every consumer.
 
-
 The quickest way to get a custom consumer up and running is to:
 
 1. Add a new file for the consumer: `redis_consumer/consumers/my_new_consumer.py`
@@ -38,28 +37,19 @@ def _consume(self, redis_hash):
                           redis_hash, hvals.get('status'))
         return hvals.get('status')
 
-    # the data to process with the model, required.
-    input_file_name = hvals.get('input_file_name')
+    # Load input image
+    fname = hvals.get('input_file_name')
+    image = self.download_image(fname)
 
-    # the model can be passed in as an environment variable,
-    # and parsed in settings.py.
-    model_name, model_version = 'CustomModel:1'.split(':')
 
-    with utils.get_tempdir() as tempdir:
-        # download the image file
-        fname = self.storage.download(input_file_name, tempdir)
-        # load image file as data
-        image = utils.get_image(fname)
 
-    # pre- and post-processing can be used with the BaseConsumer.process,
-    # which uses pre-defined functions in settings.PROCESSING_FUNCTIONS.
-    image = self.preprocess(image, 'normalize')
+    model = 'NuclearSegmentation:1'
 
-    # send the data to the model
-    results = self.predict(image, model_name, model_version)
+    # Use a custom Application from deepcell.applications
+    app = self.get_grpc_app(model, deepcell.applications.NuclearSegmentation)
 
-    # post-process model results
-    image = self.postprocess(image, 'deep_watershed')
+    # Run the predictions on the image
+    results = app.predict(image)
 
     # save the results as an image file and upload it to the bucket
     save_name = hvals.get('original_name', fname)
@@ -90,8 +80,7 @@ The consumer is configured using environment variables. Please find a table of a
 | :--- | :--- | :--- |
 | `QUEUE` | **REQUIRED**: The Redis job queue to check for items to consume. | `"predict"` |
 | `CONSUMER_TYPE` | **REQUIRED**: The type of consumer to run, used in `consume-redis-events.py`. | `"image"` |
-| `CLOUD_PROVIDER` | **REQUIRED**: The cloud provider, one of `"aws"` and `"gke"`. | `"gke"` |
-| `GCLOUD_STORAGE_BUCKET` | **REQUIRED**: The name of the storage bucket used to download and upload files. | `"default-bucket"` |
+| `STORAGE_BUCKET` | **REQUIRED**: The name of the storage bucket used to download and upload files. | `"s3://default-bucket"` |
 | `INTERVAL` | How frequently the consumer checks the Redis queue for items, in seconds. | `5` |
 | `REDIS_HOST` | The IP address or hostname of Redis. | `"redis-master"` |
 | `REDIS_PORT` | The port used to connect to Redis. | `6379` |
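Since the consumer is configured entirely through environment variables, a local run can be sketched by exporting the values from the table above before launching the entrypoint (the values shown are the documented defaults and examples):

export QUEUE=predict
export CONSUMER_TYPE=image
export STORAGE_BUCKET=s3://default-bucket
export REDIS_HOST=redis-master
export REDIS_PORT=6379
python consume-redis-events.py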
32 changes: 16 additions & 16 deletions consume-redis-events.py
@@ -37,32 +37,33 @@
 import sys
 import traceback
 
+import decouple
+
 import redis_consumer
 from redis_consumer import settings
 
 
-def initialize_logger(debug_mode=True):
+def initialize_logger(log_level='DEBUG'):
+    log_level = str(log_level).upper()
     logger = logging.getLogger()
     logger.setLevel(logging.DEBUG)
 
-    formatter = logging.Formatter('[%(asctime)s]:[%(levelname)s]:[%(name)s]: %(message)s')
+    formatter = logging.Formatter('[%(levelname)s]:[%(name)s]: %(message)s')
     console = logging.StreamHandler(stream=sys.stdout)
     console.setFormatter(formatter)
 
-    fh = logging.handlers.RotatingFileHandler(
-        filename='redis-consumer.log',
-        maxBytes=10000000,
-        backupCount=10)
-    fh.setFormatter(formatter)
-
-    if debug_mode:
-        console.setLevel(logging.DEBUG)
-    else:
+    if log_level == 'CRITICAL':
+        console.setLevel(logging.CRITICAL)
+    elif log_level == 'ERROR':
+        console.setLevel(logging.ERROR)
+    elif log_level == 'WARN':
+        console.setLevel(logging.WARN)
+    elif log_level == 'INFO':
         console.setLevel(logging.INFO)
-    fh.setLevel(logging.DEBUG)
+    else:
+        console.setLevel(logging.DEBUG)
 
     logger.addHandler(console)
-    logger.addHandler(fh)
 
 
 def get_consumer(consumer_type, **kwargs):
@@ -74,7 +75,7 @@ def get_consumer(consumer_type, **kwargs):
 
 
 if __name__ == '__main__':
-    initialize_logger(settings.DEBUG)
+    initialize_logger(decouple.config('LOG_LEVEL', default='DEBUG'))
 
     _logger = logging.getLogger(__file__)
 
@@ -83,7 +84,7 @@ def get_consumer(consumer_type, **kwargs):
         port=settings.REDIS_PORT,
         backoff=settings.REDIS_TIMEOUT)
 
-    storage_client = redis_consumer.storage.get_client(settings.CLOUD_PROVIDER)
+    storage_client = redis_consumer.storage.get_client(settings.STORAGE_BUCKET)
 
     consumer_kwargs = {
         'redis_client': redis,
@@ -92,7 +93,6 @@ def get_consumer(consumer_type, **kwargs):
         'final_status': 'done',
         'failed_status': 'failed',
         'name': settings.HOSTNAME,
-        'output_dir': settings.OUTPUT_DIR,
     }
 
     _logger.debug('Getting `%s` consumer with args %s.',
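Because the level now comes from `decouple.config('LOG_LEVEL', default='DEBUG')`, it can be set either in the environment or in a `.env` file, and `initialize_logger` upper-cases the value before comparing, so case does not matter. For example:

LOG_LEVEL=info python consume-redis-events.py   # console logs at INFO
LOG_LEVEL=warn python consume-redis-events.py   # console logs at WARN and above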
62 changes: 0 additions & 62 deletions protos/attr_value.proto

This file was deleted.

113 changes: 0 additions & 113 deletions protos/function.proto

This file was deleted.
