Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

types: Emit tags from BeginBlock/EndBlock #2747

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ program](https://hackerone.com/tendermint).

### FEATURES:

- [types] [\#1571](https://github.com/tendermint/tendermint/issues/1571) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko)

### IMPROVEMENTS:

- [config] \#2877 add blocktime_iota to the config.toml (@ackratos)
Expand Down
4 changes: 3 additions & 1 deletion docs/spec/abci/abci.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting
something about what happened during the methods execution.

Tags can be used to index transactions and blocks according to what happened
during their execution.
during their execution. Note that the set of tags returned for a block from
`BeginBlock` and `EndBlock` are merged. In case both methods return the same
tag, only the value defined in `EndBlock` is used.

Keys and values in tags must be UTF-8 encoded strings (e.g.
"account.owner": "Bob", "balance": "100.0",
Expand Down
17 changes: 13 additions & 4 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ func execBlockOnProxyApp(

commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)

// Begin block.
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
// Begin block
var err error
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(),
Header: types.TM2PB.Header(&block.Header),
LastCommitInfo: commitInfo,
Expand Down Expand Up @@ -417,8 +418,16 @@ func updateState(
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
ResultBeginBlock: *abciResponses.BeginBlock,
kostko marked this conversation as resolved.
Show resolved Hide resolved
ResultEndBlock: *abciResponses.EndBlock,
})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
})

for i, tx := range block.Data.Txs {
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
Expand Down
5 changes: 3 additions & 2 deletions state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func saveState(db dbm.DB, state State, key []byte) {
// of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit.
type ABCIResponses struct {
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
BeginBlock *abci.ResponseBeginBlock
}

// NewABCIResponses returns a new ABCIResponses
Expand Down
2 changes: 1 addition & 1 deletion tools/tm-monitor/monitor/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) {
n.SendBlocksTo(blockCh)

blockHeader := tmtypes.Header{Height: 5}
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader})
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader})

assert.Equal(t, int64(5), n.Height)
assert.Equal(t, blockHeader, <-blockCh)
Expand Down
52 changes: 39 additions & 13 deletions types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
return nil
}

func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
continue
}
result[string(tag.Key)] = string(tag.Value)
}
return result
}

func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it maybe be better to have independent subscription events for BeginBlock and EndBlock, rather than stuffing them in with NewBlock/NewBlockHeader like this?

return b.Publish(EventNewBlock, data)
// no explicit deadline for publishing events
ctx := context.Background()

resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, data)
// no explicit deadline for publishing events
ctx := context.Background()

resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
// TODO: Create StringShort method for Header and use it in logger.
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
kostko marked this conversation as resolved.
Show resolved Hide resolved

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader

b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}

func (b *EventBus) PublishEventVote(data EventDataVote) error {
Expand All @@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := make(map[string]string)

// validate and fill tags from tx result
for _, tag := range data.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
}
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
Expand Down
84 changes: 84 additions & 0 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}

txEventsCh := make(chan interface{})

// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()

err = eventBus.PublishEventNewBlock(EventDataNewBlock{
Block: block,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block after 1 sec.")
}
}

func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}

txEventsCh := make(chan interface{})

// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)

done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()

err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block header after 1 sec.")
}
}

func TestEventBusPublish(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand Down
7 changes: 7 additions & 0 deletions types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
Expand Down Expand Up @@ -56,11 +57,17 @@ func RegisterEventDatas(cdc *amino.Codec) {

type EventDataNewBlock struct {
Block *Block `json:"block"`

ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we treat these like we treat transactions, as independent events, with EventDataBeginBlock and EventDataEndBlock?

What are the use cases for getting just this data - is it more useful with the block itself or with other results tags from the block?

ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}

// light weight event for benchmarking
type EventDataNewBlockHeader struct {
Header Header `json:"header"`

ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}

// All txs fire EventDataTx
Expand Down