Skip to content

Commit

Permalink
Revert "Event-Publisher Flush Queue on Shutdown (#767)" (#790)
Browse files Browse the repository at this point in the history
This reverts commit fcc0c45.
  • Loading branch information
KTAtkinson committed May 15, 2023
1 parent 222ad9f commit ee16153
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 41 deletions.
4 changes: 0 additions & 4 deletions baseplate/sidecars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ def age(self) -> float:
return 0
return time.time() - self.batch_start

@property
def is_ready(self) -> bool:
return self.age >= self.max_age

def add(self, item: Optional[bytes]) -> None:
if self.age >= self.max_age:
raise BatchFull
Expand Down
49 changes: 12 additions & 37 deletions baseplate/sidecars/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import hashlib
import hmac
import logging
import signal
import sys

from types import FrameType
from typing import Any
from typing import List
from typing import Optional
Expand Down Expand Up @@ -167,16 +164,6 @@ def publish(self, payload: SerializedBatch) -> None:
SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}


def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
"""Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
serialized_batch = batcher.serialize()
try:
publisher.publish(serialized_batch)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()


def publish_events() -> None:
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
Expand Down Expand Up @@ -227,28 +214,6 @@ def publish_events() -> None:
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(metrics_client, cfg)

def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
"""Signal handler for flushing messages from the queue and publishing them."""
message: Optional[bytes]
logger.info("Shutdown signal received. Flushing events...")

while True:
try:
message = event_queue.get(timeout=0.2)
except TimedOutError:
if len(batcher.serialize()) > 0:
serialize_and_publish_batch(publisher, batcher)
break

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)
sys.exit(0)

for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, flush_queue_signal_handler)
signal.siginterrupt(sig, False)

while True:
message: Optional[bytes]

Expand All @@ -257,8 +222,18 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
except TimedOutError:
message = None

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
try:
batcher.add(message)
continue
except BatchFull:
pass

serialized = batcher.serialize()
try:
publisher.publish(serialized)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()
batcher.add(message)


Expand Down

0 comments on commit ee16153

Please sign in to comment.