Skip to content

Commit

Permalink
types: Emit tags from BeginBlock/EndBlock
Browse files Browse the repository at this point in the history
This commit makes both EventNewBlock and EventNewBlockHeader emit tags
on the event bus, so subscribers can use them in queries.

This is a BREAKING change due to adding a field to the ABCIResponses
structure which is persisted to disk.
  • Loading branch information
kostko committed Nov 5, 2018
1 parent 80e4fe6 commit 78c16ef
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 30 deletions.
10 changes: 6 additions & 4 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ func execBlockOnProxyApp(

commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)

// Begin block.
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
// Begin block
var err error
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(),
Header: types.TM2PB.Header(&block.Header),
LastCommitInfo: commitInfo,
Expand Down Expand Up @@ -425,8 +426,9 @@ func updateState(
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
resultTags := append(abciResponses.BeginBlock.Tags, abciResponses.EndBlock.Tags...)
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}, resultTags)
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}, resultTags)

for i, tx := range block.Data.Txs {
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
Expand Down
5 changes: 3 additions & 2 deletions state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ func saveState(db dbm.DB, state State, key []byte) {
// of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit.
type ABCIResponses struct {
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
DeliverTx []*abci.ResponseDeliverTx
BeginBlock *abci.ResponseBeginBlock
EndBlock *abci.ResponseEndBlock
}

// NewABCIResponses returns a new ABCIResponses
Expand Down
58 changes: 41 additions & 17 deletions types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,46 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
return nil
}

func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return b.Publish(EventNewBlock, data)
func (b *EventBus) validateTags(tags []cmn.KVPair, logkv ...interface{}) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logtags := append([]interface{}{"tag", tag}, logkv...)
b.Logger.Info("Got tag with an empty key (skipping)", logtags...)
continue
}
result[string(tag.Key)] = string(tag.Value)
}
return result
}

func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, data)
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock, resultTags []cmn.KVPair) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := b.validateTags(resultTags, "block", data.Block)

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader, resultTags []cmn.KVPair) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := b.validateTags(resultTags, "header", data.Header)

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventVote(data EventDataVote) error {
Expand All @@ -94,17 +128,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := make(map[string]string)

// validate and fill tags from tx result
for _, tag := range data.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
}
tags := b.validateTags(data.Result.Tags, "tx", data.Tx)

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
Expand Down Expand Up @@ -185,11 +209,11 @@ func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
return nil
}

func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock, tags []cmn.KVPair) error {
return nil
}

func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader, tags []cmn.KVPair) error {
return nil
}

Expand Down
82 changes: 77 additions & 5 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,82 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultTags := []cmn.KVPair{
{Key: []byte("baz"), Value: []byte("1")},
{Key: []byte("foz"), Value: []byte("2")},
}

txEventsCh := make(chan interface{})

// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
close(done)
}
}()

err = eventBus.PublishEventNewBlock(EventDataNewBlock{block}, resultTags)
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a transaction after 1 sec.")
}
}

func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultTags := []cmn.KVPair{
{Key: []byte("baz"), Value: []byte("1")},
{Key: []byte("foz"), Value: []byte("2")},
}

txEventsCh := make(chan interface{})

// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
close(done)
}
}()

err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{block.Header}, resultTags)
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a transaction after 1 sec.")
}
}

func TestEventBusPublish(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand All @@ -68,7 +144,7 @@ func TestEventBusPublish(t *testing.T) {
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
require.NoError(t, err)

const numEventsExpected = 15
const numEventsExpected = 13
done := make(chan struct{})
go func() {
numEvents := 0
Expand All @@ -82,10 +158,6 @@ func TestEventBusPublish(t *testing.T) {

err = eventBus.Publish(EventNewBlockHeader, EventDataNewBlockHeader{})
require.NoError(t, err)
err = eventBus.PublishEventNewBlock(EventDataNewBlock{})
require.NoError(t, err)
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{})
require.NoError(t, err)
err = eventBus.PublishEventVote(EventDataVote{})
require.NoError(t, err)
err = eventBus.PublishEventProposalHeartbeat(EventDataProposalHeartbeat{})
Expand Down
5 changes: 3 additions & 2 deletions types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
Expand Down Expand Up @@ -134,8 +135,8 @@ func QueryForEvent(eventType string) tmpubsub.Query {

// BlockEventPublisher publishes all block related events
type BlockEventPublisher interface {
PublishEventNewBlock(block EventDataNewBlock) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader) error
PublishEventNewBlock(block EventDataNewBlock, tags []cmn.KVPair) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader, tags []cmn.KVPair) error
PublishEventTx(EventDataTx) error
PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error
}
Expand Down

0 comments on commit 78c16ef

Please sign in to comment.