Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #14 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Track slowly propagated tx(s)
  • Loading branch information
itzmeanjan committed Apr 19, 2021
2 parents cc02779 + 31cc490 commit 7fe7ba3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
8 changes: 6 additions & 2 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
// & queued pool, so that when new tx gets added into pending pool
// queued pool also gets notified & gets to update state if required
alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096)
lastSeenBlockChan := make(chan uint64, 1)
inPendingPoolChan := make(chan *data.MemPoolTx, 4096)
lastSeenBlockChan := make(chan uint64, 16)

// initialising pending pool
pendingPool := &data.PendingPool{
Expand All @@ -124,6 +125,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
AddFromQueuedPoolChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemoveRequest, 1),
AlreadyInPendingPoolChan: alreadyInPendingPoolChan,
InPendingPoolChan: inPendingPoolChan,
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
Expand Down Expand Up @@ -164,6 +166,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
// Block head listener & pending pool pruner
// talks over this buffered channel
caughtTxsChan := make(chan listen.CaughtTxs, 16)
notFoundTxsChan := make(chan listen.CaughtTxs, 16)
confirmedTxsChan := make(chan data.ConfirmedTx, 4096)

// Starting pool life cycle manager go routine
Expand All @@ -172,13 +175,14 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
//
// After that this pool will also let (b) know that it can
// update state of txs, which have become unstuck
go pool.Pending.Prune(ctx, caughtTxsChan, confirmedTxsChan)
go pool.Pending.Prune(ctx, caughtTxsChan, confirmedTxsChan, notFoundTxsChan)
go pool.Queued.Start(ctx)
// (b)
go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan)
// Listens for new block headers & informs 👆 (a) for pruning
// txs which can be/ need to be
go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan)
go data.TrackNotFoundTxs(ctx, inPendingPoolChan, notFoundTxsChan, caughtTxsChan)

// Passed this mempool handle to graphql query resolver
if err := graph.InitMemPool(pool); err != nil {
Expand Down
52 changes: 52 additions & 0 deletions app/data/not_found_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package data

import (
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/itzmeanjan/harmony/app/listen"
)

// TrackNotFoundTxs - Those tx(s) which are found to be mined before actual pending tx is added
// into mempool, are kept track of here & pending pool pruner is informed about it, when it's found
// to be joining mempool, in some time future.
func TrackNotFoundTxs(ctx context.Context, inPendingPoolChan <-chan *MemPoolTx, notFoundTxsChan <-chan listen.CaughtTxs, alreadyMinedTxChan chan<- listen.CaughtTxs) {

// Yes, for faster lookup, it's kept this
keeper := make(map[common.Hash]*listen.CaughtTx)

for {
select {
case <-ctx.Done():
return

case tx := <-inPendingPoolChan:
// Just learnt about new tx which is considered to be pending
// & added into pool, but this tx is mined in some previous block
// which we've kept track of here in `keeper` structure
//
// We're letting pending pool pruner know, it must act in declaring this tx
// to be mined

if kept, ok := keeper[tx.Hash]; ok {
alreadyMinedTxChan <- []*listen.CaughtTx{kept}
delete(keeper, tx.Hash)
}

case txs := <-notFoundTxsChan:
// New block was mined with some txs
// which were not found in pending pool then,
// which are being kept track of here
//
// To be used for letting pending pool know
// some newly added pending pool tx is actually mined
// in some later date

for i := 0; i < len(txs); i++ {
keeper[txs[i].Hash] = txs[i]
}

}
}

}
20 changes: 17 additions & 3 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type PendingPool struct {
AddFromQueuedPoolChan chan AddRequest
RemoveTxChan chan RemoveRequest
AlreadyInPendingPoolChan chan *MemPoolTx
InPendingPoolChan chan<- *MemPoolTx
TxExistsChan chan ExistsRequest
GetTxChan chan GetRequest
CountTxsChan chan CountRequest
Expand Down Expand Up @@ -218,11 +219,13 @@ func (p *PendingPool) Start(ctx context.Context) {
// in pending pool, so it can be removed from queued pool
// if it's living there too
p.AlreadyInPendingPoolChan <- req.Tx
p.InPendingPoolChan <- req.Tx
}

case req := <-p.AddFromQueuedPoolChan:

req.ResponseChan <- txAdder(req.Tx)
p.InPendingPoolChan <- req.Tx

case req := <-p.RemoveTxChan:

Expand Down Expand Up @@ -321,7 +324,7 @@ func (p *PendingPool) Start(ctx context.Context) {

// Only keep moving forward
if p.LastSeenBlock > num {
continue
break
}

p.LastSeenBlock = num
Expand All @@ -340,7 +343,7 @@ func (p *PendingPool) Start(ctx context.Context) {
// Prune - Remove confirmed/ dropped txs from pending pool
//
// @note This method is supposed to be run as independent go routine
func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.CaughtTxs, confirmedTxsChan chan ConfirmedTx) {
func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.CaughtTxs, confirmedTxsChan chan<- ConfirmedTx, notFoundTxsChan chan<- listen.CaughtTxs) {

// Creating worker pool, where jobs to be submitted
// for concurrently checking whether tx was dropped or not
Expand All @@ -360,6 +363,7 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau
case txs := <-caughtTxsChan:

var prunables []*MemPoolTx = make([]*MemPoolTx, 0, len(txs))
var notFoundTxs []*listen.CaughtTx = make([]*listen.CaughtTx, 0, len(txs))

// How & which prunable tx(s) are kept in linear memory slot `prunables`
// i.e. starting from where & how many of those
Expand All @@ -382,7 +386,10 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau

tx := p.Get(txs[i].Hash)
if tx == nil {
// well, couldn't find tx in pool
// well, couldn't find tx in pool, keeping track of
// it in another worker, which will let us know about it
// when need to
notFoundTxs = append(notFoundTxs, txs[i])
continue
}

Expand Down Expand Up @@ -418,6 +425,13 @@ func (p *PendingPool) Prune(ctx context.Context, caughtTxsChan <-chan listen.Cau

}

// In current iteration, if we've found some mined txs
// not to be present in mempool, we're keeping track of it
// in different worker & let us know about it in future date
if len(notFoundTxs) != 0 {
notFoundTxsChan <- notFoundTxs
}

// Letting queued pool pruning worker know txs from
// these addresses with this nonce got mined in this block
for addr := range alreadyAddedFromA {
Expand Down
4 changes: 2 additions & 2 deletions app/listen/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx
// SubscribeHead - Subscribe to block headers & as soon as new block gets mined
// its txs are picked up & published on a go channel, which will be listened
// to by pending pool watcher, so that it can prune its state
func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) {
func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64) {

retryTable := make(map[*big.Int]bool)
headerChan := make(chan *types.Header, 64)
Expand Down Expand Up @@ -93,7 +93,7 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan
}

// ProcessBlock - Fetches all txs present in mined block & passes those to pending pool pruning worker
func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan CaughtTxs, lastSeenBlockChan chan uint64) bool {
func ProcessBlock(ctx context.Context, client *ethclient.Client, number *big.Int, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64) bool {

block, err := client.BlockByNumber(ctx, number)
if err != nil {
Expand Down

0 comments on commit 7fe7ba3

Please sign in to comment.