Skip to content

Commit

Permalink
[lbry] claimtrie: created node cache
Browse files Browse the repository at this point in the history
  • Loading branch information
roylee17 committed Sep 29, 2022
1 parent cbc4d48 commit 979d643
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 98 deletions.
97 changes: 23 additions & 74 deletions claimtrie/claimtrie.go
Expand Up @@ -4,9 +4,7 @@ import (
"bytes"
"fmt"
"path/filepath"
"runtime"
"sort"
"sync"

"github.com/pkg/errors"

Expand Down Expand Up @@ -249,17 +247,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error {
names = append(names, expirations...)
names = removeDuplicates(names)

nhns := ct.makeNameHashNext(names, false, nil)
for nhn := range nhns {
for _, name := range names {

ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
if nhn.Next <= 0 {
hash, next := ct.nodeManager.Hash(name)
ct.merkleTrie.Update(name, hash, true)
if next <= 0 {
continue
}

newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next)
newName := normalization.NormalizeIfNecessary(name, next)
updateNames = append(updateNames, newName)
updateHeights = append(updateHeights, nhn.Next)
updateHeights = append(updateHeights, next)
}
if !temporary && len(updateNames) > 0 {
err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights)
Expand Down Expand Up @@ -356,22 +354,29 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
}

func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) {
var nhns chan NameHashNext
if names == nil {
node.Log("Building the entire claim trie in RAM...")
ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger())
nhns = ct.makeNameHashNext(nil, true, interrupt)
} else {
ct.claimLogger = nil
nhns = ct.makeNameHashNext(names, false, interrupt)
}

for nhn := range nhns {
ct.merkleTrie.Update(nhn.Name, nhn.Hash, false)
if ct.claimLogger != nil {
ct.claimLogger.LogName(nhn.Name)
ct.nodeManager.IterateNames(func(name []byte) bool {
if interruptRequested(interrupt) {
return false
}
clone := make([]byte, len(name))
copy(clone, name)
hash, _ := ct.nodeManager.Hash(clone)
ct.merkleTrie.Update(clone, hash, false)
ct.claimLogger.LogName(name)
return true
})

} else {
for _, name := range names {
hash, _ := ct.nodeManager.Hash(name)
ct.merkleTrie.Update(name, hash, false)
}
}

}

// MerkleHash returns the Merkle Hash of the claimTrie.
Expand Down Expand Up @@ -437,12 +442,6 @@ func (ct *ClaimTrie) FlushToDisk() {
}
}

// NameHashNext bundles a claim name with the two values that
// ct.nodeManager.Hash returns for it (see makeNameHashNext).
type NameHashNext struct {
// Name is the name that was hashed.
Name []byte
// Hash is the first value returned by nodeManager.Hash for Name.
Hash *chainhash.Hash
// Next is the second value returned by nodeManager.Hash for Name.
// NOTE(review): consumers skip values <= 0 and otherwise treat it as a
// height to schedule the name at — confirm against Manager.Hash.
Next int32
}

