Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds ADR 038 - go plugin system #404

Merged
merged 73 commits into from Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
648ae0c
Revert "Merge pull request #270 from provenance-io/llama/add-fee-deno…
SpicyLemon Nov 15, 2022
3653851
Bring in v0.46.4 changes. (#362)
SpicyLemon Nov 15, 2022
54ed551
Mark v0.46.4-pio-1 in the changelog and release notes.
SpicyLemon Nov 15, 2022
2f4eb3a
Bring in v0.46.5 changes. (#365)
SpicyLemon Nov 17, 2022
7cd3d08
Bring in changes from Cosmos-SDK v0.46.6 (#367)
SpicyLemon Nov 21, 2022
ac8a6b8
Backport to v0.46.x: feat: Add x/authz SendAuthorization AllowList #1…
SpicyLemon Nov 21, 2022
6e3f814
Mark v0.46.6-pio-1 in the changelog and release notes.
SpicyLemon Nov 21, 2022
3b53797
use hashicorp go-plugin
Sep 23, 2022
9264d78
python example
Sep 23, 2022
295cefb
implement streaming to baseapp
Sep 27, 2022
5eda8e0
abci listener wiring
Sep 27, 2022
4786f05
update test files
Sep 28, 2022
9843416
update test files
Sep 28, 2022
2fa745b
kafka plugin python example
Sep 28, 2022
2f6d851
update package name
Sep 29, 2022
8c4c9bc
update proto and services
Sep 30, 2022
cc93f14
tx and kvpair indexing
Oct 1, 2022
b147f19
tidy up
Oct 1, 2022
85f8fe8
add readme
Oct 6, 2022
9f14886
update readme
Oct 6, 2022
e4cfc03
add testing section
Oct 7, 2022
73ba2e3
listeners need to be async
Oct 11, 2022
07a6e87
remove goroutine on blocking call
Oct 13, 2022
e94c0e7
terminate with os.Exit on stopNodeOnErr
Oct 13, 2022
86599a2
dumb pipe bridge
Oct 23, 2022
3e8f2c1
fix: listen state writes on rootmulti store only
yihuang Oct 12, 2022
a40f5da
introduce memory listener and OnCommit listener
Oct 26, 2022
5e02a1e
remove WriteListener and use MemoryListener
Oct 28, 2022
3d0dfcd
internalize listener init to cache layer
Oct 28, 2022
ad0aa3e
determenistic PopStateCache, grpc service
Nov 16, 2022
57baef1
add Go file plugin
Nov 16, 2022
a0b7a9b
multiple service support
Nov 18, 2022
c25cd30
add \n
Nov 18, 2022
39c2d26
add proto dir
Nov 18, 2022
a9bcbbe
append to file
Nov 21, 2022
3f48c06
support in-process listeners
Nov 21, 2022
9f9472a
use setStreamingService
Dec 2, 2022
00e8ccb
enable plugins as managed clients
Dec 5, 2022
be60cbc
fix after cherry pick
Dec 5, 2022
22c294c
add default to toml config
Dec 5, 2022
0882a0a
format toml config
Dec 5, 2022
d38e28f
update template
Dec 5, 2022
ee27139
update template
Dec 5, 2022
4f20a3b
update documentation
Dec 6, 2022
b8e65ef
fix test
Dec 6, 2022
e16eaba
write request to single topics
Dec 6, 2022
5eb8736
fix lint errors
Dec 7, 2022
8e8ebb0
remove python examples, CI cannot verify
Dec 7, 2022
c3d2ee1
fix gosec findings
Dec 8, 2022
1a12f20
move plugin proto with sdk proto files, regen plugin code
Dec 9, 2022
5c1fd54
update readme
Dec 12, 2022
b9b519e
update deps
Dec 13, 2022
b493b8c
Bring in v0.46.7 updates. (#402)
SpicyLemon Dec 14, 2022
e9e0342
Merge branch 'release-pio/v0.46.x' into prov/egaxhaj/v0.46.6-pio-1-ad…
Dec 14, 2022
18b455c
Mark v0.46.7-pio-1 in the changelog and release notes.
SpicyLemon Dec 14, 2022
d2c56d3
Note v0.46.6-pio-3 in the changelog.
SpicyLemon Dec 21, 2022
982ad88
cherry-pick cosmos-sdk authz validatebasic fix (#429)
nullpointer0x00 Jan 12, 2023
dedfc7f
set res for delivertx
Jan 17, 2023
b387319
Revert "Revert "Merge pull request #270 from provenance-io/llama/add-…
SpicyLemon Jan 23, 2023
5064250
Merge branch 'main-pio' into release-pio/v0.46.x
SpicyLemon Jan 23, 2023
a67a38d
Merge branch 'main-pio' into release-pio/v0.46.x
SpicyLemon Jan 24, 2023
0b88c18
Merge branch 'main-pio' into release-pio/v0.46.x
SpicyLemon Jan 25, 2023
da2ea8a
Mark v0.46.8-pio-1 in the changelog and release notes.
SpicyLemon Jan 25, 2023
a7f217e
Merge branch 'release-pio/v0.46.x' into prov/egaxhaj/v0.46.6-pio-1-ad…
Jan 30, 2023
3f523d8
improvements from upstream feedback
Jan 30, 2023
65af340
fix tests
Jan 31, 2023
23cc47b
Merge branch 'main-pio' into prov/egaxhaj/v0.46.6-pio-1-add-adr038
Jan 31, 2023
db0557a
remove release notes
Jan 31, 2023
9777eb9
add changelog entry
Jan 31, 2023
e303240
changes based on feedback
Jan 31, 2023
7d4435f
fix tests
Feb 1, 2023
e247e29
default to empty keys
Feb 1, 2023
d1047bf
feadback fixes
Feb 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -45,6 +45,8 @@ vagrant
# IDE
.idea
*.iml
*.ipr
*.iws
.dir-locals.el
.vscode

Expand Down
4 changes: 3 additions & 1 deletion CHANGELOG.md
Expand Up @@ -39,7 +39,9 @@ Ref: https://keepachangelog.com/en/1.0.0/

## Unreleased

* nothing
### Features

* [#404](https://github.com/provenance-io/cosmos-sdk/pull/404) Add ADR-038 State Listening with Go Plugin System

---

Expand Down
6 changes: 6 additions & 0 deletions Makefile
Expand Up @@ -273,6 +273,12 @@ test-sim-nondeterminism:
@go test -mod=readonly $(SIMAPP) -run TestAppStateDeterminism -Enabled=true \
-NumBlocks=100 -BlockSize=200 -Commit=true -Period=0 -v -timeout 24h

test-sim-nondeterminism-streaming:
@echo "Running non-determinism-streaming test..."
@go test -mod=readonly $(SIMAPP) -run TestAppStateDeterminism -Enabled=true \
-NumBlocks=100 -BlockSize=200 -Commit=true -Period=0 -v -timeout 24h \
-EnableStreaming=true

test-sim-custom-genesis-fast:
@echo "Running custom genesis simulation..."
@echo "By default, ${HOME}/.gaiad/config/genesis.json will be used."
Expand Down
61 changes: 42 additions & 19 deletions baseapp/abci.go
Expand Up @@ -12,11 +12,13 @@ import (
"time"

"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/telemetry"
Expand Down Expand Up @@ -186,10 +188,12 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
panic(fmt.Errorf("BeginBlock listening hook failed, height: %d, err: %w", req.Header.Height, err))
// call the streaming service hook with the BeginBlock messages
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if err := abciListener.ListenBeginBlock(ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", blockHeight, "err", err)
}
}

Expand All @@ -213,10 +217,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
panic(fmt.Errorf("EndBlock listening hook failed, height: %d, err: %w", req.Height, err))
// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if err := abciListener.ListenEndBlock(ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", blockHeight, "err", err)
}
}

Expand Down Expand Up @@ -263,14 +269,18 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
gInfo := sdk.GasInfo{}
resultStr := "successful"

var res abci.ResponseDeliverTx
defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
panic(fmt.Errorf("DeliverTx listening hook failed: %w", err))
// call the streaming service hook with the EndBlock messages
for _, abciListener := range app.streamingManager.AbciListeners {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
if err := abciListener.ListenDeliverTx(ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "height", blockHeight, "err", err)
}
}
}()
Expand All @@ -288,13 +298,15 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
return sdkerrors.ResponseDeliverTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, sdk.MarkEventsToIndex(anteEvents, app.indexEvents), app.trace)
}

