From fe27bbd28c944f8f187dc41aec65dd45e8e968fb Mon Sep 17 00:00:00 2001 From: Brannon King Date: Sat, 14 May 2022 12:22:06 -0400 Subject: [PATCH] [lbry] claimtrie: change node handling to be single-threaded --- claimtrie/claimtrie.go | 92 +++++++++------------------------------ claimtrie/node/cache.go | 16 ------- claimtrie/node/manager.go | 8 ++-- 3 files changed, 26 insertions(+), 90 deletions(-) diff --git a/claimtrie/claimtrie.go b/claimtrie/claimtrie.go index 68ef648893..11c406cb45 100644 --- a/claimtrie/claimtrie.go +++ b/claimtrie/claimtrie.go @@ -4,9 +4,7 @@ import ( "bytes" "fmt" "path/filepath" - "runtime" "sort" - "sync" "github.com/pkg/errors" @@ -246,17 +244,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error { names = append(names, expirations...) names = removeDuplicates(names) - nhns := ct.makeNameHashNext(names, false, nil) - for nhn := range nhns { + for _, name := range names { - ct.merkleTrie.Update(nhn.Name, nhn.Hash, true) - if nhn.Next <= 0 { + hash, next := ct.nodeManager.Hash(name) + ct.merkleTrie.Update(name, hash, true) + if next <= 0 { continue } - newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next) + newName := normalization.NormalizeIfNecessary(name, next) updateNames = append(updateNames, newName) - updateHeights = append(updateHeights, nhn.Next) + updateHeights = append(updateHeights, next) } if !temporary && len(updateNames) > 0 { err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights) @@ -343,17 +341,25 @@ func (ct *ClaimTrie) ResetHeight(height int32) error { } func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) { - var nhns chan NameHashNext if names == nil { node.LogOnce("Building the entire claim trie in RAM...") - nhns = ct.makeNameHashNext(nil, true, interrupt) - } else { - nhns = ct.makeNameHashNext(names, false, interrupt) - } + ct.nodeManager.IterateNames(func(name []byte) bool { + if interruptRequested(interrupt) { + return false + } + clone := make([]byte, len(name)) + copy(clone, name) + hash, _ := ct.nodeManager.Hash(clone) + ct.merkleTrie.Update(clone, hash, false) + return true + }) - for nhn := range nhns { - ct.merkleTrie.Update(nhn.Name, nhn.Hash, false) + } else { + for _, name := range names { + hash, _ := ct.nodeManager.Hash(name) + ct.merkleTrie.Update(name, hash, false) + } } } @@ -420,12 +426,6 @@ func (ct *ClaimTrie) FlushToDisk() { } } -type NameHashNext struct { - Name []byte - Hash *chainhash.Hash - Next int32 -} - func interruptRequested(interrupted <-chan struct{}) bool { select { case <-interrupted: // should never block on nil @@ -435,53 +435,3 @@ func interruptRequested(interrupted <-chan struct{}) bool { return false } - -func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext { - inputs := make(chan []byte, 512) - outputs := make(chan NameHashNext, 512) - - var wg sync.WaitGroup - hashComputationWorker := func() { - for name := range inputs { - hash, next := ct.nodeManager.Hash(name) - outputs <- NameHashNext{name, hash, next} - } - wg.Done() - } - - threads := int(0.8 * float32(runtime.NumCPU())) - if threads < 1 { - threads = 1 - } - for threads > 0 { - threads-- - wg.Add(1) - go hashComputationWorker() - } - go func() { - if all { - ct.nodeManager.IterateNames(func(name []byte) bool { - if interruptRequested(interrupt) { - return false - } - clone := make([]byte, len(name)) - copy(clone, name) // iteration name buffer is reused on future loops - inputs <- clone - return true - }) - } else { - for _, name := range names { - if interruptRequested(interrupt) { - break - } - inputs <- name - } - } - close(inputs) - }() - go func() { - wg.Wait() - close(outputs) - }() - return outputs -} diff --git a/claimtrie/node/cache.go b/claimtrie/node/cache.go index 0f556af1a2..905ecd1d36 100644 --- a/claimtrie/node/cache.go +++ b/claimtrie/node/cache.go @@ -2,7 +2,6 @@ package node import ( "container/list" - "sync" "github.com/lbryio/lbcd/claimtrie/change" ) @@ -17,16 +16,12 @@ type cacheLeaf struct { type Cache struct { nodes map[string]*cacheLeaf order *list.List - mtx sync.Mutex limit int } func (nc *Cache) insert(name []byte, n *Node, height int32) { key := string(name) - nc.mtx.Lock() - defer nc.mtx.Unlock() - existing := nc.nodes[key] if existing != nil { existing.node = n @@ -49,9 +44,6 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) { func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) { key := string(name) - nc.mtx.Lock() - defer nc.mtx.Unlock() - existing := nc.nodes[key] if existing != nil && existing.height <= height { nc.order.MoveToFront(existing.element) @@ -61,9 +53,6 @@ func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32 } func (nc *Cache) addChanges(changes []change.Change, height int32) { - nc.mtx.Lock() - defer nc.mtx.Unlock() - for _, c := range changes { key := string(c.Name) existing := nc.nodes[key] @@ -74,9 +63,6 @@ func (nc *Cache) addChanges(changes []change.Change, height int32) { } func (nc *Cache) drop(names [][]byte) { - nc.mtx.Lock() - defer nc.mtx.Unlock() - for _, name := range names { key := string(name) existing := nc.nodes[key] @@ -89,8 +75,6 @@ func (nc *Cache) drop(names [][]byte) { } func (nc *Cache) clear() { - nc.mtx.Lock() - defer nc.mtx.Unlock() nc.nodes = map[string]*cacheLeaf{} nc.order = list.New() // we'll let the GC sort out the remains... diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 31ba0f1a20..7081ac250f 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -67,14 +67,16 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { return nil, errors.Wrap(err, "in new node") } // TODO: how can we tell what needs to be cached? - if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 7 || len(name) < 12) { + if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) { nm.cache.insert(name, n, height) } } else { if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block changes = append(changes, nm.tempChanges[string(name)]...) + n = n.Clone() + } else if height != nm.height { + n = n.Clone() } - n = n.Clone() updated, err := nm.updateFromChanges(n, changes, height) if err != nil { return nil, errors.Wrap(err, "in update from changes") @@ -82,7 +84,7 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { if !updated { n.AdjustTo(oldHeight, height, name) } - if nm.tempChanges == nil && height == nm.height { // TODO: how many changes before we update the cache? + if nm.tempChanges == nil && height == nm.height { nm.cache.insert(name, n, height) } }