Skip to content

Commit

Permalink
Implemented setting the gc region to new topics (#249)
Browse files Browse the repository at this point in the history
* Add test to add region to topic creation

* Implemented setting the gc region to new topics

* Fix linting

* Fix test

* Fix linting

* Fix test

---------

Co-authored-by: jponzvan <jponzvan@mercadona.es>
  • Loading branch information
EmilioCarrion and jjponz committed May 2, 2023
1 parent b98f132 commit 826a717
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 9 deletions.
8 changes: 8 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Example::
'PUBLISHER_TIMEOUT': 3.0,
'FILTER_SUBS_BY': boolean_function,
'DEFAULT_RETRY_POLICY': RetryPolicy(10, 50),
'GC_STORAGE_REGION': 'europe-west1',
}

``GC_PROJECT_ID``
Expand Down Expand Up @@ -187,3 +188,10 @@ A RetryPolicy object which must be instantiated with `minimum_backoff` and `maxi
If not set, the default retry policy is applied, meaning a minimum backoff of 10 seconds and a maximum backoff of 60 seconds.
This generally implies that messages will be retried as soon as possible for healthy subscribers.
RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.

``GC_STORAGE_REGION``
----------------------------

**Optional**

Set the Google Cloud's region for storing the messages. By default is `europe-west1`
2 changes: 1 addition & 1 deletion rele/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys

from rele import config, discover, subscription
from rele import config, discover
from rele.worker import create_and_run

logger = logging.getLogger(__name__)
Expand Down
13 changes: 12 additions & 1 deletion rele/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.api_core import exceptions
from google.cloud import pubsub_v1
from google.protobuf import duration_pb2
from google.pubsub_v1 import MessageStoragePolicy
from google.pubsub_v1 import RetryPolicy as GCloudRetryPolicy

from rele.middleware import run_middleware_hook
Expand Down Expand Up @@ -37,6 +38,7 @@ class Subscriber:
:param gc_project_id: str :ref:`settings_project_id` .
:param credentials: obj :meth:`~rele.config.Config.credentials`.
:param message_storage_policy: str Region to store the messages
:param default_ack_deadline: int Ack Deadline defined in settings
:param default_retry_policy: RetryPolicy Rele's RetryPolicy defined in settings
"""
Expand All @@ -45,12 +47,14 @@ def __init__(
self,
gc_project_id,
credentials,
message_storage_policy,
default_ack_deadline=None,
default_retry_policy=None,
):
self._gc_project_id = gc_project_id
self._ack_deadline = default_ack_deadline or DEFAULT_ACK_DEADLINE
self.credentials = credentials if not USE_EMULATOR else None
self._message_storage_policy = message_storage_policy
self._client = pubsub_v1.SubscriberClient(credentials=credentials)
self._retry_policy = default_retry_policy

Expand Down Expand Up @@ -83,7 +87,14 @@ def create_subscription(self, subscription):

def _create_topic(self, topic_path):
publisher_client = pubsub_v1.PublisherClient(credentials=self.credentials)
return publisher_client.create_topic(request={"name": topic_path})
return publisher_client.create_topic(
request={
"name": topic_path,
"message_storage_policy": MessageStoragePolicy(
{"allowed_persistence_regions": [self._message_storage_policy]}
),
}
)

def _create_subscription(self, subscription_path, topic_path, subscription):
request = {
Expand Down
2 changes: 1 addition & 1 deletion rele/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import importlib
import os
import warnings

from google.oauth2 import service_account

Expand Down Expand Up @@ -29,6 +28,7 @@ class Config:
def __init__(self, setting):
self._gc_project_id = setting.get("GC_PROJECT_ID")
self.gc_credentials_path = setting.get("GC_CREDENTIALS_PATH")
self.gc_storage_region = setting.get("GC_STORAGE_REGION", "europe-west1")
self.app_name = setting.get("APP_NAME")
self.sub_prefix = setting.get("SUB_PREFIX")
self.middleware = setting.get("MIDDLEWARE", default_middleware)
Expand Down
8 changes: 7 additions & 1 deletion rele/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ def __init__(
subscriptions,
gc_project_id=None,
credentials=None,
gc_storage_region=None,
default_ack_deadline=None,
threads_per_subscription=None,
default_retry_policy=None,
):
self._subscriber = Subscriber(
gc_project_id, credentials, default_ack_deadline, default_retry_policy
gc_project_id,
credentials,
gc_storage_region,
default_ack_deadline,
default_retry_policy,
)
self._futures = {}
self._subscriptions = subscriptions
Expand Down Expand Up @@ -153,6 +158,7 @@ def create_and_run(subs, config):
subs,
config.gc_project_id,
config.credentials,
config.gc_storage_region,
config.ack_deadline,
config.threads_per_subscription,
config.retry_policy,
Expand Down
8 changes: 6 additions & 2 deletions tests/commands/test_runrele.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def mock_worker(self):
def test_calls_worker_start_and_setup_when_runrele(self, mock_worker):
call_command("runrele")

mock_worker.assert_called_with([], "rele-test", ANY, 60, 2, None)
mock_worker.assert_called_with(
[], "rele-test", ANY, "europe-west1", 60, 2, None
)
mock_worker.return_value.run_forever.assert_called_once_with()

def test_prints_warning_when_conn_max_age_not_set_to_zero(
Expand All @@ -37,5 +39,7 @@ def test_prints_warning_when_conn_max_age_not_set_to_zero(
"This may result in slots for database connections to "
"be exhausted." in err
)
mock_worker.assert_called_with([], "rele-test", ANY, 60, 2, None)
mock_worker.assert_called_with(
[], "rele-test", ANY, "europe-west1", 60, 2, None
)
mock_worker.return_value.run_forever.assert_called_once_with()
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def config(project_id):
"APP_NAME": "rele",
"SUB_PREFIX": "rele",
"GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json",
"GC_STORAGE_REGION": "some-region",
"MIDDLEWARE": ["rele.contrib.LoggingMiddleware"],
}
)
Expand All @@ -39,6 +40,7 @@ def config_with_retry_policy(project_id):
"APP_NAME": "rele",
"SUB_PREFIX": "rele",
"GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json",
"GC_STORAGE_REGION": "some-region",
"MIDDLEWARE": ["rele.contrib.LoggingMiddleware"],
"DEFAULT_RETRY_POLICY": RetryPolicy(5, 30),
}
Expand All @@ -47,7 +49,9 @@ def config_with_retry_policy(project_id):

@pytest.fixture
def subscriber(project_id, config):
return Subscriber(config.gc_project_id, config.credentials, 60)
return Subscriber(
config.gc_project_id, config.credentials, config.gc_storage_region, 60
)


@pytest.fixture
Expand Down
9 changes: 8 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
from google.protobuf import duration_pb2
from google.pubsub_v1 import MessageStoragePolicy

from rele import Subscriber
from rele.retry_policy import RetryPolicy
Expand Down Expand Up @@ -279,7 +280,12 @@ def test_creates_topic_when_subscription_topic_does_not_exist(

assert _mocked_client.call_count == 2
mock_create_topic.assert_called_with(
request={"name": f"projects/rele-test/topics/{project_id}-test-topic"}
request={
"name": f"projects/rele-test/topics/{project_id}-test-topic",
"message_storage_policy": MessageStoragePolicy(
{"allowed_persistence_regions": ["some-region"]}
),
}
)

_mocked_client.assert_called_with(
Expand Down Expand Up @@ -328,6 +334,7 @@ def test_default_retry_policy_is_applied_when_not_explicitly_provided(
subscriber = Subscriber(
config_with_retry_policy.gc_project_id,
config_with_retry_policy.credentials,
config_with_retry_policy.gc_storage_region,
60,
config_with_retry_policy.retry_policy,
)
Expand Down
5 changes: 4 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@sub(topic="some-cool-topic", prefix="rele")
def sub_stub(data, **kwargs):
print(f"I am a task doing stuff.")
print("I am a task doing stuff.")


@pytest.fixture
Expand All @@ -25,6 +25,7 @@ def worker(config):
subscriptions,
config.gc_project_id,
config.credentials,
config.gc_storage_region,
default_ack_deadline=60,
threads_per_subscription=10,
default_retry_policy=config.retry_policy,
Expand Down Expand Up @@ -105,6 +106,7 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment(
worker = Worker(
subscriptions,
config.gc_project_id,
config.gc_storage_region,
config.credentials,
custom_ack_deadline,
threads_per_subscription=10,
Expand Down Expand Up @@ -168,6 +170,7 @@ def test_waits_forever_when_called_with_config_and_subs(
subscriptions,
"rele-test",
ANY,
"some-region",
60,
2,
RetryPolicy(5, 30),
Expand Down

0 comments on commit 826a717

Please sign in to comment.