diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 8c59ed0..d6262f2 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -107,7 +107,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // & queued pool, so that when new tx gets added into pending pool // queued pool also gets notified & gets to update state if required alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096) - lastSeenBlockChan := make(chan uint64, 1) + inPendingPoolChan := make(chan *data.MemPoolTx, 4096) + lastSeenBlockChan := make(chan uint64, 16) // initialising pending pool pendingPool := &data.PendingPool{ @@ -124,6 +125,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { AddFromQueuedPoolChan: make(chan data.AddRequest, 1), RemoveTxChan: make(chan data.RemoveRequest, 1), AlreadyInPendingPoolChan: alreadyInPendingPoolChan, + InPendingPoolChan: inPendingPoolChan, TxExistsChan: make(chan data.ExistsRequest, 1), GetTxChan: make(chan data.GetRequest, 1), CountTxsChan: make(chan data.CountRequest, 1), @@ -164,6 +166,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // Block head listener & pending pool pruner // talks over this buffered channel caughtTxsChan := make(chan listen.CaughtTxs, 16) + notFoundTxsChan := make(chan listen.CaughtTxs, 16) confirmedTxsChan := make(chan data.ConfirmedTx, 4096) // Starting pool life cycle manager go routine @@ -172,13 +175,14 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // // After that this pool will also let (b) know that it can // update state of txs, which have become unstuck - go pool.Pending.Prune(ctx, caughtTxsChan, confirmedTxsChan) + go pool.Pending.Prune(ctx, caughtTxsChan, confirmedTxsChan, notFoundTxsChan) go pool.Queued.Start(ctx) // (b) go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan) // Listens for new block headers & informs 👆 (a) for pruning // txs which can be/ need to be go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan) + go data.TrackNotFoundTxs(ctx, inPendingPoolChan, notFoundTxsChan, caughtTxsChan) // Passed this mempool handle to graphql query resolver if err := graph.InitMemPool(pool); err != nil { diff --git a/app/data/not_found_txs.go b/app/data/not_found_txs.go new file mode 100644 index 0000000..02994ef --- /dev/null +++ b/app/data/not_found_txs.go @@ -0,0 +1,52 @@ +package data + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/itzmeanjan/harmony/app/listen" +) + +// TrackNotFoundTxs - Those tx(s) which are found to be mined before actual pending tx is added +// into mempool, are kept track of here & pending pool pruner is informed about it, when it's found +// to be joining mempool, in some time future. +func TrackNotFoundTxs(ctx context.Context, inPendingPoolChan <-chan *MemPoolTx, notFoundTxsChan <-chan listen.CaughtTxs, alreadyMinedTxChan chan<- listen.CaughtTxs) { + + // Yes, for faster lookup, it's kept this + keeper := make(map[common.Hash]*listen.CaughtTx) + + for { + select { + case <-ctx.Done(): + return + + case tx := <-inPendingPoolChan: + // Just learnt about new tx which is considered to be pending + // & added into pool, but this tx is mined in some previous block + // which we've kept track of here in `keeper` structure + // + // We're letting pending pool pruner know, it must act in declaring this tx + // to be mined + + if kept, ok := keeper[tx.Hash]; ok { + alreadyMinedTxChan <- []*listen.CaughtTx{kept} + delete(keeper, tx.Hash) + } + + case txs := <-notFoundTxsChan: + // New block was mined with some txs + // which were not found in pending pool then, + // which are being kept track of here + // + // To be used for letting pending pool know + // some newly added pending pool tx is actually mined + // in some later date + + for i := 0; i < len(txs); i++ { + keeper[txs[i].Hash] = txs[i] + } + + } + } + +} diff --git a/app/data/pending.go b/app/data/pending.go index c36cdaf..7219c87 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -31,6 +31,7 @@ type PendingPool struct { AddFromQueuedPoolChan chan AddRequest RemoveTxChan chan RemoveRequest AlreadyInPendingPoolChan chan *MemPoolTx + InPendingPoolChan chan<- *MemPoolTx TxExistsChan chan ExistsRequest GetTxChan chan GetRequest CountTxsChan chan CountRequest @@ -218,11 +219,13 @@ func (p *PendingPool) Start(ctx context.Context) { // in pending pool, so it can be removed from queued pool // if it's living there too p.AlreadyInPendingPoolChan <- req.Tx + p.InPendingPoolChan <- req.Tx } case req := <-p.AddFromQueuedPoolChan: req.ResponseChan <- txAdder(req.Tx) + p.InPendingPoolChan <- req.Tx case req := <-p.RemoveTxChan: @@ -321,7 +324,7 @@ func (p *PendingPool) Start(ctx context.Context) { // Only keep moving forward if p.LastSeenBlock > num { - continue + break } p.LastSeenBlock = num @@ -340,7 +343,7 @@ func (p *PendingPool) Start(ctx context.Context) { // Prune - Remove confirmed/ dropped txs from pending pool // // @note This method is supposed to be run as independent go routine -func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.CaughtTxs, confirmedTxsChan chan ConfirmedTx) { +func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.CaughtTxs, confirmedTxsChan chan<- ConfirmedTx, notFoundTxsChan chan<- listen.CaughtTxs) { // Creating worker pool, where jobs to be submitted // for concurrently checking whether tx was dropped or not @@ -360,6 +363,7 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau case txs := <-caughtTxsChan: var prunables []*MemPoolTx = make([]*MemPoolTx, 0, len(txs)) + var notFoundTxs []*listen.CaughtTx = make([]*listen.CaughtTx, 0, len(txs)) // How & which prunable tx(s) are kept in linear memory slot `prunables` // i.e. starting from where & how many of those @@ -382,7 +386,10 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau tx := p.Get(txs[i].Hash) if tx == nil { - // well, couldn't find tx in pool + // well, couldn't find tx in pool, keeping track of + // it in another worker, which will let us know about it + // when need to + notFoundTxs = append(notFoundTxs, txs[i]) continue } @@ -418,6 +425,13 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau } + // In current iteration, if we've found some mined txs + // not to be present in mempool, we're keeping track of it + // in different worker & let us know about it in future date + if len(notFoundTxs) != 0 { + notFoundTxsChan <- notFoundTxs + } + // Letting queued pool pruning worker know txs from // these addresses with this nonce got mined in this block for addr := range alreadyAddedFromA { diff --git a/app/listen/header.go b/app/listen/header.go index 3c46930..13d5ddb 100644 --- a/app/listen/header.go +++ b/app/listen/header.go @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx // SubscribeHead - Subscribe to block headers & as soon as new block gets mined // its txs are picked up & published on a go channel, which will be listened // to by pending pool watcher, so that it can prune its state -func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) { +func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64) { retryTable := make(map[*big.Int]bool) headerChan := make(chan *types.Header, 64) @@ -93,7 +93,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan } // ProcessBlock - Fetches all txs present in mined block & passes those to pending pool pruning worker -func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) bool { +func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64) bool { block, err := client.BlockByNumber(ctx, number) if err != nil {