func interruptRequested(interrupted <-chan struct{}) bool {
select {
case <-interrupted: // should never block on nil
Expand All @@ -452,53 +451,3 @@ func interruptRequested(interrupted <-chan struct{}) bool {

return false
}

// makeNameHashNext hashes names concurrently and streams the results.
//
// When all is true the names come from ct.nodeManager.IterateNames and the
// names argument is ignored; otherwise the given names slice is used. Each
// name is passed to ct.nodeManager.Hash by one of a fixed set of worker
// goroutines, and the outcome is sent on the returned channel as a
// NameHashNext. Results are not guaranteed to arrive in input order because
// several workers run at once.
//
// The producer stops feeding names as soon as interrupt is signalled. The
// returned channel is closed once every worker has drained the input queue,
// so callers can simply range over it.
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext {
	pending := make(chan []byte, 512)
	results := make(chan NameHashNext, 512)

	// Use roughly 80% of the available procs, but always at least one worker.
	workers := int(0.8 * float32(runtime.GOMAXPROCS(0)))
	if workers < 1 {
		workers = 1
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for name := range pending {
				hash, next := ct.nodeManager.Hash(name)
				results <- NameHashNext{Name: name, Hash: hash, Next: next}
			}
		}()
	}

	// Producer: feed names to the workers, then signal that no more are coming.
	go func() {
		defer close(pending)

		if !all {
			for _, name := range names {
				if interruptRequested(interrupt) {
					return
				}
				pending <- name
			}
			return
		}

		ct.nodeManager.IterateNames(func(name []byte) bool {
			if interruptRequested(interrupt) {
				return false
			}
			// The iteration's name buffer is reused on later callbacks, so
			// hand the workers a private copy.
			owned := make([]byte, len(name))
			copy(owned, name)
			pending <- owned
			return true
		})
	}()

	// Closer: once all workers are done, nothing else can write to results.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}
85 changes: 85 additions & 0 deletions claimtrie/node/cache.go
@@ -0,0 +1,85 @@
package node

import (
"container/list"

"github.com/lbryio/lbcd/claimtrie/change"
)

// cacheLeaf is a single entry held by Cache.
type cacheLeaf struct {
// node is the cached node as it was when last passed to Cache.insert.
node *Node
// element is this entry's position in Cache.order; it is used to move the
// entry to the front on access and to unlink it on removal.
element *list.Element
// changes collects the changes appended by Cache.addChanges since node was
// cached; Cache.insert resets it to nil when the entry is overwritten.
changes []change.Change
// height is the height supplied to Cache.insert together with node.
height int32
}

// Cache is a size-bounded node cache keyed by name. Entries are kept in
// recency order and the entry at the back of that order is evicted first.
type Cache struct {
// nodes maps string(name) to the cached entry for that name.
nodes map[string]*cacheLeaf
// order holds the string keys of nodes; entries are pushed or moved to the
// front on insert/fetch, and insert evicts from the back.
order *list.List
// limit is the entry count at which insert starts evicting before adding
// a new key.
limit int
}

// insert stores n as the cached node for name at the given height and marks
// the entry as most recently used.
//
// If name is already cached, the entry is overwritten in place: its node and
// height are replaced and its accumulated changes are discarded. Otherwise
// entries are evicted from the back of the recency list until there is room
// for one more, and a new entry is added at the front.
//
// A Cache whose limit is zero or negative never stores anything; insert is a
// no-op in that case.
func (nc *Cache) insert(name []byte, n *Node, height int32) {
	key := string(name)

	if existing := nc.nodes[key]; existing != nil {
		existing.node = n
		existing.height = height
		existing.changes = nil
		nc.order.MoveToFront(existing.element)
		return
	}

	// With a non-positive limit the eviction loop below would keep running on
	// an empty list, where Back() returns nil and dereferencing it panics.
	// Treat such a limit as "caching disabled".
	if nc.limit <= 0 {
		return
	}

	for nc.order.Len() >= nc.limit {
		// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
		oldest := nc.order.Back()
		delete(nc.nodes, oldest.Value.(string))
		nc.order.Remove(oldest)
	}

	element := nc.order.PushFront(key)
	nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height}
}

// fetch looks up the cached entry for name.
//
// It returns the cached node, the changes accumulated for it since it was
// cached, and the height it was cached at. An entry is only returned when its
// cached height is not above the requested height; a hit also marks the entry
// as most recently used. On a miss it returns (nil, nil, -1).
//
// The returned changes slice has its capacity clipped to its length, so a
// caller that appends to it gets a fresh backing array instead of writing
// into spare capacity owned by the cache entry.
func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) {
	existing := nc.nodes[string(name)]
	if existing == nil || existing.height > height {
		return nil, nil, -1
	}

	nc.order.MoveToFront(existing.element)

	// Full slice expression: same elements, cap == len. A nil slice stays nil.
	pending := existing.changes
	return existing.node, pending[:len(pending):len(pending)], existing.height
}

// addChanges records each change against the cached entry for its name, if
// one exists. A change is only appended when the entry's cached height is not
// above the given height; changes for names that are not cached are ignored.
func (nc *Cache) addChanges(changes []change.Change, height int32) {
	for i := range changes {
		leaf := nc.nodes[string(changes[i].Name)]
		if leaf == nil || leaf.height > height {
			continue
		}
		leaf.changes = append(leaf.changes, changes[i])
	}
}

// drop evicts the cached entries for the given names. Names that are not
// cached are skipped.
func (nc *Cache) drop(names [][]byte) {
	for i := range names {
		key := string(names[i])
		leaf := nc.nodes[key]
		if leaf == nil {
			continue
		}
		// The entry cannot be rolled back because its previous height is
		// unknown, so it is removed outright.
		nc.order.Remove(leaf.element)
		delete(nc.nodes, key)
	}
}