return abci.ResponseDeliverTx{
res = abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}

return res
}

// Commit implements the ABCI interface. It will commit all state that exists in
Expand All @@ -305,6 +317,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() abci.ResponseCommit {

header := app.deliverState.ctx.BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

Expand All @@ -319,11 +332,18 @@ func (app *BaseApp) Commit() abci.ResponseCommit {
RetainHeight: retainHeight,
}

// call the hooks with the Commit message
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil {
panic(fmt.Errorf("Commit listening hook failed, height: %d, err: %w", header.Height, err))
// call the streaming service hook with the EndBlock messages
abciListeners := app.streamingManager.AbciListeners
if len(abciListeners) > 0 {
ctx := app.deliverState.ctx
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()
for _, abciListener := range abciListeners {
if err := abciListener.ListenCommit(ctx, res, changeSet); err != nil {
app.logger.Error("ListenCommit listening hook failed", "height", blockHeight, "err", err)
}
}
changeSet = nil
}

app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))
Expand Down Expand Up @@ -376,6 +396,9 @@ func (app *BaseApp) halt() {
}
}

// Stop any running streaming plugins
plugin.CleanupClients()

// Resort to exiting immediately if the process could not be found or killed
// via SIGINT/SIGTERM signals.
app.logger.Info("failed to send SIGINT/SIGTERM; exiting...")
Expand Down
5 changes: 2 additions & 3 deletions baseapp/baseapp.go
Expand Up @@ -107,9 +107,8 @@ type BaseApp struct { // nolint: maligned
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
// streamingManager for managing instances and configuration of ABCIListener services
streamingManager storetypes.StreamingManager

feeHandler sdk.FeeHandler

Expand Down
79 changes: 79 additions & 0 deletions baseapp/baseapp_test.go
Expand Up @@ -2265,3 +2265,82 @@ func TestDefaultStoreLoader(t *testing.T) {
require.Equal(t, storetypes.StoreTypeIAVL, store.GetStoreType())
}
}

