Skip to content

Commit

Permalink
types: Emit tags from BeginBlock/EndBlock
Browse files Browse the repository at this point in the history
This commit makes both EventNewBlock and EventNewBlockHeader emit tags
on the event bus, so subscribers can use them in queries.

This is a BREAKING change due to adding a field to the ABCIResponses
structure which is persisted to disk.
  • Loading branch information
kostko committed Nov 9, 2018
1 parent 6e9aee5 commit ea14968
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
### BREAKING CHANGES:

* CLI/RPC/Config
* [state] [\#2747](https://github.com/tendermint/tendermint/pull/2747) Add `BeginBlock` response to `ABCIResponses` (BREAKING).

* Apps

Expand All @@ -22,6 +23,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### FEATURES:

- [types] [\#1571](https://github.com/tendermint/tendermint/issues/1571) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko)

### IMPROVEMENTS:

### BUG FIXES:
Expand Down
4 changes: 3 additions & 1 deletion docs/spec/abci/abci.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting
something about what happened during the methods execution.

Tags can be used to index transactions and blocks according to what happened
during their execution.
during their execution. Note that the set of tags returned for a block from
`BeginBlock` and `EndBlock` are merged. In case both methods return the same
tag, only the value defined in `EndBlock` is used.

Keys and values in tags must be UTF-8 encoded strings (e.g.
"account.owner": "Bob", "balance": "100.0",
Expand Down
17 changes: 13 additions & 4 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ func execBlockOnProxyApp(

commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)

// Begin block.
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
// Begin block
var err error
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(),
Header: types.TM2PB.Header(&block.Header),
LastCommitInfo: commitInfo,
Expand Down Expand Up @@ -425,8 +426,16 @@ func updateState(
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
})

for i, tx := range block.Data.Txs {
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
Expand Down
5 changes: 3 additions & 2 deletions state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ func saveState(db dbm.DB, state State, key []byte) {
// of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit.
type ABCIResponses struct {
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
DeliverTx []*abci.ResponseDeliverTx
BeginBlock *abci.ResponseBeginBlock
EndBlock *abci.ResponseEndBlock
}

// NewABCIResponses returns a new ABCIResponses
Expand Down
2 changes: 1 addition & 1 deletion tools/tm-monitor/monitor/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) {
n.SendBlocksTo(blockCh)

blockHeader := tmtypes.Header{Height: 5}
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader})
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader})

assert.Equal(t, int64(5), n.Height)
assert.Equal(t, blockHeader, <-blockCh)
Expand Down
52 changes: 39 additions & 13 deletions types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
return nil
}

func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
continue
}
result[string(tag.Key)] = string(tag.Value)
}
return result
}

func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return b.Publish(EventNewBlock, data)
// no explicit deadline for publishing events
ctx := context.Background()

resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, data)
// no explicit deadline for publishing events
ctx := context.Background()

resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
// TODO: Create StringShort method for Header and use it in logger.
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventVote(data EventDataVote) error {
Expand All @@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := make(map[string]string)

// validate and fill tags from tx result
for _, tag := range data.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
}
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
Expand Down
84 changes: 84 additions & 0 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}

txEventsCh := make(chan interface{})

// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()

err = eventBus.PublishEventNewBlock(EventDataNewBlock{
Block: block,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block after 1 sec.")
}
}

func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}

txEventsCh := make(chan interface{})

// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()

err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block header after 1 sec.")
}
}

func TestEventBusPublish(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand Down
7 changes: 7 additions & 0 deletions types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
Expand Down Expand Up @@ -54,11 +55,17 @@ func RegisterEventDatas(cdc *amino.Codec) {

type EventDataNewBlock struct {
Block *Block `json:"block"`

ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}

// light weight event for benchmarking
type EventDataNewBlockHeader struct {
Header Header `json:"header"`

ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}

// All txs fire EventDataTx
Expand Down

0 comments on commit ea14968

Please sign in to comment.