satellite/repair: use survivability model for segment health
The chief segment health models we've come up with are the "immediate
danger" model and the "survivability" model. The former calculates the
chance of a segment becoming lost in the next time period (using
the CDF of the binomial distribution to estimate the chance of x nodes
failing in that period), while the latter estimates the number of
iterations for which a segment can be expected to survive (using the
mean of the negative binomial distribution). The immediate danger model
was a promising one for comparing segment health across segments with
different RS parameters, as it is more precisely what we want to
prevent, but it turns out that practically all segments in production
have infinite health, as the chance of losing segments with any
reasonable estimate of node failure rate is smaller than DBL_EPSILON,
the smallest possible difference from 1.0 representable in a float64
(about 1e-16).
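
To make the precision limit concrete, here is a minimal sketch (the tail probability below is a made-up stand-in, still far larger than any realistic estimate): once the loss probability drops below the float64 spacing around 1.0, the immediate danger computation collapses to zero and its reciprocal health becomes infinite.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical chance of losing enough pieces in one period; real
	// estimates are far smaller still.
	tail := 1e-20

	// The immediate danger model works through the binomial CDF, which at
	// this point is 1-tail. float64 cannot represent a value that close to
	// 1.0, so the danger (1 - CDF) rounds to exactly zero.
	cdf := 1.0 - tail   // rounds to exactly 1.0
	danger := 1.0 - cdf // exactly 0.0
	health := 1.0 / danger

	fmt.Println(danger)                // 0
	fmt.Println(math.IsInf(health, 1)) // true: "infinite health"
}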

Leaving aside the wisdom of worrying about the repair of segments that
have less than a 1e-16 chance of being lost, we want to be extremely
conservative and proactive in our repair efforts, and the health of the
segments we have been repairing thus far also evaluates to infinity
under the immediate danger model. Thus, we find ourselves reaching for
an alternative.

Dr. Ben saves the day: the survivability model is a reasonably close
approximation of the immediate danger model, and even better, it is
far simpler to calculate and yields manageable values for real-world
segments. The downside to it is that it requires as input an estimate
of the total number of active nodes.
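
As a rough numeric sketch (all inputs below are hypothetical, and the real implementation adds guards that are omitted here), the survivability calculation stays in a comfortable range for plausible piece counts:

package main

import "fmt"

// survivability mirrors the mean of the negative binomial distribution used
// below in satellite/repair/priority.go: r*p/(1-p), rescaled by the expected
// node churn per round. The p == 1 guard and minimum-churn clamp from the
// real SegmentHealth are omitted for brevity.
func survivability(numHealthy, minPieces, totalNodes int, failureRate float64) float64 {
	churnPerRound := float64(totalNodes) * failureRate
	p := float64(totalNodes-numHealthy) / float64(totalNodes)
	r := float64(numHealthy - minPieces + 1)
	return r * p / (1 - p) / churnPerRound
}

func main() {
	// Hypothetical network: 10,000 active nodes, each with a 0.01% chance of
	// failing per iteration.
	fmt.Println(survivability(52, 29, 10000, 0.0001)) // roughly 4.6e3
	fmt.Println(survivability(35, 29, 10000, 0.0001)) // roughly 2.0e3: repaired first
}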

This change replaces the segment health calculation to use the
survivability model, and reinstates the call to SegmentHealth() where it
was reverted. It gets estimates for the total number of active nodes by
leveraging the reliability cache.

Change-Id: Ia5d9b9031b9f6cf0fa7b9005a7011609415527dc
thepaul authored and jenlij committed Dec 17, 2020
1 parent 3feee9f commit d3604a5
Showing 4 changed files with 165 additions and 212 deletions.
70 changes: 52 additions & 18 deletions satellite/repair/checker/checker.go
@@ -84,6 +84,28 @@ func (checker *Checker) Run(ctx context.Context) (err error) {
return group.Wait()
}

// getNodesEstimate updates the estimate of the total number of nodes. It is guaranteed
// to return a number greater than 0 when the error is nil.
//
// We can't calculate this upon first starting a Checker, because there may not be any
// nodes yet. We expect that there will be nodes before there are segments, though.
func (checker *Checker) getNodesEstimate(ctx context.Context) (int, error) {
// this should be safe to call frequently; it is an efficient caching lookup.
totalNumNodes, err := checker.nodestate.NumNodes(ctx)
if err != nil {
// We could proceed here by returning the last good value, or by returning a fallback
// constant estimate, like "20000", and we'd probably be fine, but it would be better
// not to have that happen silently for too long. Also, if we can't get this from the
// database, we probably can't modify the injured segments queue, so it won't help to
// proceed with this repair operation.
return 0, err
}
if totalNumNodes == 0 {
return 0, Error.New("segment health is meaningless: there are no nodes")
}
return totalNumNodes, nil
}

// RefreshReliabilityCache forces refreshing node online status cache.
func (checker *Checker) RefreshReliabilityCache(ctx context.Context) error {
return checker.nodestate.Refresh(ctx)
@@ -102,14 +124,15 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
startTime := time.Now()

observer := &checkerObserver{
repairQueue: checker.repairQueue,
irrdb: checker.irrdb,
nodestate: checker.nodestate,
statsCollector: checker.statsCollector,
monStats: aggregateStats{},
repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate,
log: checker.logger,
repairQueue: checker.repairQueue,
irrdb: checker.irrdb,
nodestate: checker.nodestate,
statsCollector: checker.statsCollector,
monStats: aggregateStats{},
repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate,
getNodesEstimate: checker.getNodesEstimate,
log: checker.logger,
}
err = checker.metaLoop.Join(ctx, observer)
if err != nil {
@@ -187,14 +210,19 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
repairThreshold = overrideValue
}

totalNumNodes, err := checker.getNodesEstimate(ctx)
if err != nil {
return Error.New("could not get estimate of total number of nodes: %w", err)
}

// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
// minimum required pieces in redundancy
// except for the case when the repair and success thresholds are the same (a case usually seen during testing)
//
// If the segment is suddenly entirely healthy again, we don't need to repair and we don't need to
// keep it in the irreparabledb queue either.
if numHealthy >= redundancy.MinReq && numHealthy <= repairThreshold && numHealthy < redundancy.SuccessThreshold {
segmentHealth := float64(numHealthy)
segmentHealth := repair.SegmentHealth(int(numHealthy), int(redundancy.MinReq), totalNumNodes, checker.nodeFailureRate)
_, err = checker.repairQueue.Insert(ctx, &internalpb.InjuredSegment{
Path: key,
LostPieces: missingPieces,
@@ -240,14 +268,15 @@ var _ metainfo.Observer = (*checkerObserver)(nil)
//
// architecture: Observer
type checkerObserver struct {
repairQueue queue.RepairQueue
irrdb irreparable.DB
nodestate *ReliabilityCache
statsCollector *statsCollector
monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
repairOverrides RepairOverridesMap
nodeFailureRate float64
log *zap.Logger
repairQueue queue.RepairQueue
irrdb irreparable.DB
nodestate *ReliabilityCache
statsCollector *statsCollector
monStats aggregateStats // TODO(cam): once we verify statsCollector reports data correctly, remove this
repairOverrides RepairOverridesMap
nodeFailureRate float64
getNodesEstimate func(ctx context.Context) (int, error)
log *zap.Logger
}

func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
@@ -295,6 +324,11 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
}
}

totalNumNodes, err := obs.getNodesEstimate(ctx)
if err != nil {
return Error.New("could not get estimate of total number of nodes: %w", err)
}

// TODO: update MissingPieces to accept metabase.Pieces
missingPieces, err := obs.nodestate.MissingPieces(ctx, segment.CreationDate, pbPieces)
if err != nil {
@@ -315,7 +349,7 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo

required, repairThreshold, successThreshold, _ := obs.loadRedundancy(segment.Redundancy)

segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
segmentHealth := repair.SegmentHealth(numHealthy, required, totalNumNodes, obs.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentHealth.Observe(segmentHealth)

46 changes: 35 additions & 11 deletions satellite/repair/checker/online.go
@@ -39,18 +39,49 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration) *Rel
}
}

// LastUpdate returns when the cache was last updated.
// LastUpdate returns when the cache was last updated, or the zero value (time.Time{}) if it
// has never yet been updated. LastUpdate() does not trigger an update itself.
func (cache *ReliabilityCache) LastUpdate() time.Time {
if state, ok := cache.state.Load().(*reliabilityState); ok {
return state.created
}
return time.Time{}
}

// NumNodes returns the number of online active nodes (as determined by the reliability cache).
// This number is not guaranteed to be consistent with either the nodes database or the
// reliability cache after returning; it is just a best-effort count and should be treated as an
// estimate.
func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err error) {
defer mon.Task()(&ctx)(&err)

state, err := cache.loadFast(ctx, time.Time{})
if err != nil {
return 0, err
}
return len(state.reliable), nil
}

// MissingPieces returns piece indices that are unreliable with the given staleness period.
func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces []*pb.RemotePiece) (_ []int32, err error) {
defer mon.Task()(&ctx)(&err)

state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
var unreliable []int32
for _, piece := range pieces {
if _, ok := state.reliable[piece.NodeId]; !ok {
unreliable = append(unreliable, piece.PieceNum)
}
}
return unreliable, nil
}

func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {
defer mon.Task()(&ctx)(&err)

// This code is designed to be very fast in the case where a refresh is not needed: just an
// atomic load from rarely written to bit of shared memory. The general strategy is to first
// read if the state suffices to answer the query. If not (due to it not existing, being
@@ -60,25 +91,18 @@ func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.T
// the acquisition. Only then do we refresh and can then proceed answering the query.

state, ok := cache.state.Load().(*reliabilityState)
if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
cache.mu.Lock()
state, ok = cache.state.Load().(*reliabilityState)
if !ok || created.After(state.created) || time.Since(state.created) > cache.staleness {
if !ok || validUpTo.After(state.created) || time.Since(state.created) > cache.staleness {
state, err = cache.refreshLocked(ctx)
}
cache.mu.Unlock()
if err != nil {
return nil, err
}
}

var unreliable []int32
for _, piece := range pieces {
if _, ok := state.reliable[piece.NodeId]; !ok {
unreliable = append(unreliable, piece.PieceNum)
}
}
return unreliable, nil
return state, nil
}

// Refresh refreshes the cache.
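
The comment inside loadFast above describes a double-checked refresh: an atomic fast path, then a mutex-guarded re-check before refreshing from the database. The sketch below is a generic, simplified illustration of that pattern (the types and refresh step are stand-ins, not the actual ReliabilityCache code):

package example

import (
	"sync"
	"sync/atomic"
	"time"
)

// snapshot stands in for reliabilityState: an immutable view plus the time it
// was built.
type snapshot struct {
	created time.Time
	data    map[int]struct{}
}

// cache illustrates the fast-path/slow-path refresh strategy.
type cache struct {
	mu        sync.Mutex
	state     atomic.Value // holds *snapshot
	staleness time.Duration
}

func (c *cache) load() *snapshot {
	// Fast path: a single atomic load, no locking.
	if s, ok := c.state.Load().(*snapshot); ok && time.Since(s.created) <= c.staleness {
		return s
	}
	// Slow path: take the mutex, then re-check, because another goroutine may
	// have refreshed the state while we were waiting for the lock.
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.state.Load().(*snapshot); ok && time.Since(s.created) <= c.staleness {
		return s
	}
	// Refresh (in the real cache this queries the overlay/database).
	s := &snapshot{created: time.Now(), data: make(map[int]struct{})}
	c.state.Store(s)
	return s
}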
187 changes: 48 additions & 139 deletions satellite/repair/priority.go
@@ -3,144 +3,53 @@

package repair

import (
"math"
)

// SegmentHealth returns a value corresponding to the health of a segment
// in the repair queue. Lower health segments should be repaired first.
func SegmentHealth(numHealthy, minPieces int, failureRate float64) float64 {
return 1.0 / SegmentDanger(numHealthy, minPieces, failureRate)
import "math"

// SegmentHealth returns a value corresponding to the health of a segment in the
// repair queue. Lower health segments should be repaired first.
//
// This calculation purports to find the number of iterations for which a
// segment can be expected to survive, with the given failureRate. The number of
// iterations for the segment to survive (X) can be modeled with the negative
// binomial distribution, with the number of pieces that must be lost as the
// success threshold r, and the chance of losing a single piece in a round as
// the trial success probability p.
//
// First, we calculate the expected number of iterations for a segment to
// survive if we were to lose exactly one node every iteration:
//
// r = numHealthy - minPieces + 1
// p = (totalNodes - numHealthy) / totalNodes
// X ~ NB(r, p)
//
// Then we take the mean of that distribution to use as our expected value,
// which is pr/(1-p).
//
// Finally, to get away from the "one node per iteration" simplification, we
// just scale the magnitude of the iterations in the model so that there really
// is one node being lost. For example, if our failureRate and totalNodes imply
// a churn rate of 3 nodes per day, we just take 1/3 of a day and call that an
// "iteration" for purposes of the model. To convert iterations in the model to
// days, we divide the mean of the negative binomial distribution (X, above) by
// the number of nodes that we estimate will churn in one day.
func SegmentHealth(numHealthy, minPieces, totalNodes int, failureRate float64) float64 {
churnPerRound := float64(totalNodes) * failureRate
if churnPerRound < minChurnPerRound {
// we artificially limit churnPerRound from going too low in cases
// where there are not many nodes, so that health values do not
// start to approach the floating point maximum
churnPerRound = minChurnPerRound
}
p := float64(totalNodes-numHealthy) / float64(totalNodes)
if p == 1.0 {
// floating point precision is insufficient to represent the difference
// from p to 1. there are too many nodes for this model, or else
// numHealthy is 0 somehow. we can't proceed with the normal calculation
// or we will divide by zero.
return math.Inf(1)
}
mean1 := float64(numHealthy-minPieces+1) * p / (1 - p)
return mean1 / churnPerRound
}

// SegmentDanger returns the chance of a segment with the given minPieces
// and the given number of healthy pieces of being lost in the next time
// period.
//
// It assumes:
//
// * Nodes fail at the given failureRate (i.e., each node has a failureRate
// chance of going offline within the next time period).
// * Node failures are entirely independent. Obviously this is not the case,
// because many nodes may be operated by a single entity or share network
// infrastructure, in which case their failures would be correlated. But we
// can't easily model that, so our best hope is to try to avoid putting
// pieces for the same segment on related nodes to maximize failure
// independence.
//
// (The "time period" we are talking about here could be anything. The returned
// danger value will be given in terms of whatever time period was used to
// determine failureRate. If it simplifies things, you can think of the time
// period as "one repair worker iteration".)
//
// If those things are true, then the number of nodes holding this segment
// that will go offline follows the Binomial distribution:
//
// X ~ Binom(numHealthy, failureRate)
//
// A segment is lost if the number of nodes that go offline is higher than
// (numHealthy - minPieces). So we want to find
//
// Pr[X > (numHealthy - minPieces)]
//
// If we invert the logic here, we can use the standard CDF for the binomial
// distribution.
//
// Pr[X > (numHealthy - minPieces)] = 1 - Pr[X <= (numHealthy - minPieces)]
//
// And that gives us the danger value.
func SegmentDanger(numHealthy, minPieces int, failureRate float64) float64 {
return 1.0 - binomialCDF(float64(numHealthy-minPieces), float64(numHealthy), failureRate)
}

// math.Lgamma without the returned sign parameter; it's unneeded here.
func lnGamma(x float64) float64 {
lg, _ := math.Lgamma(x)
return lg
}

// The following functions are based on code from
// Numerical Recipes in C, Second Edition, Section 6.4 (pp. 227-228).

// betaI calculates the incomplete beta function I_x(a, b).
func betaI(a, b, x float64) float64 {
if x < 0.0 || x > 1.0 {
return math.NaN()
}
bt := 0.0
if x > 0.0 && x < 1.0 {
// factors in front of the continued function
bt = math.Exp(lnGamma(a+b) - lnGamma(a) - lnGamma(b) + a*math.Log(x) + b*math.Log(1.0-x))
}
if x < (a+1.0)/(a+b+2.0) {
// use continued fraction directly
return bt * betaCF(a, b, x) / a
}
// use continued fraction after making the symmetry transformation
return 1.0 - bt*betaCF(b, a, 1.0-x)/b
}

const (
// unlikely to go this far, as betaCF is expected to converge quickly for
// typical values.
maxIter = 100

// betaI outputs will be accurate to within this amount.
epsilon = 1.0e-14
)

// betaCF evaluates the continued fraction for the incomplete beta function
// by a modified Lentz's method.
func betaCF(a, b, x float64) float64 {
avoidZero := func(f float64) float64 {
if math.Abs(f) < math.SmallestNonzeroFloat64 {
return math.SmallestNonzeroFloat64
}
return f
}

qab := a + b
qap := a + 1.0
qam := a - 1.0
c := 1.0
d := 1.0 / avoidZero(1.0-qab*x/qap)
h := d

for m := 1; m <= maxIter; m++ {
m := float64(m)
m2 := 2.0 * m
aa := m * (b - m) * x / ((qam + m2) * (a + m2))
// one step (the even one) of the recurrence
d = 1.0 / avoidZero(1.0+aa*d)
c = avoidZero(1.0 + aa/c)
h *= d * c
aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2))
// next step of the recurrence (the odd one)
d = 1.0 / avoidZero(1.0+aa*d)
c = avoidZero(1.0 + aa/c)
del := d * c
h *= del
if math.Abs(del-1.0) < epsilon {
return h
}
}
// a or b too big, or maxIter too small
return math.NaN()
}

// binomialCDF evaluates the CDF of the binomial distribution Binom(n, p) at k.
// This is done using (1-p)**(n-k) when k is 0, or with the incomplete beta
// function otherwise.
func binomialCDF(k, n, p float64) float64 {
k = math.Floor(k)
if k < 0.0 || n < k {
return math.NaN()
}
if k == n {
return 1.0
}
if k == 0 {
return math.Pow(1.0-p, n-k)
}
return betaI(n-k, k+1.0, 1.0-p)
}
const minChurnPerRound = 1e-10
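
For a usage illustration, the sketch below (hypothetical piece counts and failure rate; assuming the repository's usual module path storj.io/storj) ranks two segments with different RS parameters on the same health scale, which is exactly what the repair queue needs:

package main

import (
	"fmt"

	"storj.io/storj/satellite/repair"
)

func main() {
	// Hypothetical inputs: a node-count estimate from the reliability cache
	// and a configured per-iteration node failure rate.
	totalNodes := 10000
	failureRate := 0.00005

	// Two segments with different RS schemes, compared on one scale.
	healthA := repair.SegmentHealth(40, 29, totalNodes, failureRate) // 29-needed scheme, 40 pieces healthy
	healthB := repair.SegmentHealth(20, 16, totalNodes, failureRate) // 16-needed scheme, 20 pieces healthy

	fmt.Printf("segment A health: %.0f\n", healthA)
	fmt.Printf("segment B health: %.0f\n", healthB)
	// The segment with the lower health value is repaired first.
}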
