Implement pubsub batch for asyncio #355

Merged: 4 commits, Jan 11, 2019
ingestion-edge/README.md (7 changes: 2 additions & 5 deletions)

@@ -65,11 +65,8 @@ environment variables:
attributes, defaults to `["Content-Length", "Date", "DNT", "User-Agent",
"X-Forwarded-For", "X-Pingsender-Version", "X-Pipeline-Proxy"]`
- `PUBLISH_TIMEOUT_SECONDS`: a float indicating the maximum number of seconds
to wait for the PubSub client to complete a publish operation during an HTTP
request, defaults to 1 second and may require tuning
- `FLUSH_PUBLISH_TIMEOUT_SECONDS`: a float indicating the maximum number of
seconds to wait for the PubSub client to complete a publish operation when
flushing, defaults to `PUBLISH_TIMEOUT_SECONDS`
to wait for the PubSub client to complete a publish operation, defaults to 1
second and may require tuning
- `FLUSH_CONCURRENT_MESSAGES`: an integer indicating the number of messages per
worker that may be read from the queue before waiting on publish results,
defaults to 1000 messages based on [publish request
ingestion-edge/ingestion_edge/config.py (4 changes: 0 additions & 4 deletions)

@@ -74,10 +74,6 @@ class Route:

PUBLISH_TIMEOUT_SECONDS = float(environ.get("PUBLISH_TIMEOUT_SECONDS", 1))

FLUSH_PUBLISH_TIMEOUT_SECONDS = float(
environ.get("FLUSH_PUBLISH_TIMEOUT_SECONDS", PUBLISH_TIMEOUT_SECONDS)
)

FLUSH_CONCURRENT_BYTES = int(environ.get("FLUSH_CONCURRENT_BYTES", 1e7))

FLUSH_CONCURRENT_MESSAGES = int(environ.get("FLUSH_CONCURRENT_MESSAGES", 1000))
ingestion-edge/ingestion_edge/flush.py (10 changes: 2 additions & 8 deletions)

@@ -4,7 +4,6 @@

"""Fallback logic for retrying queued submit requests."""

from .util import async_wrap
from dataclasses import dataclass
from functools import partial
from google.cloud.pubsub_v1 import PublisherClient
@@ -25,7 +24,6 @@ class Flush:
concurrent_bytes: int
concurrent_messages: int
sleep_seconds: float
publish_timeout_seconds: Optional[float] = None
running: bool = False
task: Optional[asyncio.Task] = None
sleep_task: Optional[asyncio.Task] = None
@@ -40,8 +38,7 @@ def set_status(
except: # noqa: E722
# message was not delivered
self.q.nack(message)
# raise from bare except
raise
# do not raise in callback
else:
# message delivered
self.q.ack(message)
@@ -73,11 +70,9 @@ async def _flush(self):
# record size of message
total_bytes += len(data)
# publish message
future = async_wrap(self.client.publish(topic, data, **attrs))
future = self.client.publish(topic, data, **attrs)
# ack or nack by callback
future.add_done_callback(partial(self.set_status, message))
# add timeout to future
future = asyncio.wait_for(future, self.publish_timeout_seconds)
# wait for this later
pending.append(future)
except Exception:
@@ -130,7 +125,6 @@ async def after_server_stop(self, *_):
self.sleep_task.cancel()
# wait for current flush to finish
await self.task
self.running = True
# flush until empty
while await self._flush():
pass
ingestion-edge/ingestion_edge/publish.py (34 changes: 25 additions & 9 deletions)

@@ -9,11 +9,23 @@
from sanic import Sanic, response
from sanic.request import Request
from functools import partial
from google.api_core.retry import Retry, if_exception_type
from google.cloud.pubsub_v1 import PublisherClient
from persistqueue import SQLiteAckQueue
from typing import Dict, Optional, Tuple
from .util import async_wrap, HTTP_STATUS
import asyncio
from typing import Dict, Tuple
from .util import AsyncioBatch, HTTP_STATUS
import google.api_core.exceptions

TRANSIENT_ERRORS = if_exception_type(
# Service initiated retry
google.api_core.exceptions.Aborted,
# Service interrupted when handling the request
google.api_core.exceptions.Cancelled,
# Service throttled the request
google.api_core.exceptions.TooManyRequests,
# Service outage or connection issue
google.api_core.exceptions.ServerError,
)


async def submit(
@@ -22,7 +34,6 @@ async def submit(
q: SQLiteAckQueue,
topic: str,
metadata_headers: Dict[str, str],
timeout: Optional[float],
**kwargs
) -> response.HTTPResponse:
"""Deliver request to the pubsub topic.
@@ -56,8 +67,9 @@
"header too large\n", HTTP_STATUS.REQUEST_HEADER_FIELDS_TOO_LARGE
)
try:
future = client.publish(topic, data, **attrs)
await asyncio.wait_for(async_wrap(future), timeout)
await client.publish(topic, data, **attrs)
except ValueError:
return response.text("payload too large\n", HTTP_STATUS.PAYLOAD_TOO_LARGE)
except Exception:
# api call failure, write to queue
try:
@@ -70,10 +82,15 @@ def init_app(app: Sanic) -> Tuple[PublisherClient, SQLiteAckQueue]:

def init_app(app: Sanic) -> Tuple[PublisherClient, SQLiteAckQueue]:
"""Initialize Sanic app with url rules."""
# Get PubSub timeout
timeout = app.config.get("PUBLISH_TIMEOUT_SECONDS")
# Initialize PubSub client
timeout = app.config.get("PUBLISH_TIMEOUT_SECONDS", None)
client = PublisherClient()
client.api.publish = partial(
client.api.publish,
retry=Retry(TRANSIENT_ERRORS, deadline=timeout),
timeout=timeout,
)

Comment from the PR author:
Python threads are unkillable, so they have to time out on their own. The publish API call happens in a thread, so we need to use the built-in retry and timeout; otherwise we get unusual behavior where everything awaiting the call times out but the API call is still retrying in its thread.
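
A minimal sketch (not from the PR) of the behavior described here: timing out or cancelling the awaiting coroutine does not stop the worker thread, so the blocking publish call has to enforce its own retry deadline and timeout. `blocking_publish` is a hypothetical stand-in for the gRPC call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_publish():
    """Hypothetical stand-in for a gRPC publish call that retries internally."""
    time.sleep(10)
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        future = loop.run_in_executor(pool, blocking_publish)
        try:
            await asyncio.wait_for(future, timeout=1)
        except asyncio.TimeoutError:
            # The awaiting side gave up, but the thread keeps running;
            # leaving the executor block still waits for it to finish.
            print("timed out, but the publish thread is still working")


asyncio.run(main())
```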

Comment from the PR author:
This is also why FLUSH_PUBLISH_TIMEOUT_SECONDS went away: because we don't control batching, we have to set a single timeout that applies to all batches.
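
A hedged sketch of the mechanism being relied on: `google.api_core.retry.Retry` re-invokes the wrapped call on transient errors until its deadline elapses, so one deadline bounds the total time regardless of how the client batches. `flaky_publish` and the exception type chosen here are illustrative, not the PR's exact configuration.

```python
from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type


def flaky_publish():
    """Hypothetical stand-in for client.api.publish(topic, messages)."""
    raise exceptions.ServiceUnavailable("transient outage")


# The deadline caps the total time spent retrying, analogous to
# PUBLISH_TIMEOUT_SECONDS applying to every batch at once.
retry = Retry(if_exception_type(exceptions.ServiceUnavailable), deadline=1.0)

try:
    retry(flaky_publish)()
except exceptions.RetryError:
    print("gave up after the retry deadline elapsed")
```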

client._batch_class = AsyncioBatch
Comment from a reviewer:
I don't think I would have thought to patch the PublisherClient instance this way. It turns out to be a fairly elegant way of providing the customized behavior we need 👍 Replacing publish with a partial is particularly nice.
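
A generic illustration (not PR code) of the pattern: pre-binding keyword arguments onto the existing bound method means every caller, including the batch commit path, picks up the same retry and timeout without any call-site changes. `FakeApi` is a hypothetical stand-in for the gapic publisher API object.

```python
from functools import partial


class FakeApi:
    """Hypothetical stand-in for the gapic publisher API object."""

    def publish(self, topic, messages, retry=None, timeout=None):
        return "publish({!r}, retry={!r}, timeout={!r})".format(topic, retry, timeout)


api = FakeApi()
# Shadow the bound method with a partial that pre-binds the defaults.
api.publish = partial(api.publish, retry="Retry(...)", timeout=1.0)
print(api.publish("projects/p/topics/t", []))
# -> publish('projects/p/topics/t', retry='Retry(...)', timeout=1.0)
```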

# Use a SQLiteAckQueue because:
# * we use acks to ensure messages only removed on success
# * persist-queue's SQLite*Queue is faster than its Queue
@@ -99,7 +116,6 @@ def init_app(app: Sanic) -> Tuple[PublisherClient, SQLiteAckQueue]:
q=q,
topic=route.topic,
metadata_headers=metadata_headers,
timeout=timeout,
)
for route in app.config["ROUTE_TABLE"]
}
ingestion-edge/ingestion_edge/util.py (99 changes: 80 additions & 19 deletions)

@@ -4,41 +4,102 @@

"""Utilities."""

from google.cloud.pubsub_v1.gapic.publisher_client import PublisherClient
from google.cloud.pubsub_v1.publisher.exceptions import PublishError
from google.cloud.pubsub_v1.types import BatchSettings, PublishRequest, PubsubMessage
from typing import List, Optional
import asyncio


def async_wrap(future, loop=None):
"""Wrap a threading-like future in an async future that can be awaited.
class AsyncioBatch:
"""Batch for google.cloud.pubsub_v1.PublisherClient in asyncio.

example:

async def func():
future = ...
await wrap(future)
Workaround for https://github.com/googleapis/google-cloud-python/issues/7104
"""
async_future = asyncio.Future()

def result():
try:
async_future.set_result(future.result())
except Exception as e:
async_future.set_exception(e)
def __init__(
self,
client: PublisherClient,
topic: str,
settings: BatchSettings,
autocommit: bool = True,
):
"""Initialize."""
self.client = client
self.topic = topic
self.settings = settings

self.full = asyncio.Event()
self.messages: List[PubsubMessage] = []
# fix https://github.com/googleapis/google-cloud-python/issues/7108
self.size = PublishRequest(topic=topic, messages=[]).ByteSize()

# Create a task to commit when full
self.result = asyncio.create_task(self.commit())

# If max latency is specified start a task to monitor the batch
# and commit when max latency is reached
if autocommit and self.settings.max_latency < float("inf"):
asyncio.create_task(self.monitor())

async def monitor(self):
"""Sleep until max latency is reached then set the batch to full."""
await asyncio.sleep(self.settings.max_latency)
self.full.set()

async def commit(self) -> List[str]:
"""Publish this batch when full."""
await self.full.wait()

if not self.messages:
return []

response = await asyncio.get_running_loop().run_in_executor(
None, self.client.api.publish, self.topic, self.messages
)

if len(response.message_ids) != len(self.messages):
raise PublishError("Some messages not successfully published")

return response.message_ids

Comment from a reviewer:
I'm still wrapping my head around much of Python's async features, so reading this is a good learning experience. I like the way this reads.

Is there any need to cancel tasks once we commit? If full is set due to hitting max size, what happens to the monitor task? Does it still sleep and then set full again? Does that prevent this class instance from getting cleaned up?

Reply from the PR author:
> Is there any need to cancel tasks once we commit?

I tried to do that in flush, but it introduced a weird bug that I couldn't figure out how to fix: the cancellation propagated down the stack to run_in_executor and raised concurrent.futures.CancelledError instead of asyncio.CancelledError, which in turn killed uvloop instead of raising back up the stack.

> If full is set due to hitting max size, what happens to the monitor task? Does it still sleep and then set full again?

Yes, but setting full again is a no-op because the event is already set.

Note that asyncio.Event stays set, and waiting on it afterwards is a no-op, whereas asyncio.Condition only notifies pending tasks, and waiting on it after a notify waits for the next notify. This is the exact opposite of what I expect from the names, but it's been years since I last dealt with concurrency primitives.

> Does that prevent this class instance from getting cleaned up?

Nope. The task will end, the other tasks will already be complete, there will be no references left, and the instance will be cleaned up.
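
A minimal sketch (not from the PR) of the `asyncio.Event` behavior described above: once set, the event stays set, so a later `set()` is a harmless no-op and subsequent `wait()` calls return immediately.

```python
import asyncio


async def main():
    event = asyncio.Event()

    async def monitor():
        await asyncio.sleep(0.1)
        event.set()
        event.set()  # setting an already-set event is a no-op

    asyncio.create_task(monitor())
    await event.wait()  # wakes when monitor() sets the event
    await event.wait()  # returns immediately; the event stays set
    print("event still set:", event.is_set())


asyncio.run(main())
```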


if loop is None:
loop = asyncio.get_event_loop()
def publish(self, message: PubsubMessage) -> Optional[asyncio.Task]:
"""Asynchronously publish message."""
# check if batch is not full
if not self.full.is_set():
# check if batch can accept message
index = len(self.messages)
new_size = self.size + PublishRequest(messages=[message]).ByteSize()
overflow = (
new_size > self.settings.max_bytes
or index + 1 > self.settings.max_messages
)

def callback(_):
loop.call_soon_threadsafe(result)
if overflow:
# fix https://github.com/googleapis/google-cloud-python/issues/7107
if not self.messages:
raise ValueError("Message exceeds max bytes")
# batch is full because it could not accept message
self.full.set()
else:
# Store message in the batch.
self.messages.append(message)
self.size = new_size

future.add_done_callback(callback)
# return a task to await for the message id
return asyncio.create_task(self.message_id(index))
return None # the batch cannot accept a message

return async_future
async def message_id(self, index: int) -> str:
"""Get message id from result by index."""
return (await self.result)[index]


class HTTP_STATUS:
"""HTTP Status Codes for responses."""

OK = 200
BAD_REQUEST = 400
PAYLOAD_TOO_LARGE = 413
REQUEST_HEADER_FIELDS_TOO_LARGE = 431
INSUFFICIENT_STORAGE = 507
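
For context, a hedged usage sketch of the new batch class as publish.py wires it up: with `AsyncioBatch` installed as the client's batch class, `client.publish()` returns an `asyncio.Task` whose result is the message id, so it can be awaited directly. The topic name and emulator host below are illustrative assumptions.

```python
import asyncio
import os

from google.cloud.pubsub_v1 import PublisherClient

from ingestion_edge.util import AsyncioBatch

# Assumes a PubSub emulator is running and the topic already exists.
os.environ.setdefault("PUBSUB_EMULATOR_HOST", "localhost:8085")


async def main():
    client = PublisherClient()
    client._batch_class = AsyncioBatch
    message_id = await client.publish(
        "projects/test/topics/test", b"payload", meta="value"
    )
    print("published message", message_id)


asyncio.run(main())
```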
ingestion-edge/pubsub_emulator.py (9 changes: 6 additions & 3 deletions)

@@ -135,14 +135,16 @@ def Publish(
self.logger.debug("Publish(%.100s)", LazyFormat(request))
if request.topic in self.status_codes:
context.abort(self.status_codes[request.topic], "Override")
if self.sleep is not None:
time.sleep(self.sleep)
message_ids: List[str] = []
try:
subscriptions = self.topics[request.topic]
except KeyError:
context.abort(grpc.StatusCode.NOT_FOUND, "Topic not found")
message_ids = [uuid.uuid4().hex for _ in request.messages]
if self.sleep is not None:
time.sleep(self.sleep)
# return a valid response without recording messages
return pubsub_pb2.PublishResponse(message_ids=message_ids)
for _id, message in zip(message_ids, request.messages):
message.message_id = _id
for subscription in subscriptions:
@@ -228,7 +230,8 @@ def UpdateTopic(

For the override key "sleep" the override value indicates a number of
seconds Publish requests should sleep before returning, and non-empty
override values must be a valid float.
override values must be a valid float. Publish requests will return
a valid response without recording messages.
"""
self.logger.debug("UpdateTopic(%s)", LazyFormat(request))
for override in request.update_mask.paths:
ingestion-edge/tests/integration/conftest.py (12 changes: 10 additions & 2 deletions)

@@ -11,6 +11,7 @@
import _pytest.config.argparsing
import _pytest.fixtures
import grpc
import logging
import os
import psutil
import pytest
@@ -47,6 +48,7 @@ def pubsub(
yield "remote"
elif request.config.getoption("server") is None:
emulator = PubsubEmulator(max_workers=1, port=0)
emulator.logger.setLevel(logging.DEBUG)
try:
os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:%d" % emulator.port
yield emulator
@@ -100,8 +102,14 @@ def server(
assert process.poll() is None # server still running
yield "http://localhost:%d" % ports.pop()
finally:
process.kill()
process.wait()
try:
# allow one second for graceful termination
process.terminate()
Comment from the PR author:
This allows --cov to pick up coverage from integration tests when --server isn't specified.

process.wait(1)
except subprocess.TimeoutExpired:
# kill after one second
process.kill()
process.wait()
else:
yield _server
