Skip to content

Commit

Permalink
pubsub 2.0 (#3227)
Browse files Browse the repository at this point in the history
* green pubsub tests :OK:

* get rid of clientToQueryMap

* Subscribe and SubscribeUnbuffered

* start adapting other pkgs to new pubsub

* nope

* rename MsgAndTags to Message

* remove TagMap

it does not bring any additional benefits

* bring back EventSubscriber

* fix test

* fix data race in TestStartNextHeightCorrectly

```
Write at 0x00c0001c7418 by goroutine 796:
  github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly()
      /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Previous read at 0x00c0001c7418 by goroutine 858:
  github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366
  github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f
  github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e
  github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794

Goroutine 796 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:878 +0x659
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1119 +0xa8
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1117 +0x4ee
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1034 +0x2ee
  main.main()
      _testmain.go:214 +0x332

Goroutine 858 (running) created at:
  github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221
  github.com/tendermint/tendermint/consensus.startTestRound()
      /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63
  github.com/tendermint/tendermint/consensus.TestStateFullRound1()
      /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
```

* fixes after my own review

* fix formatting

* wait 100ms before kicking a subscriber out

+ a test for indexer_service

* fixes after my second review

* no timeout

* add changelog entries

* fix merge conflicts

* fix typos after Thane's review

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* reformat code

* rewrite indexer service in the attempt to fix failing test

tendermint/tendermint#3227

* Revert "rewrite indexer service in the attempt to fix failing test"

This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1.

* another attempt to fix indexer

* fixes after Ethan's review

* use unbuffered channel when indexing transactions

Refs tendermint/tendermint#3227 (comment)

* add a comment for EventBus#SubscribeUnbuffered

* format code
  • Loading branch information
melekes authored and ebuchman committed Feb 23, 2019
1 parent a30395e commit 387e969
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 236 deletions.
7 changes: 3 additions & 4 deletions pubsub/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ func TestExample(t *testing.T) {
defer s.Stop()

ctx := context.Background()
ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"})
require.NoError(t, err)
assertReceive(t, "Tombstone", ch)
assertReceive(t, "Tombstone", subscription.Out())
}

0 comments on commit 387e969

Please sign in to comment.