Skip to content

Commit

Permalink
feat(indexer): Add a log cache
Browse files Browse the repository at this point in the history
This auguments the block cache update/eviction routines to maintain a
map of block number to logs vector, and services requests from the cache
iff the entire response is present in the cache.
  • Loading branch information
Yawning committed Mar 31, 2022
1 parent 58be80a commit f7fc08e
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions indexer/backend_cache.go
Expand Up @@ -26,6 +26,7 @@ type cachingBackend struct {
blockByHashHex sync.Map
txByHashHex *ristretto.Cache
receiptByTxHashHex *ristretto.Cache
logsByBlockNumber sync.Map

lastIndexedRound uint64
lastRetainedRound uint64
Expand Down Expand Up @@ -58,6 +59,7 @@ func (cb *cachingBackend) OnBlockIndexed(
for i := range upsertedReceipts {
cb.cacheReceipt(upsertedReceipts[i])
}
cb.cacheLogsByBlockNumber(blk.Round, upsertedReceipts)

if blk.Round > atomic.LoadUint64(&cb.lastIndexedRound) {
atomic.StoreUint64(&cb.lastIndexedRound, blk.Round)
Expand Down Expand Up @@ -291,6 +293,27 @@ func (cb *cachingBackend) GetLogs(
startRound uint64,
endRound uint64,
) ([]*model.Log, error) {
const smallRangeMax = 16

// This uses BETWEEN, so inclusive on both ends. Additionally,
// scanning the map is relatively expensive, so limit the range
// we service from the cache to something "reasonable".
if reqSize := endRound - startRound + 1; reqSize <= smallRangeMax {
logs := make([]*model.Log, 0, reqSize)
var ok bool
for i := startRound; i <= endRound; i++ {
var blockLogs []*model.Log
blockLogs, ok = cb.cachedLogsByNumber(i)
if !ok {
break
}
logs = append(logs, blockLogs...)
}
if ok {
return logs, nil
}
}

return cb.inner.GetLogs(ctx, startRound, endRound)
}

Expand Down Expand Up @@ -318,6 +341,17 @@ func (cb *cachingBackend) cacheReceipt(
cb.receiptByTxHashHex.Set(receipt.TransactionHash, receipt, int64(receipt.Size()))
}

func (cb *cachingBackend) cacheLogsByBlockNumber(
blockNumber uint64,
receipts []*model.Receipt,
) {
var logs []*model.Log
for i := range receipts {
logs = append(logs, receipts[i].Logs...)
}
cb.logsByBlockNumber.Store(blockNumber, logs)
}

func (cb *cachingBackend) blockNumberFromRound(
ctx context.Context,
round uint64,
Expand Down Expand Up @@ -369,11 +403,22 @@ func (cb *cachingBackend) cachedReceiptByTxHash(
return nil, false
}

func (cb *cachingBackend) cachedLogsByNumber(
blockNumber uint64,
) ([]*model.Log, bool) {
untypedLogs, ok := cb.logsByBlockNumber.Load(blockNumber)
if ok {
return untypedLogs.([]*model.Log), true
}
return nil, false
}

func (cb *cachingBackend) onRejectOrEvictBlock(
item *ristretto.Item,
) {
blk := item.Value.(*model.Block)
cb.blockByHashHex.Delete(blk.Hash)
cb.logsByBlockNumber.Delete(blk.Round)
}

func newCachingBackend(
Expand All @@ -382,8 +427,8 @@ func newCachingBackend(
cfg *conf.CacheConfig,
) (Backend, error) {
const (
defaultBlockCacheSize = 1024 // In blocks
defaultTxCacheSize = 1024 * 1024 * 1024 // In bytes
defaultBlockCacheSize = 128 // In blocks
defaultTxCacheSize = 50 * 1024 * 1024 // In bytes
defaultReceiptCacheSize = defaultTxCacheSize // In bytes

bufferItems = 64 // Per documentation
Expand Down

0 comments on commit f7fc08e

Please sign in to comment.