Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pex: use highwayhash for pex bucket #4810

Merged
merged 14 commits into from May 8, 2020
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -32,6 +32,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### FEATURES:

- [pex] [\#4439](https://github.com/tendermint/tendermint/pull/4439) Use highwayhash for pex buckets (@tau3)
- [statesync] Add state sync support, where a new node can be rapidly bootstrapped by fetching state snapshots from peers instead of replaying blocks. See the `[statesync]` config section.
- [evidence] [\#4532](https://github.com/tendermint/tendermint/pull/4532) Handle evidence from light clients (@melekes)
- [lite2] [\#4532](https://github.com/tendermint/tendermint/pull/4532) Submit conflicting headers, if any, to a full node & all witnesses (@melekes)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -16,6 +16,7 @@ require (
github.com/gtank/merlin v0.1.1
github.com/libp2p/go-buffer-pool v0.0.2
github.com/magiconair/properties v1.8.1
github.com/minio/highwayhash v1.0.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.6.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Expand Up @@ -262,6 +262,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0=
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
Expand Down Expand Up @@ -506,6 +508,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
106 changes: 74 additions & 32 deletions p2p/pex/addrbook.go
Expand Up @@ -5,7 +5,7 @@
package pex

import (
"crypto/sha256"
crand "crypto/rand"
"encoding/binary"
"fmt"
"math"
Expand All @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/minio/highwayhash"
"github.com/tendermint/tendermint/crypto"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
Expand Down Expand Up @@ -100,10 +101,17 @@ type addrBook struct {
filePath string
key string // random prefix for bucket placement
routabilityStrict bool
hashKey []byte

wg sync.WaitGroup
}

func newHashKey() []byte {
result := make([]byte, highwayhash.Size)
crand.Read(result)
return result
}

// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string, routabilityStrict bool) AddrBook {
Expand All @@ -115,6 +123,7 @@ func NewAddrBook(filePath string, routabilityStrict bool) AddrBook {
badPeers: make(map[p2p.ID]*knownAddress),
filePath: filePath,
routabilityStrict: routabilityStrict,
hashKey: newHashKey(),
}
am.init()
am.BaseService = *service.NewBaseService(nil, "AddrBook", am)
Expand Down Expand Up @@ -344,16 +353,28 @@ func (a *addrBook) MarkBad(addr *p2p.NetAddress, banTime time.Duration) {
}
}

// ReinstateBadPeers removes bad peers from ban list and places them into a new
// bucket.
func (a *addrBook) ReinstateBadPeers() {
a.mtx.Lock()
defer a.mtx.Unlock()

for _, ka := range a.badPeers {
if !ka.isBanned() {
bucket := a.calcNewBucket(ka.Addr, ka.Src)
a.addToNewBucket(ka, bucket)
delete(a.badPeers, ka.ID())
a.Logger.Info("Reinstated address", "addr", ka.Addr)
if ka.isBanned() {
continue
}

bucket, err := a.calcNewBucket(ka.Addr, ka.Src)
if err != nil {
a.Logger.Error("Failed to calculate new bucket (bad peer won't be reinstantiated)",
"addr", ka.Addr, "err", err)
continue
}

a.addToNewBucket(ka, bucket)
delete(a.badPeers, ka.ID())

a.Logger.Info("Reinstated address", "addr", ka.Addr)
}
}

Expand Down Expand Up @@ -659,7 +680,10 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
ka = newKnownAddress(addr, src)
}

bucket := a.calcNewBucket(addr, src)
bucket, err := a.calcNewBucket(addr, src)
if err != nil {
return err
}
a.addToNewBucket(ka, bucket)
return nil
}
Expand Down Expand Up @@ -722,15 +746,15 @@ func (a *addrBook) expireNew(bucketIdx int) {
// Promotes an address from new to old. If the destination bucket is full,
// demote the oldest one to a "new" bucket.
// TODO: Demote more probabilistically?
func (a *addrBook) moveToOld(ka *knownAddress) {
func (a *addrBook) moveToOld(ka *knownAddress) error {
// Sanity check
if ka.isOld() {
a.Logger.Error(fmt.Sprintf("Cannot promote address that is already old %v", ka))
return
return nil
}
if len(ka.Buckets) == 0 {
a.Logger.Error(fmt.Sprintf("Cannot promote address that isn't in any new buckets %v", ka))
return
return nil
}

// Remove from all (new) buckets.
Expand All @@ -739,13 +763,19 @@ func (a *addrBook) moveToOld(ka *knownAddress) {
ka.BucketType = bucketTypeOld

// Try to add it to its oldBucket destination.
oldBucketIdx := a.calcOldBucket(ka.Addr)
oldBucketIdx, err := a.calcOldBucket(ka.Addr)
if err != nil {
return err
}
added := a.addToOldBucket(ka, oldBucketIdx)
if !added {
// No room; move the oldest to a new bucket
oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
newBucketIdx, err := a.calcNewBucket(oldest.Addr, oldest.Src)
if err != nil {
return err
}
a.addToNewBucket(oldest, newBucketIdx)

// Finally, add our ka to old bucket again.
Expand All @@ -754,6 +784,7 @@ func (a *addrBook) moveToOld(ka *knownAddress) {
a.Logger.Error(fmt.Sprintf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx))
}
}
return nil
}

func (a *addrBook) removeAddress(addr *p2p.NetAddress) {
Expand Down Expand Up @@ -785,14 +816,16 @@ func (a *addrBook) addBadPeer(addr *p2p.NetAddress, banTime time.Duration) bool
//---------------------------------------------------------------------
// calculate bucket placements

// doublesha256( key + sourcegroup +
// int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
// hash(key + sourcegroup + int64(hash(key + group + sourcegroup)) % bucket_per_group) % num_new_buckets
func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) (int, error) {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(a.groupKey(addr))...)
data1 = append(data1, []byte(a.groupKey(src))...)
hash1 := doubleSha256(data1)
hash1, err := a.hash(data1)
if err != nil {
return 0, err
}
hash64 := binary.BigEndian.Uint64(hash1)
hash64 %= newBucketsPerGroup
var hashbuf [8]byte
Expand All @@ -802,17 +835,23 @@ func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
data2 = append(data2, a.groupKey(src)...)
data2 = append(data2, hashbuf[:]...)

hash2 := doubleSha256(data2)
return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
hash2, err := a.hash(data2)
if err != nil {
return 0, err
}
result := int(binary.BigEndian.Uint64(hash2) % newBucketCount)
return result, nil
}

// doublesha256( key + group +
// int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
// hash(key + group + int64(hash(key + addr)) % buckets_per_group) % num_old_buckets
func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) (int, error) {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(addr.String())...)
hash1 := doubleSha256(data1)
hash1, err := a.hash(data1)
if err != nil {
return 0, err
}
hash64 := binary.BigEndian.Uint64(hash1)
hash64 %= oldBucketsPerGroup
var hashbuf [8]byte
Expand All @@ -822,8 +861,12 @@ func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
data2 = append(data2, a.groupKey(addr)...)
data2 = append(data2, hashbuf[:]...)

hash2 := doubleSha256(data2)
return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
hash2, err := a.hash(data2)
if err != nil {
return 0, err
}
result := int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
return result, nil
}

// Return a string representing the network group of this address.
Expand Down Expand Up @@ -875,12 +918,11 @@ func (a *addrBook) groupKey(na *p2p.NetAddress) string {
return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
}

// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
func doubleSha256(b []byte) []byte {
hasher := sha256.New()
hasher.Write(b) // nolint:errcheck
sum := hasher.Sum(nil)
hasher.Reset()
hasher.Write(sum) // nolint:errcheck
return hasher.Sum(nil)
func (a *addrBook) hash(b []byte) ([]byte, error) {
hasher, err := highwayhash.New64(a.hashKey)
if err != nil {
return nil, err
}
hasher.Write(b)
return hasher.Sum(nil), nil
}