Skip to content

Commit

Permalink
[lbry] refactored parallel hash computation location, other hash code
Browse files Browse the repository at this point in the history
formatting
  • Loading branch information
BrannonKing committed Aug 14, 2021
1 parent f418b2b commit 8a5340a
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 161 deletions.
104 changes: 86 additions & 18 deletions claimtrie/claimtrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package claimtrie

import (
"bytes"
"fmt"
"path/filepath"
"runtime"
"sort"
"sync"

"github.com/pkg/errors"

Expand Down Expand Up @@ -80,12 +83,13 @@ func New(cfg config.Config) (*ClaimTrie, error) {
if err != nil {
return nil, errors.Wrap(err, "creating node base manager")
}
nodeManager := node.NewNormalizingManager(baseManager)
normalizingManager := node.NewNormalizingManager(baseManager)
nodeManager := &node.HashV2Manager{Manager: normalizingManager}
cleanups = append(cleanups, nodeManager.Close)

var trie merkletrie.MerkleTrie
if cfg.RamTrie {
trie = merkletrie.NewRamTrie(nodeManager)
trie = merkletrie.NewRamTrie()
} else {

// Initialize repository for MerkleTrie. The cleanup is delegated to MerkleTrie.
Expand All @@ -95,7 +99,7 @@ func New(cfg config.Config) (*ClaimTrie, error) {
return nil, errors.Wrap(err, "creating trie repo")
}

persistentTrie := merkletrie.NewPersistentTrie(nodeManager, trieRepo)
persistentTrie := merkletrie.NewPersistentTrie(trieRepo)
cleanups = append(cleanups, persistentTrie.Close)
trie = persistentTrie
}
Expand Down Expand Up @@ -129,8 +133,11 @@ func New(cfg config.Config) (*ClaimTrie, error) {
ct.Close()
return nil, errors.Wrap(err, "increment height to")
}
// TODO: pass in the interrupt signal here:
trie.SetRoot(hash, nil) // keep this after IncrementHeightTo
err = trie.SetRoot(hash) // keep this after IncrementHeightTo
if err == merkletrie.ErrFullRebuildRequired {
// TODO: pass in the interrupt signal here:
ct.runFullTrieRebuild(nil)
}

if !ct.MerkleHash().IsEqual(hash) {
ct.Close()
Expand Down Expand Up @@ -235,7 +242,7 @@ func (ct *ClaimTrie) AppendBlock() error {
names = append(names, expirations...)
names = removeDuplicates(names)

nhns := ct.nodeManager.MakeNameHashNext(names, false)
nhns := ct.makeNameHashNext(names, false)
for nhn := range nhns {

ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
Expand All @@ -260,26 +267,19 @@ func (ct *ClaimTrie) AppendBlock() error {
ct.blockRepo.Set(ct.height, h)

if hitFork {
ct.merkleTrie.SetRoot(h, names) // for clearing the memory entirely
err = ct.merkleTrie.SetRoot(h) // for clearing the memory entirely
}

return nil
return errors.Wrap(err, "merkle trie clear memory")
}

func (ct *ClaimTrie) updateTrieForHashForkIfNecessary() bool {
if ct.height != param.ActiveParams.AllClaimsInMerkleForkHeight {
return false
}

node.LogOnce("Marking all trie nodes as dirty for the hash fork...")

// invalidate all names because we have to recompute the hash on everything
pairs := ct.nodeManager.MakeNameHashNext(nil, true)
for pair := range pairs {
ct.merkleTrie.Update(pair.Name, pair.Hash, false)
}

node.LogOnce("Done. Now recomputing all hashes...")
node.LogOnce(fmt.Sprintf("Rebuilding all trie nodes for the hash fork at %d...", ct.height))
ct.runFullTrieRebuild(nil)
return true
}

Expand Down Expand Up @@ -322,14 +322,32 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
if passedHashFork {
names = nil // force them to reconsider all names
}
ct.merkleTrie.SetRoot(hash, names)
err = ct.merkleTrie.SetRoot(hash)
if err == merkletrie.ErrFullRebuildRequired {
ct.runFullTrieRebuild(names)
}

if !ct.MerkleHash().IsEqual(hash) {
return errors.Errorf("unable to restore the hash at height %d", height)
}
return nil
}

func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte) {
var nhns chan NameHashNext
if names == nil {
node.LogOnce("Building the entire claim trie in RAM...")

nhns = ct.makeNameHashNext(nil, true)
} else {
nhns = ct.makeNameHashNext(names, false)
}

for nhn := range nhns {
ct.merkleTrie.Update(nhn.Name, nhn.Hash, false)
}
}

// MerkleHash returns the Merkle Hash of the claimTrie.
func (ct *ClaimTrie) MerkleHash() *chainhash.Hash {
if ct.height >= param.ActiveParams.AllClaimsInMerkleForkHeight {
Expand Down Expand Up @@ -392,3 +410,53 @@ func (ct *ClaimTrie) FlushToDisk() {
node.Warn("During blockRepo flush: " + err.Error())
}
}

type NameHashNext struct {
Name []byte
Hash *chainhash.Hash
Next int32
}

func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool) chan NameHashNext {
inputs := make(chan []byte, 512)
outputs := make(chan NameHashNext, 512)

var wg sync.WaitGroup
computeHash := func() {
for name := range inputs {
hash, next := ct.nodeManager.Hash(name)
outputs <- NameHashNext{name, hash, next}
}
wg.Done()
}

threads := int(0.8 * float32(runtime.NumCPU()))
if threads < 1 {
threads = 1
}
for threads >= 0 {
threads--
wg.Add(1)
go computeHash()
}
go func() {
if all {
ct.nodeManager.IterateNames(func(name []byte) bool {
clone := make([]byte, len(name))
copy(clone, name) // iteration name buffer is reused on future loops
inputs <- clone
return true
})
} else {
for _, name := range names {
inputs <- name
}
}
close(inputs)
}()
go func() {
wg.Wait()
close(outputs)
}()
return outputs
}
4 changes: 2 additions & 2 deletions claimtrie/claimtrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func TestRebuild(t *testing.T) {
r.NotNil(m)
r.NotEqual(*merkletrie.EmptyTrieHash, *m)

ct.merkleTrie = merkletrie.NewRamTrie(ct.nodeManager)
ct.merkleTrie.SetRoot(m, nil)
ct.merkleTrie = merkletrie.NewRamTrie()
ct.runFullTrieRebuild(nil)

m2 := ct.MerkleHash()
r.NotNil(m2)
Expand Down
4 changes: 2 additions & 2 deletions claimtrie/cmd/cmd/merkletrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func NewTrieNameCommand() *cobra.Command {
return errors.Wrapf(err, "open merkle trie repo")
}

trie := merkletrie.NewPersistentTrie(nil, trieRepo)
trie := merkletrie.NewPersistentTrie(trieRepo)
defer trie.Close()

trie.SetRoot(&hash, nil)
trie.SetRoot(&hash)

if len(name) > 1 {
trie.Dump(name)
Expand Down
16 changes: 5 additions & 11 deletions claimtrie/merkletrie/merkletrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,19 @@ var (
NoClaimsHash = &chainhash.Hash{3}
)

// ValueStore enables PersistentTrie to query node values from different implementations.
type ValueStore interface {
MakeNameHashNext(names [][]byte, all bool) chan node.NameHashNext
}

// PersistentTrie implements a 256-way prefix tree.
type PersistentTrie struct {
store ValueStore
repo Repo
repo Repo

root *vertex
bufs *sync.Pool
}

// NewPersistentTrie returns a PersistentTrie.
func NewPersistentTrie(store ValueStore, repo Repo) *PersistentTrie {
func NewPersistentTrie(repo Repo) *PersistentTrie {

tr := &PersistentTrie{
store: store,
repo: repo,
repo: repo,
bufs: &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
Expand All @@ -52,9 +45,10 @@ func NewPersistentTrie(store ValueStore, repo Repo) *PersistentTrie {
}

// SetRoot drops all resolved nodes in the PersistentTrie, and set the Root with specified hash.
func (t *PersistentTrie) SetRoot(h *chainhash.Hash, names [][]byte) {
func (t *PersistentTrie) SetRoot(h *chainhash.Hash) error {
t.root = newVertex(h)
runtime.GC()
return nil
}

// Update updates the nodes along the path to the key.
Expand Down
37 changes: 14 additions & 23 deletions claimtrie/merkletrie/ramtrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package merkletrie

import (
"bytes"
"errors"
"runtime"
"sync"

Expand All @@ -10,7 +11,7 @@ import (
)

type MerkleTrie interface {
SetRoot(h *chainhash.Hash, names [][]byte)
SetRoot(h *chainhash.Hash) error
Update(name []byte, h *chainhash.Hash, restoreChildren bool)
MerkleHash() *chainhash.Hash
MerkleHashAllClaims() *chainhash.Hash
Expand All @@ -19,13 +20,11 @@ type MerkleTrie interface {

type RamTrie struct {
collapsedTrie
store ValueStore
bufs *sync.Pool
bufs *sync.Pool
}

func NewRamTrie(s ValueStore) *RamTrie {
func NewRamTrie() *RamTrie {
return &RamTrie{
store: s,
bufs: &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
Expand All @@ -35,30 +34,22 @@ func NewRamTrie(s ValueStore) *RamTrie {
}
}

func (rt *RamTrie) SetRoot(h *chainhash.Hash, names [][]byte) {
var ErrFullRebuildRequired = errors.New("a full rebuild is required")

func (rt *RamTrie) SetRoot(h *chainhash.Hash) error {
if rt.Root.merkleHash.IsEqual(h) {
runtime.GC()
return
return nil
}

var nhns chan node.NameHashNext
if names == nil {
node.LogOnce("Building the entire claim trie in RAM...") // could put this in claimtrie.go

// should technically clear the old trie first:
if rt.Nodes > 1 {
rt.Root = &collapsedVertex{key: make(KeyType, 0)}
rt.Nodes = 1
runtime.GC()
}
nhns = rt.store.MakeNameHashNext(nil, true)
} else {
nhns = rt.store.MakeNameHashNext(names, false)
// should technically clear the old trie first:
if rt.Nodes > 1 {
rt.Root = &collapsedVertex{key: make(KeyType, 0)}
rt.Nodes = 1
runtime.GC()
}

for nhn := range nhns {
rt.Update(nhn.Name, nhn.Hash, false)
}
return ErrFullRebuildRequired
}

func (rt *RamTrie) Update(name []byte, h *chainhash.Hash, _ bool) {
Expand Down
39 changes: 39 additions & 0 deletions claimtrie/node/hashfork_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package node

import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/claimtrie/param"
)

type HashV2Manager struct {
Manager
}

func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32) {

n, err := nm.NodeAt(nm.Height(), name)
if err != nil || n == nil {
return nil, 0
}

n.SortClaimsByBid()
claimHashes := make([]*chainhash.Hash, 0, len(n.Claims))
for _, c := range n.Claims {
if c.Status == Activated { // TODO: unit test this line
claimHashes = append(claimHashes, calculateNodeHash(c.OutPoint, n.TakenOverAt))
}
}
if len(claimHashes) > 0 {
return ComputeMerkleRoot(claimHashes), n.NextUpdate()
}
return nil, n.NextUpdate()
}

func (nm *HashV2Manager) Hash(name []byte) (*chainhash.Hash, int32) {

if nm.Height() >= param.ActiveParams.AllClaimsInMerkleForkHeight {
return nm.computeClaimHashes(name)
}

return nm.Manager.Hash(name)
}
30 changes: 29 additions & 1 deletion claimtrie/node/hashfunc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package node

import "github.com/btcsuite/btcd/chaincfg/chainhash"
import (
"crypto/sha256"
"encoding/binary"
"strconv"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
)

func HashMerkleBranches(left *chainhash.Hash, right *chainhash.Hash) *chainhash.Hash {
// Concatenate the left and right nodes.
Expand All @@ -27,3 +34,24 @@ func ComputeMerkleRoot(hashes []*chainhash.Hash) *chainhash.Hash {
}
return hashes[0]
}

func calculateNodeHash(op wire.OutPoint, takeover int32) *chainhash.Hash {

txHash := chainhash.DoubleHashH(op.Hash[:])

nOut := []byte(strconv.Itoa(int(op.Index)))
nOutHash := chainhash.DoubleHashH(nOut)

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(takeover))
heightHash := chainhash.DoubleHashH(buf)

h := make([]byte, 0, sha256.Size*3)
h = append(h, txHash[:]...)
h = append(h, nOutHash[:]...)
h = append(h, heightHash[:]...)

hh := chainhash.DoubleHashH(h)

return &hh
}
Loading

0 comments on commit 8a5340a

Please sign in to comment.