// clear empties the cache by swapping in a fresh recency list and a fresh
// map; the previous contents are simply left for the garbage collector.
func (nc *Cache) clear() {
	nc.order = list.New()
	nc.nodes = make(map[string]*cacheLeaf)
}

// NewCache returns an empty Cache that begins evicting entries once it holds
// limit of them.
func NewCache(limit int) *Cache {
	c := new(Cache)
	c.limit = limit
	c.nodes = make(map[string]*cacheLeaf)
	c.order = list.New()
	return c
}
97 changes: 75 additions & 22 deletions claimtrie/node/manager.go
Expand Up @@ -21,6 +21,7 @@ type Manager interface {
IterateNames(predicate func(name []byte) bool)
Hash(name []byte) (*chainhash.Hash, int32)
Flush() error
ClearCache()
}

type BaseManager struct {
Expand All @@ -30,31 +31,62 @@ type BaseManager struct {
changes []change.Change

tempChanges map[string][]change.Change

cache *Cache
}

func NewBaseManager(repo Repo) (*BaseManager, error) {

nm := &BaseManager{
repo: repo,
repo: repo,
cache: NewCache(10000), // TODO: how many should we cache?
}

return nm, nil
}

// ClearCache discards every entry currently held in the manager's node cache.
func (nm *BaseManager) ClearCache() {
nm.cache.clear()
}

func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {

changes, err := nm.repo.LoadChanges(name)
if err != nil {
return nil, errors.Wrap(err, "in load changes")
}
n, changes, oldHeight := nm.cache.fetch(name, height)
if n == nil {
changes, err := nm.repo.LoadChanges(name)
if err != nil {
return nil, errors.Wrap(err, "in load changes")
}

if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
}
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
}

n, err := nm.newNodeFromChanges(changes, height)
if err != nil {
return nil, errors.Wrap(err, "in new node")
n, err = nm.newNodeFromChanges(changes, height)
if err != nil {
return nil, errors.Wrap(err, "in new node")
}
// TODO: how can we tell what needs to be cached?
if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) {
nm.cache.insert(name, n, height)
}
} else {
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
n = n.Clone()
} else if height != nm.height {
n = n.Clone()
}
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
return nil, errors.Wrap(err, "in update from changes")
}
if !updated {
n.AdjustTo(oldHeight, height, name)
}
if nm.tempChanges == nil && height == nm.height {
nm.cache.insert(name, n, height)
}
}

return n, nil
Expand All @@ -66,17 +98,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) {
return nm.NodeAt(nm.height, name)
}

// newNodeFromChanges returns a new Node constructed from the changes.
// The changes must preserve their order received.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) {

if len(changes) == 0 {
return nil, nil
count := len(changes)
if count == 0 {
return false, nil
}

n := New()
previous := changes[0].Height
count := len(changes)

for i, chg := range changes {
if chg.Height < previous {
Expand All @@ -95,15 +123,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
delay := nm.getDelayForName(n, chg)
err := n.ApplyChange(chg, delay)
if err != nil {
return nil, errors.Wrap(err, "in apply change")
return false, errors.Wrap(err, "in apply change")
}
}

if count <= 0 {
return nil, nil
// we applied no changes, which means we shouldn't exist if we had all the changes
// or might mean nothing significant if we are applying a partial changeset
return false, nil
}
lastChange := changes[count-1]
return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil
n.AdjustTo(lastChange.Height, height, lastChange.Name)
return true, nil
}

// newNodeFromChanges builds a fresh Node by applying changes to an empty node
// via updateFromChanges. The changes must be supplied in the order they were
// received.
//
// It returns (nil, nil) when changes is empty or when updateFromChanges
// reports that nothing was applied, and a wrapped error when applying the
// changes fails.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
	if len(changes) == 0 {
		return nil, nil
	}

	fresh := New()
	applied, err := nm.updateFromChanges(fresh, changes, height)
	switch {
	case err != nil:
		return nil, errors.Wrap(err, "in update from changes")
	case !applied:
		return nil, nil
	}
	return fresh, nil
}

func (nm *BaseManager) AppendChange(chg change.Change) {
Expand Down Expand Up @@ -220,6 +270,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte
}

if !temporary {
nm.cache.addChanges(nm.changes, height)
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
return nil, errors.Wrap(err, "in append changes")
}
Expand Down Expand Up @@ -255,6 +306,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) (
return affectedNames, errors.Wrap(err, "in drop changes")
}
}

nm.cache.drop(affectedNames)
}
nm.height = height

Expand Down

0 comments on commit 979d643

Please sign in to comment.