Skip to content

Commit

Permalink
Merge c7ef3e7 into 0b6f6e8
Browse files Browse the repository at this point in the history
  • Loading branch information
thanodnl committed Dec 16, 2016
2 parents 0b6f6e8 + c7ef3e7 commit 3ea45d9
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 30 deletions.
6 changes: 6 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,14 @@ type RingChangedEvent struct {
// RingChecksumEvent is sent when a server is removed or added and a new checksum
// for the ring is calculated
type RingChecksumEvent struct {
// OldChecksum contains the previous legacy checksum. Note: might be deprecated in the future.
OldChecksum uint32
// NewChecksum contains the new legacy checksum. Note: might be deprecated in the future.
NewChecksum uint32
// OldChecksums contains the map of previous checksums
OldChecksums map[string]uint32
// NewChecksums contains the map with new checksums
NewChecksums map[string]uint32
}

// A LookupEvent is sent when a lookup is performed on the Ringpop's ring
Expand Down
56 changes: 56 additions & 0 deletions hashring/checksummer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package hashring

import (
"bytes"
"sort"
"strconv"
"strings"

"github.com/dgryski/go-farm"
)

// Checksummer computes a checksum for an instance of a HashRing. The
// checksum can be used to compare two rings for equality.
type Checksummer interface {
// Checksum calculates the checksum for the hashring that is passed in.
// Compute will be called while having at least a read-lock on the hashring so
// it is safe to read from the ring, but not safe to change the ring. There
// might be multiple Checksum Computes initiated at the same time, but every
// Checksum will only be called once per hashring at once
Checksum(ring *HashRing) (checksum uint32)
}

type identityChecksummer struct{}

func (i *identityChecksummer) Checksum(ring *HashRing) uint32 {
identitySet := make(map[string]struct{})
ring.tree.root.traverseWhile(func(node *redBlackNode) bool {
identitySet[node.key.(replicaPoint).identity] = struct{}{}
return true
})

identities := make([]string, 0, len(identitySet))
for identity := range identitySet {
identities = append(identities, identity)
}

sort.Strings(identities)
bytes := []byte(strings.Join(identities, ";"))
return farm.Fingerprint32(bytes)
}

type replicaPointChecksummer struct{}

func (r *replicaPointChecksummer) Checksum(ring *HashRing) uint32 {
buffer := bytes.Buffer{}

ring.tree.root.traverseWhile(func(node *redBlackNode) bool {
buffer.WriteString(strconv.Itoa(node.key.(replicaPoint).hash))
buffer.WriteString("-")
buffer.WriteString(node.value.(string))
buffer.WriteString(";")
return true
})

return farm.Fingerprint32(buffer.Bytes())
}
96 changes: 96 additions & 0 deletions hashring/checksummer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package hashring

import (
"sort"
"strings"
"testing"

"github.com/dgryski/go-farm"
"github.com/stretchr/testify/assert"
)

// addressChecksum implements the now obsolete checksum method that didn't support identities.
// It's moved to this test file to test backwards compatibility of the identityChecksummer
type addressChecksummer struct{}

func (i *addressChecksummer) Checksum(ring *HashRing) uint32 {
addresses := ring.copyServersNoLock()
sort.Strings(addresses)
bytes := []byte(strings.Join(addresses, ";"))
return farm.Fingerprint32(bytes)
}

func TestAddressChecksum_Compute(t *testing.T) {
members := genMembers(1, 1, 10, false)
ring := New(farm.Fingerprint32, 1)
ring.AddMembers(members...)
checksum := &addressChecksummer{}

addresses := make([]string, 0, 10)
for _, members := range members {
addresses = append(addresses, members.GetAddress())
}

sort.Strings(addresses)
bytes := []byte(strings.Join(addresses, ";"))

expected := farm.Fingerprint32(bytes)
actual := checksum.Checksum(ring)

assert.Equal(t, expected, actual)
}

func TestIdentityChecksum_Compute(t *testing.T) {
identityChecksummer := &identityChecksummer{}

ringWithoutIdentities := New(farm.Fingerprint32, 1)
ringWithoutIdentities.AddMembers(genMembers(1, 1, 10, false)...)

legacyChecksum := (&addressChecksummer{}).Checksum(ringWithoutIdentities)
identityChecksum := identityChecksummer.Checksum(ringWithoutIdentities)

assert.Equal(t, legacyChecksum, identityChecksum, "Identity checksum should be the same as legacy on ring without identities")

ringWithIdentities := New(farm.Fingerprint32, 1)
ringWithIdentities.AddMembers(genMembers(1, 1, 10, true)...)

identityChecksum = identityChecksummer.Checksum(ringWithIdentities)

assert.NotEqual(t, legacyChecksum, identityChecksum, "IdentityChecksummer should not match legacy checksummer on ring with identites ")
}

func TestReplicaPointChecksum_Compute(t *testing.T) {
replicaPointChecksummer := &replicaPointChecksummer{}
members := genMembers(1, 1, 10, false)

ring1ReplicaPoint := New(farm.Fingerprint32, 1)
ring1ReplicaPoint.AddMembers(members...)

ring2ReplicaPoints := New(farm.Fingerprint32, 2)
ring2ReplicaPoints.AddMembers(members...)

checksum1ReplicaPoint := replicaPointChecksummer.Checksum(ring1ReplicaPoint)
checksum2ReplicaPoints := replicaPointChecksummer.Checksum(ring2ReplicaPoints)

assert.NotEqual(t, checksum1ReplicaPoint, checksum2ReplicaPoints, "Checksum should not match with different replica point counts")
}
100 changes: 74 additions & 26 deletions hashring/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ package hashring

import (
"fmt"
"sort"
"strings"
"sync"

"github.com/uber/ringpop-go/events"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/membership"

"github.com/dgryski/go-farm"
"github.com/uber-common/bark"
)

Expand All @@ -51,6 +48,9 @@ type replicaPoint struct {
// hash of the point in the hashring
hash int

// identity of the member that owns this replicaPoint.
identity string

// address of the member that owns this replicaPoint.
address string
}
Expand All @@ -70,7 +70,17 @@ type HashRing struct {

serverSet map[string]struct{}
tree *redBlackTree
checksum uint32

// legacyChecksum is the old checksum that is calculated only from the
// identities being added.
legacyChecksum uint32

// checksummers is map of named Checksum calculators for the hashring
checksummers map[string]Checksummer
// checksums is a map containing the checksums that are representing this
// hashring. The map should never be altered in place so it is safe to pass
// a copy to components that need the checksums
checksums map[string]uint32

logger bark.Logger
}
Expand All @@ -83,6 +93,10 @@ func New(hashfunc func([]byte) uint32, replicaPoints int) *HashRing {
return int(hashfunc([]byte(str)))
},
logger: logging.Logger("ring"),

checksummers: map[string]Checksummer{
"replica": &replicaPointChecksummer{},
},
}

