This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Merge branch 'master' into srLRU
Guillaume Bailey committed Feb 3, 2017
2 parents 3b42355 + 73e2d29 commit cd0d751
Showing 11 changed files with 269 additions and 127 deletions.
61 changes: 44 additions & 17 deletions common/rpm.go
@@ -22,6 +22,7 @@ package common

import (
"errors"
"hash/fnv"
"math/rand"
"net"
"sort"
@@ -245,27 +246,33 @@ func (rpm *ringpopMonitorImpl) FindHostForAddr(service string, addr string) (*Ho

// FindHostForKey finds and returns the host responsible for handling the given key
func (rpm *ringpopMonitorImpl) FindHostForKey(service string, key string) (*HostInfo, error) {
// Note: we don't have Lookup(key, predicate..)
// so just get all reachable hosts and return the first instance

// this function should be consistent in picking and returning the
// same node for a given key on a specific 'set' of hosts. the list
// of hosts returned by rpm.GetHosts is guaranteed to be sorted by
// ip-address (it is re-sorted on every 'refresh'), so we simply
// hash the key and use it to pick the host from the list.

// compute FNV-1a hash of the key
fnv1a := fnv.New32a()
fnv1a.Write([]byte(key))
hash := int(fnv1a.Sum32())

// get list of hosts for the given service
members, err := rpm.GetHosts(service)
if err != nil || members == nil || len(members) == 0 {
if err != nil {
return nil, err
}

if members == nil || len(members) == 0 {
return nil, ErrUnknownService
}

// We need to make sure we always return the same node
// So sort members by IP and return the lowest.
var ipAddrs []string
tempMap := make(map[string]int, len(members))
for k, member := range members {
ipAddrs = append(ipAddrs, member.Addr)
// maintain the reverse mapping to have the index
tempMap[member.Addr] = k
}
// sort the ip addresses
sort.Strings(ipAddrs)
// get the key of the lowest
lowestIndex := tempMap[ipAddrs[0]]
return members[lowestIndex], nil
// pick the host corresponding to the hash and get a pointer to
// a copy of the HostInfo, since it is treated as immutable.
var host = *members[hash%len(members)]

return &host, nil
}

// IsHostHealthy returns true if the given host is healthy and false otherwise
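
For illustration only (not part of this commit): a minimal, self-contained sketch of the key-to-host selection that FindHostForKey now performs, i.e. hash the key with FNV-1a and index into a host list kept sorted by address. HostInfo is reduced to a plain address string here, and pickHost stands in for the combination of GetHosts plus the hashing above; the addresses in main are made up.

package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"sort"
)

// pickHost maps a key to one of the given hosts. Because the candidate list
// is sorted before indexing, the same key always maps to the same host for a
// fixed set of hosts, which is the property FindHostForKey relies on.
func pickHost(key string, hosts []string) (string, error) {
	if len(hosts) == 0 {
		return "", errors.New("no hosts available")
	}
	sorted := append([]string(nil), hosts...)
	sort.Strings(sorted)

	// compute FNV-1a hash of the key, as in FindHostForKey
	fnv1a := fnv.New32a()
	fnv1a.Write([]byte(key))
	return sorted[fnv1a.Sum32()%uint32(len(sorted))], nil
}

func main() {
	hosts := []string{"10.0.0.3:4922", "10.0.0.1:4922", "10.0.0.2:4922"}
	host, _ := pickHost("destination-uuid", hosts)
	fmt.Println(host) // same host on every call for the same inputs
}
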
@@ -404,6 +411,21 @@ func (rpm *ringpopMonitorImpl) refreshAll() {
}
}

// sort.Interface methods to help sort HostInfo by IP-address
type hostInfoByAddr []*HostInfo

func (t hostInfoByAddr) Len() int {
return len(t)
}

func (t hostInfoByAddr) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}

func (t hostInfoByAddr) Less(i, j int) bool {
return t[i].Addr < t[j].Addr
}

func (rpm *ringpopMonitorImpl) refresh(service string, currInfo *membershipInfo) (added, removed, updated []*HostInfo, newInfo *membershipInfo) {

added = make([]*HostInfo, 0, 2)
@@ -500,6 +522,11 @@ func (rpm *ringpopMonitorImpl) refresh(service string, currInfo *membershipInfo)
newInfo.asList[i] = v
i++
}

// Whenever the list changes, keep the list sorted by ip-address, so
// functions like FindHostForKey don't need to sort them every time.
sort.Sort(hostInfoByAddr(newInfo.asList))

return
}
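
A related sketch (again illustrative, not from the commit): how the hostInfoByAddr adapter and the sort.Sort call added above keep the membership list ordered by address, so readers like FindHostForKey can index into it directly. HostInfo is trimmed to its Addr field, and the example addresses are invented.

package main

import (
	"fmt"
	"sort"
)

// HostInfo is reduced to the single field the ordering depends on.
type HostInfo struct{ Addr string }

// hostInfoByAddr mirrors the sort.Interface adapter defined above.
type hostInfoByAddr []*HostInfo

func (t hostInfoByAddr) Len() int           { return len(t) }
func (t hostInfoByAddr) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
func (t hostInfoByAddr) Less(i, j int) bool { return t[i].Addr < t[j].Addr }

func main() {
	members := []*HostInfo{
		{Addr: "10.0.0.2:4240"},
		{Addr: "10.0.0.1:4240"},
		{Addr: "10.0.0.3:4240"},
	}
	// refresh() performs this sort whenever the membership changes, so the
	// list handed out by GetHosts stays sorted by ip-address.
	sort.Sort(hostInfoByAddr(members))
	for _, m := range members {
		fmt.Println(m.Addr)
	}
}
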

23 changes: 11 additions & 12 deletions services/controllerhost/api_handlers.go
@@ -80,7 +80,7 @@ func isUUIDLengthValid(uuid string) bool {
return len(uuid) == common.UUIDStringLength
}

func isInputHealthy(context *Context, extent *shared.Extent) bool {
func isInputHealthy(context *Context, extent *m.DestinationExtent) bool {
return context.rpm.IsHostHealthy(common.InputServiceName, extent.GetInputHostUUID())
}

@@ -104,7 +104,7 @@ func isAnyStoreHealthy(context *Context, storeIDs []string) bool {
return false
}

func areExtentStoresHealthy(context *Context, extent *shared.Extent) bool {
func areExtentStoresHealthy(context *Context, extent *m.DestinationExtent) bool {
for _, h := range extent.GetStoreUUIDs() {
if !context.rpm.IsHostHealthy(common.StoreServiceName, h) {
context.log.WithFields(bark.Fields{
@@ -165,8 +165,8 @@ func validateDstStatus(dstDesc *shared.DestinationDescription) error {
}
}

func listConsumerGroupExtents(context *Context, dstUUID string, cgUUID string, m3Scope int, filterByStatus []m.ConsumerGroupExtentStatus) ([]*m.ConsumerGroupExtent, error) {
cgExtents, err := context.mm.ListExtentsByConsumerGroup(dstUUID, cgUUID, filterByStatus)
func listConsumerGroupExtents(context *Context, dstUUID string, cgUUID string, m3Scope int, filterByStatus []m.ConsumerGroupExtentStatus) ([]*m.ConsumerGroupExtentLite, error) {
cgExtents, err := context.mm.ListExtentsByConsumerGroupLite(dstUUID, cgUUID, filterByStatus)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataReadCounter)
}
@@ -196,14 +196,14 @@ func readDestination(context *Context, dstID string, m3Scope int) (*shared.Desti
return dstDesc, err
}

func findOpenExtents(context *Context, dstID string, m3Scope int) ([]*shared.ExtentStats, error) {
func findOpenExtents(context *Context, dstID string, m3Scope int) ([]*m.DestinationExtent, error) {
filterBy := []shared.ExtentStatus{shared.ExtentStatus_OPEN}
extentStats, err := context.mm.ListExtentsByDstIDStatus(dstID, filterBy)
extents, err := context.mm.ListDestinationExtentsByStatus(dstID, filterBy)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataReadCounter)
return nil, err
}
return extentStats, err
return extents, err
}

func getDstType(desc *shared.DestinationDescription) dstType {
@@ -247,7 +247,7 @@ func minOpenExtentsForDst(context *Context, dstPath string, dstType dstType) int
return int(common.OverrideValueByPrefix(logFn, dstPath, cfg.NumPublisherExtentsByPath, defaultMinOpenPublishExtents, `NumPublisherExtentsByPath`))
}

func getInputAddrIfExtentIsWritable(context *Context, extent *shared.Extent, m3Scope int) (string, error) {
func getInputAddrIfExtentIsWritable(context *Context, extent *m.DestinationExtent, m3Scope int) (string, error) {
inputhost, err := context.rpm.ResolveUUID(common.InputServiceName, extent.GetInputHostUUID())
if err != nil {
context.log.
@@ -356,7 +356,7 @@ func refreshInputHostsForDst(context *Context, dstUUID string, now int64) ([]str
var minOpenExtents = minOpenExtentsForDst(context, dstDesc.GetPath(), dstType)
var isMultiZoneDest = dstDesc.GetIsMultiZone()

openExtentStats, err := findOpenExtents(context, dstUUID, m3Scope)
openExtents, err := findOpenExtents(context, dstUUID, m3Scope)
if err != nil {
// we can't get the metadata, let's
// continue to use the cached result
@@ -368,10 +368,9 @@
}

var nHealthy = 0
var inputHosts = make(map[string]*common.HostInfo, len(openExtentStats))
var inputHosts = make(map[string]*common.HostInfo, len(openExtents))

for _, stat := range openExtentStats {
ext := stat.GetExtent()
for _, ext := range openExtents {

// skip remote zone extent (read only)
if common.IsRemoteZoneExtent(ext.GetOriginZone(), context.localZone) {
51 changes: 25 additions & 26 deletions services/controllerhost/consumer.go
@@ -45,7 +45,7 @@ type cgExtentsByCategory struct {
open map[string]struct{}
openHealthy map[string]struct{}
consumed map[string]struct{}
openBad []*m.ConsumerGroupExtent
openBad []*m.ConsumerGroupExtentLite
}

func validatCGStatus(cgDesc *shared.ConsumerGroupDescription) error {
@@ -64,7 +64,7 @@ func newCGExtentsByCategory() *cgExtentsByCategory {
open: make(map[string]struct{}),
openHealthy: make(map[string]struct{}),
consumed: make(map[string]struct{}),
openBad: make([]*m.ConsumerGroupExtent, 0),
openBad: make([]*m.ConsumerGroupExtentLite, 0),
}
}

@@ -143,8 +143,7 @@ func pickOutputHostForStoreHosts(context *Context, storeUUIDs []string) (*common
return context.placement.PickOutputHost(storeHosts)
}

func canConsumeDstExtent(context *Context, stat *shared.ExtentStats, consumedCGExtents map[string]struct{}) bool {
ext := stat.GetExtent()
func canConsumeDstExtent(context *Context, ext *m.DestinationExtent, consumedCGExtents map[string]struct{}) bool {
extID := ext.GetExtentUUID()
if _, ok := consumedCGExtents[extID]; ok {
return false
@@ -155,13 +154,13 @@ func canConsumeDstExtent(context *Context, stat *shared.ExtentStats, consumedCGE
return true
}

func reassignOutHost(context *Context, dstUUID string, extent *m.ConsumerGroupExtent, m3Scope int) *common.HostInfo {
func reassignOutHost(context *Context, dstUUID string, cgUUID string, extent *m.ConsumerGroupExtentLite, m3Scope int) *common.HostInfo {
outhost, err := pickOutputHostForStoreHosts(context, extent.GetStoreUUIDs())
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrPickOutHostCounter)
return nil
}
err = context.mm.UpdateOutHost(dstUUID, extent.GetConsumerGroupUUID(), extent.GetExtentUUID(), outhost.UUID)
err = context.mm.UpdateOutHost(dstUUID, cgUUID, extent.GetExtentUUID(), outhost.UUID)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataUpdateCounter)
context.log.WithField(common.TagErr, err).Debug("Failed to update outhost for consumer group")
@@ -172,7 +171,7 @@ func reassignOutHost(context *Context, dstUUID string, extent *m.ConsumerGroupEx
common.TagDst: common.FmtDst(dstUUID),
common.TagExt: common.FmtExt(extent.GetExtentUUID()),
common.TagOut: common.FmtOut(outhost.UUID),
common.TagCnsm: common.FmtCnsm(extent.GetConsumerGroupUUID()),
common.TagCnsm: common.FmtCnsm(cgUUID),
`oldOuthID`: common.FmtOut(extent.GetOutputHostUUID()),
}).Info("Reassigned output host")
return outhost
@@ -217,6 +216,7 @@ func notifyOutputHostsForConsumerGroup(context *Context, dstUUID, cgUUID, reason
func repairExtentsAndUpdateOutputHosts(
context *Context,
dstUUID string,
cgUUID string,
cgExtents *cgExtentsByCategory,
maxToRepair int,
outputHosts map[string]*common.HostInfo,
@@ -225,15 +225,15 @@ func repairExtentsAndUpdateOutputHosts(
nRepaired := 0
for i := 0; i < len(cgExtents.openBad); i++ {
toRepair := cgExtents.openBad[i]
outHost := reassignOutHost(context, dstUUID, toRepair, m3Scope)
outHost := reassignOutHost(context, dstUUID, cgUUID, toRepair, m3Scope)
if outHost != nil {
outputHosts[outHost.UUID] = outHost
event := NewOutputHostNotificationEvent(dstUUID, toRepair.GetConsumerGroupUUID(), outHost.UUID,
event := NewOutputHostNotificationEvent(dstUUID, cgUUID, outHost.UUID,
notifyExtentRepaired, toRepair.GetExtentUUID(), a.NotificationType_HOST)
if !context.eventPipeline.Add(event) {
context.log.WithFields(bark.Fields{
common.TagDst: common.FmtDst(dstUUID),
common.TagCnsm: common.FmtCnsm(toRepair.GetConsumerGroupUUID()),
common.TagCnsm: common.FmtCnsm(cgUUID),
common.TagOut: common.FmtOut(outHost.UUID),
}).Error("Dropping OutputHostNotificationEvent after repairing extent, event queue full")
}
@@ -248,7 +248,7 @@ func repairExtentsAndUpdateOutputHosts(
return nRepaired
}

func addExtentsToConsumerGroup(context *Context, dstUUID string, cgUUID string, newExtents []*shared.Extent, outputHosts map[string]*common.HostInfo, m3Scope int) int {
func addExtentsToConsumerGroup(context *Context, dstUUID string, cgUUID string, newExtents []*m.DestinationExtent, outputHosts map[string]*common.HostInfo, m3Scope int) int {
nAdded := 0

for _, ext := range newExtents {
@@ -361,30 +361,29 @@ func selectNextExtentsToConsume(
dstDesc *shared.DestinationDescription,
cgDesc *shared.ConsumerGroupDescription,
cgExtents *cgExtentsByCategory,
m3Scope int) ([]*shared.Extent, int, error) {
m3Scope int) ([]*m.DestinationExtent, int, error) {

dstID := dstDesc.GetDestinationUUID()
cgID := cgDesc.GetConsumerGroupUUID()

filterBy := []shared.ExtentStatus{shared.ExtentStatus_SEALED, shared.ExtentStatus_OPEN}
dstExtStats, err := context.mm.ListExtentsByDstIDStatus(dstID, filterBy)
dstExtents, err := context.mm.ListDestinationExtentsByStatus(dstID, filterBy)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataReadCounter)
return []*shared.Extent{}, 0, err
return []*m.DestinationExtent{}, 0, err
}

dedupMap := make(map[string]struct{})

var nCGDlqExtents int
var dstDlqExtents []*shared.Extent
var dstDlqExtents []*m.DestinationExtent
dstExtentsCount := 0
dstExtentsByZone := make(map[string][]*shared.Extent)
dstExtentsByZone := make(map[string][]*m.DestinationExtent)

sortExtentStatsByTime(dstExtStats)
sortExtentStatsByTime(dstExtents)

for _, stat := range dstExtStats {
for _, ext := range dstExtents {

ext := stat.GetExtent()
extID := ext.GetExtentUUID()

if _, ok := dedupMap[extID]; ok {
@@ -393,11 +392,11 @@

dedupMap[extID] = struct{}{}

if !canConsumeDstExtent(context, stat, cgExtents.consumed) {
if !canConsumeDstExtent(context, ext, cgExtents.consumed) {
continue
}

visibility := stat.GetConsumerGroupVisibility()
visibility := ext.GetConsumerGroupVisibility()

if _, ok := cgExtents.open[extID]; ok {
if len(visibility) > 0 {
@@ -439,9 +438,9 @@ func selectNextExtentsToConsume(
// we have a dlq extent available now (and there
// is none currently consumed). So pick the
// dlq extent and bail out
return []*shared.Extent{dstDlqExtents[0]}, nAvailable, nil
return []*m.DestinationExtent{dstDlqExtents[0]}, nAvailable, nil
}
return []*shared.Extent{}, nAvailable, nil
return []*m.DestinationExtent{}, nAvailable, nil
}

nZone := 0
Expand All @@ -450,7 +449,7 @@ func selectNextExtentsToConsume(
nDstDlqExtents := 0
remDstDlqExtents := len(dstDlqExtents)

result := make([]*shared.Extent, capacity)
result := make([]*m.DestinationExtent, capacity)

for i := 0; i < capacity; i++ {
if remDstDlqExtents > 0 {
@@ -522,7 +521,7 @@ func refreshCGExtents(context *Context,
for i := 0; i < len(storehosts); i++ {
storeids[i] = storehosts[i].UUID
}
ext := &shared.Extent{
ext := &m.DestinationExtent{
ExtentUUID: common.StringPtr(extentID),
StoreUUIDs: storeids,
}
@@ -615,7 +614,7 @@ func refreshOutputHostsForConsGroup(context *Context,

// repair unhealthy extents before making a decision on whether to create a new extent or not
if len(cgExtents.openBad) > 0 {
nRepaired := repairExtentsAndUpdateOutputHosts(context, dstID, cgExtents, maxExtentsToConsume, outputHosts, m3Scope)
nRepaired := repairExtentsAndUpdateOutputHosts(context, dstID, cgID, cgExtents, maxExtentsToConsume, outputHosts, m3Scope)
nConsumable += nRepaired
if nRepaired != len(cgExtents.openBad) && nConsumable > 0 {
// if we cannot repair all of the bad extents,
