Skip to content
This repository has been archived by the owner on Jul 15, 2018. It is now read-only.

pubsub: make message tags available to subscribers #143

Closed
silasdavis opened this issue Feb 5, 2018 · 3 comments
Closed

pubsub: make message tags available to subscribers #143

silasdavis opened this issue Feb 5, 2018 · 3 comments

Comments

@silasdavis
Copy link
Contributor

silasdavis commented Feb 5, 2018

tl;dr replace out chan<-interface{} with out chan<-messageAndTags

Problem

Consider the following interfaces replicated from the pubsub Sever:

type Subscribable interface {
	Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- interface{}) error
	Unsubscribe(ctx context.Context, subscriber string, query pubsub.Query) error
	UnsubscribeAll(ctx context.Context, subscriber string) error
}

type Publisher interface {
	Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error
}

Subscribers receive messages (of type interface{}) passed on a channel to them based on their query. We can tell that the tags the original Publisher opted for match our query, but our query might be broad, and we have no way of telling which tags matched. There's nothing that guarantees we can reproduce the tags by looking at the message (we might if we know something about its type). You could argue that tags are not part of the message and shouldn't change its meaning (or something like that), but I think that's an odd position (particularly given the usage in Tendermint - where in contains the domain level type/ID).

This is especially bad for my use case, where I want to forward all messages published on one pubsub to another with:

func PublishAll(subscribable Subscribable, ctx context.Context, subscriber string, query pubsub.Query,
	publisher Publisher, extraTags map[string]interface{}) error {

	return SubscribeCallback(subscribable, ctx, subscriber, query, func(message interface{}) {
		tags := make(map[string]interface{})
		// this is where I'd merge in the tags originally used with the message but I have no access to them
		for k,v := range extraTags {
			tags[k] = v
		}
		// Help! I can't tell which tags the original publisher used - so I can't forward them
		publisher.Publish(ctx, message, tags)
	})
}

To bulk forward I would use:

PublishAll(pubsubIn, ctx, "forwarder", query.empty{}, pubsubOut)

Where the situation is worst; the interface for reading messages means the tags are erased.

Possible solutions

The obvious solutions would be to provide a message type like:

type Message struct {
	Body interface{}
	Tags map[string]interface{}
}

// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- Message) error {
	...
} 

A solution I like better is to drop the message/tag distinction and make a message a key-value map, as if everything were a tag:

type Publisher interface {
	Publish(ctx context.Context, keyvals map[string]interface{}) error
}

You can still have a canonical key that is used for messages if required. This has the added bonus that when the message is a type supported by your query language you can also filter on message rather than only on distinguished tags. This starts to look quite a lot like your logging interface (probably because it is, and logs <-> events), which suggests:

type Publisher interface {
	Publish(ctx context.Context, keyvals... interface{}) error
}

// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- []interface) error {
	...
} 

This also means you can start to share output sinks and middlewares from you logging system with the event system (certainly works for me... I have plenty of things built for go-kit log's interface: https://github.com/hyperledger/burrow/blob/develop/logging/config/sinks.go). This is kind of a pleasing interface for callers too.

Another thing this interface gives you is an easier way to de-duplicate events because you can have a guaranteed order of tags/structured values if you wanted them - and hash them deterministically which is not immediately possible with a map.

@silasdavis
Copy link
Contributor Author

@ebuchman as mentioned today on call

@Bric3d
Copy link

Bric3d commented Apr 5, 2018

I can try to work on this a bit, has a decision been taken on which solution to choose ?

@zramsay
Copy link
Contributor

zramsay commented Jul 3, 2018

moved to tendermint/tendermint#1879

@zramsay zramsay closed this as completed Jul 3, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants