Skip to content

Commit

Permalink
Clean up rele.config.setup + Worker() init (#132)
Browse files Browse the repository at this point in the history
This relates to #114 & #119

This makes makes all config variables nullable falling back to standard
google envars, without breaking the current api.
  • Loading branch information
Craig Mulligan authored and tobami committed Dec 20, 2019
1 parent ea7de85 commit 64ea75a
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 15 deletions.
26 changes: 21 additions & 5 deletions rele/client.py
Expand Up @@ -6,13 +6,23 @@

from google.api_core import exceptions
from google.cloud import pubsub_v1
import google.auth

from rele.middleware import run_middleware_hook

logger = logging.getLogger(__name__)

USE_EMULATOR = True if os.environ.get("PUBSUB_EMULATOR_HOST") else False
DEFAULT_ENCODER_PATH = "json.JSONEncoder"
DEFAULT_ACK_DEADLINE = 60


def get_google_defaults():
try:
credentials, project = google.auth.default()
return credentials, project
except google.auth.exceptions.DefaultCredentialsError:
return None, None


class Subscriber:
Expand All @@ -26,13 +36,19 @@ class Subscriber:
:param default_ack_deadline: int Ack Deadline defined in settings
"""

def __init__(self, gc_project_id, credentials, default_ack_deadline):
self._gc_project_id = gc_project_id
self._ack_deadline = default_ack_deadline
def __init__(self, gc_project_id=None, credentials=None, default_ack_deadline=None):

if gc_project_id is None or credentials is None:
creds, project = get_google_defaults()

self._gc_project_id = gc_project_id or project
self._ack_deadline = default_ack_deadline or DEFAULT_ACK_DEADLINE
_credentials = credentials or creds

if USE_EMULATOR:
self._client = pubsub_v1.SubscriberClient()
else:
self._client = pubsub_v1.SubscriberClient(credentials=credentials)
self._client = pubsub_v1.SubscriberClient(credentials=_credentials)

def create_subscription(self, subscription, topic):
"""Handles creating the subscription when it does not exists.
Expand All @@ -59,7 +75,7 @@ def create_subscription(self, subscription, topic):

def consume(self, subscription_name, callback, scheduler):
"""Begin listening to topic from the SubscriberClient.
:param subscription_name: str Subscription name
:param callback: Function which act on a topic message
:param scheduler: `Thread pool-based scheduler.<https://googleapis.dev/python/pubsub/latest/subscriber/api/scheduler.html?highlight=threadscheduler#google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler>`_ # noqa
Expand Down
19 changes: 14 additions & 5 deletions rele/config.py
@@ -1,7 +1,7 @@
import importlib
import os

from .client import DEFAULT_ENCODER_PATH
from .client import DEFAULT_ENCODER_PATH, get_google_defaults, DEFAULT_ACK_DEADLINE
from .middleware import register_middleware, default_middleware
from .publishing import init_global_publisher
from .subscription import Subscription
Expand All @@ -19,13 +19,19 @@ class Config:
"""

def __init__(self, setting):
self.gc_project_id = setting.get("GC_PROJECT_ID")
self.credentials = setting.get("GC_CREDENTIALS")
if (
setting.get("GC_PROJECT_ID") is None
or setting.get("GC_CREDENTIALS") is None
):
credentials, project = get_google_defaults()

self.gc_project_id = setting.get("GC_PROJECT_ID") or project
self.credentials = setting.get("GC_CREDENTIALS") or credentials
self.app_name = setting.get("APP_NAME")
self.sub_prefix = setting.get("SUB_PREFIX")
self.middleware = setting.get("MIDDLEWARE", default_middleware)
self.ack_deadline = setting.get(
"ACK_DEADLINE", os.environ.get("DEFAULT_ACK_DEADLINE", 60)
"ACK_DEADLINE", os.environ.get("DEFAULT_ACK_DEADLINE", DEFAULT_ACK_DEADLINE)
)
self._encoder_path = setting.get("ENCODER_PATH", DEFAULT_ENCODER_PATH)
self.publisher_timeout = setting.get("PUBLISHER_TIMEOUT", 3.0)
Expand All @@ -38,7 +44,10 @@ def encoder(self):
return getattr(module, class_name)


def setup(setting, **kwargs):
def setup(setting=None, **kwargs):
if setting is None:
setting = {}

config = Config(setting)
init_global_publisher(config)
register_middleware(config, **kwargs)
Expand Down
8 changes: 4 additions & 4 deletions rele/worker.py
Expand Up @@ -22,10 +22,10 @@ class Worker:
def __init__(
self,
subscriptions,
gc_project_id,
credentials,
default_ack_deadline,
threads_per_subscription,
gc_project_id=None,
credentials=None,
default_ack_deadline=None,
threads_per_subscription=None,
):
self._subscriber = Subscriber(gc_project_id, credentials, default_ack_deadline)
self._futures = []
Expand Down
25 changes: 24 additions & 1 deletion tests/test_config.py
@@ -1,7 +1,8 @@
import json
import os

import pytest

from unittest.mock import patch
from rele.config import load_subscriptions_from_paths, Config
from rele import sub

Expand Down Expand Up @@ -51,6 +52,7 @@ def test_parses_all_keys(self, project_id, credentials, custom_encoder):
assert config.credentials == credentials
assert config.middleware == ["rele.contrib.DjangoDBMiddleware"]

@patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": ""})
def test_sets_defaults(self):
settings = {}

Expand All @@ -62,3 +64,24 @@ def test_sets_defaults(self):
assert config.credentials is None
assert config.middleware == ["rele.contrib.LoggingMiddleware"]
assert config.encoder == json.JSONEncoder

@patch.dict(
os.environ,
{
"GOOGLE_APPLICATION_CREDENTIALS": os.path.dirname(
os.path.realpath(__file__)
)
+ "/dummy-pub-sub-credentials.json"
},
)
def test_sets_defaults_pulled_from_env(self, monkeypatch, project_id, credentials):
settings = {}

config = Config(settings)

assert config.app_name is None
assert config.sub_prefix is None
assert config.gc_project_id == "rele"
assert config.credentials is not None
assert config.middleware == ["rele.contrib.LoggingMiddleware"]
assert config.encoder == json.JSONEncoder
19 changes: 19 additions & 0 deletions tests/test_worker.py
@@ -1,5 +1,6 @@
from concurrent import futures
from unittest.mock import ANY, patch
import os

import pytest

Expand Down Expand Up @@ -108,3 +109,21 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
worker.setup()

assert worker._subscriber._ack_deadline == custom_ack_deadline

@patch.dict(
os.environ,
{
"GOOGLE_APPLICATION_CREDENTIALS": os.path.dirname(
os.path.realpath(__file__)
)
+ "/dummy-pub-sub-credentials.json"
},
)
@pytest.mark.usefixtures("mock_create_subscription")
def test_creates_without_config(self):
subscriptions = (sub_stub,)
worker = Worker(subscriptions)
worker.setup()

assert worker._subscriber._ack_deadline == 60
assert worker._subscriber._gc_project_id == "rele"

0 comments on commit 64ea75a

Please sign in to comment.