From 056d3356a6b46d2c341c29a30b09129a86009855 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 2 Oct 2018 20:39:28 +0400 Subject: [PATCH] pubsub adr Refs #951, #1879, #1880 --- docs/architecture/adr-033-pubsub.md | 108 ++++++++++++++++++++++++++++ docs/architecture/adr-template.md | 4 ++ 2 files changed, 112 insertions(+) create mode 100644 docs/architecture/adr-033-pubsub.md diff --git a/docs/architecture/adr-033-pubsub.md b/docs/architecture/adr-033-pubsub.md new file mode 100644 index 000000000000..b2439e1e6e52 --- /dev/null +++ b/docs/architecture/adr-033-pubsub.md @@ -0,0 +1,108 @@ +# ADR 033: pubsub 2.0 + +Author: Anton Kaliaev (@melekes) + +## Changelog + +02-10-2018: Initial draft + +## Context + +Since the initial version of the pubsub, there's been a number of issues +raised: #951, #1879, #1880. Some of them are high-level issues questioning the +core design choices made by us. Others are minor and mostly about the interface +of `Subscribe()` / `Publish()` functions. + +### Sync vs Async + +Now, when publishing a message to subscribers, we can do it in a goroutine: + +```go +for each subscriber { + out := subscriber.outc + go func() { + out <- msg + } +} +``` +or +```go +for each subscriber { + go subscriber.callbackFn() +} +``` + +This gives us greater performance and allow us to avoid "slow client problem" +(when other subscribers have to wait for a slow subscriber). A pool of +goroutines can be used to avoid uncontrolled memory growth. + +In certain cases, this is what we want. In other cases, when for example we +need strict ordering of events (if event A was published before B, the delivery +order will be A -> B), we can't use goroutines. + +There is also a question whenever we should have a non-blocking send: + +```go +for each subscriber { + out := subscriber.outc + select { + case out <- msg: + default: + log("subscriber %v buffer is full, skipping...") + } +} +``` + +This fixes the "slow client problem", but there is no way for a slow client to +know if it had missed a message. + +### Channels vs Callbacks + +There is also a question whenever we should use channels for message +transmission or call subscriber-defined callbacks. Callbacks give developers +more flexibility - you can call mutex's there, channels, spawn goroutines, +anything you really want. But they also carry local scope, which can result in +memory leaks. + +### Why `Subscribe()` accepts an `out` channel? + +Because in our tests, we create buffered channels (cap: 1). Alternatively, we +can make capacity an argument. + +## Decision + +Change Subscribe() function to return out channel: + +```go +// outCap can be used to set capacity of out channel (unbuffered by default). +Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan interface{}, err error) { +``` + +It's more idiomatic since we're closing it during Unsubscribe/UnsubscribeAll calls. + +Also, we should make tags available to subscribers: + +```go +type MsgAndTags struct { + Msg interface{} + Tags TagMap +} + +// outCap can be used to set capacity of out channel (unbuffered by default). +Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan MsgAndTags, err error) { +``` + +## Status + +In review + +## Consequences + +### Positive + +- more idiomatic interface +- subscribers know what tags msg was published with + +### Negative + +### Neutral diff --git a/docs/architecture/adr-template.md b/docs/architecture/adr-template.md index d47c7f5580f1..4879afc408a4 100644 --- a/docs/architecture/adr-template.md +++ b/docs/architecture/adr-template.md @@ -1,5 +1,9 @@ # ADR 000: Template for an ADR +Author: + +## Changelog + ## Context ## Decision