r.serverSet = make(map[string]struct{})
Expand All @@ -92,40 +106,74 @@ func New(hashfunc func([]byte) uint32, replicaPoints int) *HashRing {

// Checksum returns the checksum of all stored servers in the HashRing
// Use this value to find out if the HashRing is mutated.
func (r *HashRing) Checksum() uint32 {
func (r *HashRing) Checksum() (checksum uint32) {
r.RLock()
checksum := r.checksum
checksum = r.legacyChecksum
r.RUnlock()
return checksum
return
}

// computeChecksum computes checksum of all servers in the ring.
// This function isn't thread-safe, only call it when the HashRing is locked.
func (r *HashRing) computeChecksumNoLock() {
addresses := r.copyServersNoLock()
sort.Strings(addresses)
bytes := []byte(strings.Join(addresses, ";"))
old := r.checksum
r.checksum = farm.Fingerprint32(bytes)

if r.checksum != old {
// Checksums returns a map of checksums named by the algorithm used to compute
// the checksum.
func (r *HashRing) Checksums() (checksums map[string]uint32) {
r.RLock()
// even though the map is immutable the pointer to it is not so it requires
// a readlock
checksums = r.checksums
r.RUnlock()
return
}

// computeChecksumsNoLock re-computes all configured checksums for this hashring
// and updates the in memory map with a new map containing the new checksums.
func (r *HashRing) computeChecksumsNoLock() {
oldChecksums := r.checksums

r.checksums = make(map[string]uint32)
changed := false
// calculate all configured checksums
for name, checksummer := range r.checksummers {
oldChecksum := oldChecksums[name]
newChecksum := checksummer.Checksum(r)
r.checksums[name] = newChecksum

if oldChecksum != newChecksum {
changed = true
}
}

// calculate the legacy identity only based checksum
legacyChecksummer := identityChecksummer{}
oldChecksum := r.legacyChecksum
newChecksum := legacyChecksummer.Checksum(r)
r.legacyChecksum = newChecksum

if oldChecksum != newChecksum {
changed = true
}

if changed {
r.logger.WithFields(bark.Fields{
"checksum": r.checksum,
"oldChecksum": old,
"checksum": r.legacyChecksum,
"oldChecksum": oldChecksum,
"checksums": r.checksums,
}).Debug("ringpop ring computed new checksum")
}

r.EmitEvent(events.RingChecksumEvent{
OldChecksum: old,
NewChecksum: r.checksum,
OldChecksum: oldChecksum,
NewChecksum: r.legacyChecksum,
OldChecksums: oldChecksums,
NewChecksums: r.checksums,
})
}

func (r *HashRing) replicaPointForServer(server membership.Member, replica int) replicaPoint {
replicaStr := fmt.Sprintf("%s%v", server.Identity(), replica)
return replicaPoint{
hash: r.hashfunc(replicaStr),
address: server.GetAddress(),
hash: r.hashfunc(replicaStr),
identity: server.Identity(),
address: server.GetAddress(),
}
}

Expand All @@ -142,7 +190,7 @@ func (r *HashRing) AddMembers(members ...membership.Member) bool {
}

if changed {
r.computeChecksumNoLock()
r.computeChecksumsNoLock()
r.EmitEvent(events.RingChangedEvent{
ServersAdded: added,
})
Expand Down Expand Up @@ -182,7 +230,7 @@ func (r *HashRing) RemoveMembers(members ...membership.Member) bool {
}

if changed {
r.computeChecksumNoLock()
r.computeChecksumsNoLock()
r.EmitEvent(events.RingChangedEvent{
ServersRemoved: removed,
})
Expand Down Expand Up @@ -240,7 +288,7 @@ func (r *HashRing) ProcessMembershipChanges(changes []membership.MemberChange) {

// recompute checksums on changes
if changed {
r.computeChecksumNoLock()
r.computeChecksumsNoLock()
r.EmitEvent(events.RingChangedEvent{
ServersAdded: added,
ServersUpdated: updated,
Expand Down
31 changes: 31 additions & 0 deletions hashring/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,37 @@ func TestChecksumChanges(t *testing.T) {
assert.NotEqual(t, checksum, ring.Checksum(), "expected checksum to have changed on server remove")
}

func TestHashRing_Checksums(t *testing.T) {
getSortedKeys := func(m map[string]uint32) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}

ring := New(farm.Fingerprint32, 10)
ring.checksummers = map[string]Checksummer{
"identity": &identityChecksummer{},
"address": &addressChecksummer{},
"replicaPoint": &replicaPointChecksummer{},
}

checkSummers := []string{"identity", "address", "replicaPoint"}
sort.Strings(checkSummers)

ring.AddMembers(fakeMember{address: "server1"})
ring.AddMembers(fakeMember{address: "server2"})

checksums := ring.Checksums()
assert.Equal(t, checkSummers, getSortedKeys(checksums), "Expected all checksums to be computed")
ring.RemoveMembers(fakeMember{address: "server1"})

assert.NotEqual(t, checksums, ring.Checksums(), "expected checksums to have changed on server remove")
assert.Equal(t, checkSummers, getSortedKeys(checksums), "Expected all checksums to be computed")
}

func TestServerCount(t *testing.T) {
ring := New(farm.Fingerprint32, 10)
assert.Equal(t, 0, ring.ServerCount(), "expected one server to be in ring")
Expand Down
Loading

0 comments on commit 3ea45d9

Please sign in to comment.