Skip to content

Commit

Permalink
Update subscriptions with retry policy (#252)
Browse files Browse the repository at this point in the history
* Split subscriber and publisher tests.

* Implement update subscription with retry policy.

* Don't update if retry policy not provided.

* Encapsulate update in separated (private) method.

* Rename method to update_or_create_subscription

* Beat the linter.

* Sorting imports.

* Add bumping version data.
Fix update_subscription execution under context manager.
  • Loading branch information
jonasae committed May 9, 2023
1 parent 8b0ce37 commit 26de37d
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 203 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changelog
=========

1.11.0 (2023-05-09)
-------------------
* [Added] Allow updating retry policy to existing subscriptions. (#248)

1.10.0 (2023-05-02)
-------------------
* [Added] Add configuration for setting the storage region for pubsub messages (#247)
Expand Down
2 changes: 1 addition & 1 deletion rele/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.10.0"
__version__ = "1.11.0"

try:
import django
Expand Down
50 changes: 36 additions & 14 deletions rele/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import os
import time
from concurrent.futures import TimeoutError
from contextlib import suppress

import google.auth
from google.api_core import exceptions
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import FieldMask
from google.protobuf import duration_pb2
from google.pubsub_v1 import MessageStoragePolicy
from google.pubsub_v1 import RetryPolicy as GCloudRetryPolicy
Expand Down Expand Up @@ -58,8 +58,9 @@ def __init__(
self._client = pubsub_v1.SubscriberClient(credentials=credentials)
self._retry_policy = default_retry_policy

def create_subscription(self, subscription):
"""Handles creating the subscription when it does not exists.
def update_or_create_subscription(self, subscription):
"""Handles creating the subscription when it does not exists or updates it
if the subscription contains any parameter that allows it.
This makes it easier to deploy a worker and forget about the
subscription side of things. The subscription must
Expand All @@ -73,17 +74,18 @@ def create_subscription(self, subscription):
)
topic_path = self._client.topic_path(self._gc_project_id, subscription.topic)

with suppress(exceptions.AlreadyExists):
try:
self._create_subscription(subscription_path, topic_path, subscription)
except exceptions.NotFound:
logger.warning(
"Cannot subscribe to a topic that does not exist."
f"Creating {topic_path}..."
)
topic = self._create_topic(topic_path)
logger.info(f"Topic {topic.name} created.")
self._create_subscription(subscription_path, topic_path, subscription)
try:
self._create_subscription(subscription_path, topic_path, subscription)
except exceptions.NotFound:
logger.warning(
"Cannot subscribe to a topic that does not exist."
f"Creating {topic_path}..."
)
topic = self._create_topic(topic_path)
logger.info(f"Topic {topic.name} created.")
self._create_subscription(subscription_path, topic_path, subscription)
except exceptions.AlreadyExists:
self._update_subscription(subscription_path, topic_path, subscription)

def _create_topic(self, topic_path):
publisher_client = pubsub_v1.PublisherClient(credentials=self.credentials)
Expand Down Expand Up @@ -113,6 +115,26 @@ def _create_subscription(self, subscription_path, topic_path, subscription):

self._client.create_subscription(request=request)

def _update_subscription(self, subscription_path, topic_path, subscription):
retry_policy = subscription.retry_policy or self._retry_policy

if not retry_policy:
return

update_mask = FieldMask(paths=["retry_policy"])

client_retry_policy = self._build_gcloud_retry_policy(retry_policy)

subscription = pubsub_v1.types.Subscription(
name=subscription_path,
topic=topic_path,
retry_policy=client_retry_policy,
)

self._client.update_subscription(
request={"subscription": subscription, "update_mask": update_mask}
)

def _build_gcloud_retry_policy(self, rele_retry_policy):
minimum_backoff = duration_pb2.Duration(
seconds=rele_retry_policy.minimum_backoff
Expand Down
2 changes: 1 addition & 1 deletion rele/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def setup(self):
re-created. Therefore, it is idempotent.
"""
for subscription in self._subscriptions:
self._subscriber.create_subscription(subscription)
self._subscriber.update_or_create_subscription(subscription)

def start(self):
"""Begin consuming all subscriptions.
Expand Down
155 changes: 155 additions & 0 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import concurrent
import decimal
import logging
from concurrent.futures import TimeoutError
from unittest.mock import ANY

import pytest


@pytest.mark.usefixtures("publisher", "time_mock")
class TestPublisher:
def test_returns_future_when_published_called(self, published_at, publisher):
message = {"foo": "bar"}
result = publisher.publish(
topic="order-cancelled", data=message, myattr="hello"
)

assert isinstance(result, concurrent.futures.Future)

publisher._client.publish.assert_called_with(
ANY,
b'{"foo": "bar"}',
myattr="hello",
published_at=str(published_at),
)

def test_save_log_when_published_called(self, published_at, publisher, caplog):
caplog.set_level(logging.DEBUG)
message = {"foo": "bar"}
publisher.publish(topic="order-cancelled", data=message, myattr="hello")

log = caplog.records[0]

assert log.message == "Publishing to order-cancelled"
assert log.pubsub_publisher_attrs == {
"myattr": "hello",
"published_at": str(published_at),
}
assert log.metrics == {
"name": "publications",
"data": {"agent": "rele", "topic": "order-cancelled"},
}

def test_publish_sets_published_at(self, published_at, publisher):
publisher.publish(topic="order-cancelled", data={"foo": "bar"})

publisher._client.publish.assert_called_with(
ANY, b'{"foo": "bar"}', published_at=str(published_at)
)

def test_publishes_data_with_custom_encoder(self, publisher, custom_encoder):
publisher._encoder = custom_encoder
publisher.publish(topic="order-cancelled", data=decimal.Decimal("1.20"))

publisher._client.publish.assert_called_with(ANY, b"1.2", published_at=ANY)

def test_publishes_data_with_client_timeout_when_blocking(
self, mock_future, publisher
):
publisher._timeout = 100.0
publisher.publish(topic="order-cancelled", data={"foo": "bar"}, blocking=True)

publisher._client.publish.return_value = mock_future
publisher._client.publish.assert_called_with(
ANY, b'{"foo": "bar"}', published_at=ANY
)
mock_future.result.assert_called_once_with(timeout=100)

def test_publishes_data_with_client_timeout_when_blocking_by_default(
self, mock_future, publisher
):
publisher._timeout = 100.0
publisher._blocking = True
publisher.publish(topic="order-cancelled", data={"foo": "bar"})

publisher._client.publish.return_value = mock_future
publisher._client.publish.assert_called_with(
ANY, b'{"foo": "bar"}', published_at=ANY
)
mock_future.result.assert_called_once_with(timeout=100)

def test_publishes_data_non_blocking_by_default(self, mock_future, publisher):
publisher._timeout = 100.0
publisher.publish(topic="order-cancelled", data={"foo": "bar"})

publisher._client.publish.return_value = mock_future
publisher._client.publish.assert_called_with(
ANY, b'{"foo": "bar"}', published_at=ANY
)
mock_future.result.assert_not_called()

def test_publishes_data_with_client_timeout_when_blocking_and_timeout_specified(
self, mock_future, publisher
):
publisher._timeout = 100.0
publisher.publish(
topic="order-cancelled",
data={"foo": "bar"},
blocking=True,
timeout=50,
)

publisher._client.publish.return_value = mock_future
publisher._client.publish.assert_called_with(
ANY, b'{"foo": "bar"}', published_at=ANY
)
mock_future.result.assert_called_once_with(timeout=50)

def test_runs_post_publish_failure_hook_when_future_result_raises_timeout(
self, mock_future, publisher, mock_post_publish_failure
):
message = {"foo": "bar"}
exception = TimeoutError()
mock_future.result.side_effect = exception

with pytest.raises(TimeoutError):
publisher.publish(
topic="order-cancelled", data=message, myattr="hello", blocking=True
)
mock_post_publish_failure.assert_called_once_with(
"order-cancelled", exception, {"foo": "bar"}
)

def test_raises_when_timeout_error_and_raise_exception_is_true(
self, publisher, mock_future
):
message = {"foo": "bar"}
e = TimeoutError()
mock_future.result.side_effect = e

with pytest.raises(TimeoutError):
publisher.publish(
topic="order-cancelled",
data=message,
myattr="hello",
blocking=True,
raise_exception=True,
)

def test_returns_future_when_timeout_error_and_raise_exception_is_false(
self, publisher, mock_future
):
message = {"foo": "bar"}
e = TimeoutError()
mock_future.result.side_effect = e

result = publisher.publish(
topic="order-cancelled",
data=message,
myattr="hello",
blocking=True,
raise_exception=False,
)

assert result is mock_future

0 comments on commit 26de37d

Please sign in to comment.