Skip to content

Commit

Permalink
Improved HTTPSubscribers.
Browse files Browse the repository at this point in the history
  • Loading branch information
rakshasa committed Feb 18, 2023
1 parent f58f21e commit 1af3994
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 55 deletions.
6 changes: 4 additions & 2 deletions ethhelpers/client_with_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ type clientWithHandlers struct {

// NewClientWithHandlers creates a new client with custom handlers.
//
// The handlers cannot modify the content of the arguments or results, except by
// overriding the error.
// The handlers cannot modify the content of the arguments or results, except
// for overriding the error returned.
//
// Handlers should return nil if it has not changed the error.
func NewClientWithDefaultHandler(defaultHandler func(context.Context, ClientCaller) error) Client {
return &clientWithHandlers{
defaultHandler: defaultHandler,
Expand Down
14 changes: 10 additions & 4 deletions ethhelpers/client_with_http_subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import (
)

const (
// Deprecated:
maxFilterLogWindow = 1000
)

type clientWithHTTPSubscriptions struct {
Client
createTicker func(context.Context, uint64) BlockNumberTicker
createTicker func(context.Context, uint64) (BlockNumberTicker, error)
}

// TODO: Add options.

func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Context, uint64) BlockNumberTicker) Client {
func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Context, uint64) (BlockNumberTicker, error)) Client {
return &clientWithHTTPSubscriptions{
Client: client,
createTicker: createTicker,
Expand All @@ -27,6 +28,11 @@ func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Con

// The context argument cancels the RPC request that sets up the subscription
// but has no effect on the subscription after Subscribe has returned.
func (c *clientWithHTTPSubscriptions) SubscribeFilterLogs(ctx context.Context, filterQuery ethereum.FilterQuery, logChan chan<- types.Log) (ethereum.Subscription, error) {
return SubscribeFilterLogsWithHTTP(ctx, c.Client, c.createTicker, filterQuery, logChan)
func (c *clientWithHTTPSubscriptions) SubscribeFilterLogs(ctx context.Context, filterQuery ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) {
return SubscribeFilterLogsWithHTTP(ctx, &HTTPSubscriberOptions{
Client: c.Client,
CreateTicker: c.createTicker,
FilterQuery: filterQuery,
Logs: logs,
})
}
4 changes: 2 additions & 2 deletions ethhelpers/client_with_http_subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestClientWithHTTPSubscriptions_SubscribeFilterLogs(t *testing.T) {
client := ethhelpers.NewClientWithHTTPSubscriptions(
ethtesting.NewSimulatedClient(sim.Backend),

func(ctx context.Context, fromBlock uint64) ethhelpers.BlockNumberTicker {
return ethhelpers.NewPeriodicBlockNumberTickerFromBlock(ctx, ethtesting.NewSimulatedClient(sim.Backend), time.Second/4, fromBlock)
func(ctx context.Context, fromBlock uint64) (ethhelpers.BlockNumberTicker, error) {
return ethhelpers.NewPeriodicBlockNumberTickerFromBlock(ctx, ethtesting.NewSimulatedClient(sim.Backend), time.Second/4, fromBlock), nil
},
)

Expand Down
8 changes: 4 additions & 4 deletions ethhelpers/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ type Client interface {

// TODO: Add client without subcription methods.

type BlockNumberReader interface {
BlockNumber(ctx context.Context) (uint64, error)
}

type ChainReaderAndTransactionSender interface {
ethereum.TransactionSender
ethereum.ChainReader
}

type BlockNumberReader interface {
BlockNumber(ctx context.Context) (uint64, error)
}

type FilterLogsReader interface {
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
}
156 changes: 120 additions & 36 deletions ethhelpers/http_subscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,69 +10,149 @@ import (
)

type httpSubscription struct {
subCancel func()
errChan chan error
unsubDone chan struct{}
cancel func()
err chan error
done chan struct{}
}

func (s *httpSubscription) Unsubscribe() {
s.subCancel()
<-s.unsubDone
s.cancel()
<-s.done

close(s.errChan)
close(s.err)
}

func (s *httpSubscription) Err() <-chan error {
return s.errChan
return s.err
}

type HTTPSubscriberClient interface {
BlockNumberReader
FilterLogsReader
}

type HTTPSubscriberOptions struct {
Client HTTPSubscriberClient

// CreateContext returns a context that is used for the subscription, or
// context.Background() if nil.
CreateContext func() (context.Context, context.CancelFunc)

// CreateTicker is a function that creates a block number ticker.
//
// The context is canceled when a subscription is unsubscribed or encounters
// an error, or if the context passed to the function creating the
// HTTPSubscriber function is canceled before returning a valid
// subscription.
//
// The method is called only once per subscription.
CreateTicker func(ctx context.Context, fromBlock uint64) (BlockNumberTicker, error)

FilterQuery ethereum.FilterQuery
Logs chan<- types.Log
}

// The context argument cancels the RPC request that sets up the subscription
// but has no effect on the subscription after Subscribe has returned.
//
// Subscribers should be using the same underlying go-ethereum rpc client as the
// block number ticker to ensure there are no race-conditions.
// Subscribers should be using the same underlying go-ethereum rpc client
// connection as the block number ticker to ensure there are no race-conditions.
//
// To conform to the SubscriberFilterLogs api the ticker should only return
// current, and not historic, block numbers.
func SubscribeFilterLogsWithHTTP(ctx context.Context, client FilterLogsReader, createTicker func(context.Context, uint64) BlockNumberTicker, filterQuery ethereum.FilterQuery, logChan chan<- types.Log) (ethereum.Subscription, error) {
if logChan == nil {
panic("channel given to SubscribeFilterLogs must not be nil")
// To conform to the go-ethereum SubscriberFilterLogs api the ticker should only
// return current, and not historic, block numbers.
//
// The current block number is requested before the subscription is returned.
func SubscribeFilterLogsWithHTTP(callerCtx context.Context, opts *HTTPSubscriberOptions) (ethereum.Subscription, error) {
if opts.Client == nil {
return nil, fmt.Errorf("opts.Client must be set")
}
if opts.CreateTicker == nil {
return nil, fmt.Errorf("opts.CreateTicker must be set")
}
if opts.Logs == nil {
return nil, fmt.Errorf("opts.Logs must be set")
}

subscriberCtx, cancel := func() (context.Context, context.CancelFunc) {
if opts.CreateContext == nil {
return context.WithCancel(context.Background())
}

subCtx, subCancel := context.WithCancel(context.Background())
return opts.CreateContext()
}()

s := &httpSubscription{
subCancel: subCancel,
errChan: make(chan error, 1),
unsubDone: make(chan struct{}),
}
ticker, err := func(ctx context.Context, done <-chan struct{}) (BlockNumberTicker, error) {
ch := make(chan struct{})
defer close(ch)

go func() {
select {
case <-ch:
case <-done:
cancel()
}
}()

currentBlock, err := opts.Client.BlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current block number: %w", err)
}

queryFromBlock, ok := BigIntAsUint64OrZeroIfNil(filterQuery.FromBlock)
if !ok {
return nil, fmt.Errorf("invalid FromBlock value")
if opts.FilterQuery.FromBlock == nil {
return opts.CreateTicker(ctx, currentBlock)
}
if !opts.FilterQuery.FromBlock.IsUint64() {
return nil, fmt.Errorf("opts.FilterQuery.FromBlock is too large")
}

queryFromBlock := opts.FilterQuery.FromBlock.Uint64()

if currentBlock < queryFromBlock {
return opts.CreateTicker(ctx, queryFromBlock)
} else {
return opts.CreateTicker(ctx, currentBlock)
}

}(subscriberCtx, callerCtx.Done())
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create block ticker: %w", err)
}

ticker := createTicker(ctx, queryFromBlock)
s := &httpSubscription{
cancel: cancel,
err: make(chan error, 1),
done: make(chan struct{}),
}

go func(ctx context.Context) {
defer close(s.unsubDone)
defer close(s.done)

waitFn := func() (uint64, bool) {
select {
case bn := <-ticker.Wait():
case bn, ok := <-ticker.Wait():
if !ok {
s.err <- fmt.Errorf("block ticker wait channel closed")
return 0, false
}

return bn.BlockNumber, true

case err := <-ticker.Err():
case err, ok := <-ticker.Err():
if !ok {
s.err <- fmt.Errorf("block number ticker closed the error channel")
return 0, false
}
if err == nil {
s.errChan <- fmt.Errorf("block number ticker returned nil error")
s.err <- fmt.Errorf("block number ticker returned a nil error")
return 0, false
}

s.errChan <- err
s.err <- err
return 0, false

case <-ctx.Done():
s.errChan <- ctx.Err()
s.err <- ctx.Err()
return 0, false
}
}
Expand All @@ -87,29 +167,33 @@ func SubscribeFilterLogsWithHTTP(ctx context.Context, client FilterLogsReader, c
if !ok {
return
}
if currentBlock < fromBlock {
s.err <- fmt.Errorf("block number ticker returned a block number less than the from block")
return
}

q := filterQuery
q := opts.FilterQuery
q.FromBlock = new(big.Int).SetUint64(fromBlock)
q.ToBlock = new(big.Int).SetUint64(currentBlock)

logs, err := client.FilterLogs(ctx, q)
logs, err := opts.Client.FilterLogs(ctx, q)
if err != nil {
s.errChan <- err
s.err <- err
return
}

for _, log := range logs {
select {
case logChan <- log:
case opts.Logs <- log:
case <-ctx.Done():
s.errChan <- err
s.err <- err
return
}
}

fromBlock = currentBlock + 1
}
}(subCtx)
}(subscriberCtx)

return s, nil
}
7 changes: 0 additions & 7 deletions ethhelpers/tickers.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ func (t *blockNumberTicker) Wait() <-chan BlockNumber {
case <-t.request:
default:
}

// select {
// case t.request <- blockNumberTickerRequest{ch}:
// return ch
// case <-t.request:
// // Discard previous request if it hasn't been read yet.
// }
}
}

Expand Down

0 comments on commit 1af3994

Please sign in to comment.