var (
distKey1 = storetypes.NewKVStoreKey("distKey1")
)

func TestABCI_MultiListener_StateChanges(t *testing.T) {
// increment the tx counter
anteKey := []byte("ante-key")
anteOpt := func(bapp *BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) }

// increment the msg counter
deliverKey := []byte("deliver-key")
deliverKey2 := []byte("deliver-key2")
routerOpt := func(bapp *BaseApp) {
r1 := sdk.NewRoute(routeMsgCounter, handlerMsgCounter(t, capKey1, deliverKey))
r2 := sdk.NewRoute(routeMsgCounter2, handlerMsgCounter(t, capKey1, deliverKey2))
bapp.Router().AddRoute(r1)
bapp.Router().AddRoute(r2)
}

distOpt := func(bapp *BaseApp) { bapp.MountStores(distKey1) }
mockListener1 := NewMockABCIListener("lis_1")
mockListener2 := NewMockABCIListener("lis_2")
streamingManager := storetypes.StreamingManager{AbciListeners: []storetypes.ABCIListener{&mockListener1, &mockListener2}}
streamingManagerOpt := func(bapp *BaseApp) { bapp.SetStreamingManager(streamingManager) }
addListenerOpt := func(bapp *BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) }

app := setupBaseApp(t, anteOpt, routerOpt, distOpt, streamingManagerOpt, addListenerOpt)
app.InitChain(abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
var expectedChangeSet []*storetypes.StoreKVPair

for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)

sKey := []byte(fmt.Sprintf("distKey%d", i))
sVal := []byte(fmt.Sprintf("distVal%d", i))

ctx := app.getState(runTxModeDeliver).ctx
store := ctx.KVStore(distKey1)
store.Set(sKey, sVal)

expectedChangeSet = append(expectedChangeSet, &storetypes.StoreKVPair{
StoreKey: distKey1.Name(),
Delete: false,
Key: sKey,
Value: sVal,
})

res := app.DeliverTx(abci.RequestDeliverTx{Tx: txBytes})
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))

events := res.GetEvents()
require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0], events[2], "msg handler update counter event")
}

app.EndBlock(abci.RequestEndBlock{})
app.Commit()

require.Equal(t, expectedChangeSet, mockListener1.ChangeSet, "should contain the same changeSet")
require.Equal(t, expectedChangeSet, mockListener2.ChangeSet, "should contain the same changeSet")
}
}
17 changes: 6 additions & 11 deletions baseapp/options.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

Expand Down Expand Up @@ -234,17 +235,6 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.msgServiceRouter.SetInterfaceRegistry(registry)
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}

// SetQueryMultiStore set a alternative MultiStore implementation to support grpc query service.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/13317
Expand Down Expand Up @@ -272,3 +262,8 @@ func (app *BaseApp) SetAggregateEventsFunc(aggregateEventsFunc func(resultEvents

app.aggregateEventsFunc = aggregateEventsFunc
}

// SetStreamingManager sets the streaming manager for the BaseApp.test
func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
app.streamingManager = manager
}