Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Add test hook to have empty and very short extents (#189)
Browse files Browse the repository at this point in the history
* Add test hook to have empty and very short extents

* Adjust short extent constant upward

* Add test log

* Add config init code

* Fix random branching
  • Loading branch information
Guillaume Bailey committed May 8, 2017
1 parent a96faf0 commit 49e4b82
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 7 deletions.
3 changes: 2 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,10 @@ func GetConnectionKey(host *cherami.HostAddress) string {
return fmt.Sprintf("%v:%d", host.GetHost(), host.GetPort())
}

// GetRandInt64 is used to get a 64 bit random number between min and max
// GetRandInt64 is used to get a 64 bit random number between min and max, inclusive
func GetRandInt64(min int64, max int64) int64 {
// we need to get a random number between min and max
max++ // Int63n returns a number in the range (0,n] (i.e. not inclusive)
return min + rand.Int63n(max-min)
}

Expand Down
11 changes: 11 additions & 0 deletions services/inputhost/dynamicConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
UkeyExtMsgs = "inputhost.HostPerExtentMsgsLimitPerSecond"
// UkeyConnMsgs is the uconfig key for HostPerConnMsgsLimitPerSecond
UkeyConnMsgs = "inputhost.HostPerConnMsgsLimitPerSecond"
// UkeyTestShortExts is the uconfig key for TestShortExtentsByPath
UkeyTestShortExts = "inputhost.TestShortExtentsByPath"
)

func (h *InputHost) registerInt() {
Expand All @@ -47,6 +49,7 @@ func (h *InputHost) registerInt() {
handlerMap[UkeyMaxConnPerDest] = dconfig.GenerateIntHandler(UkeyMaxConnPerDest, h.SetMaxConnPerDest, h.GetMaxConnPerDest)
handlerMap[UkeyExtMsgs] = dconfig.GenerateIntHandler(UkeyExtMsgs, h.SetExtMsgsLimitPerSecond, h.GetExtMsgsLimitPerSecond)
handlerMap[UkeyConnMsgs] = dconfig.GenerateIntHandler(UkeyConnMsgs, h.SetConnMsgsLimitPerSecond, h.GetConnMsgsLimitPerSecond)
handlerMap[UkeyTestShortExts] = dconfig.GenerateStringHandler(UkeyTestShortExts, h.SetTestShortExtentsByPath, h.GetTestShortExtentsByPath)
h.dConfigClient.AddHandlers(handlerMap)
// Add verify function for the dynamic config value
verifierMap := make(map[string]dconfig.Verifier)
Expand All @@ -68,6 +71,14 @@ func (h *InputHost) LoadUconfig() {
} else {
log.Errorf("Cannot get %s from uconfig, Using right format", UkeyHostOverall)
}

str, ok := h.dConfigClient.GetOrDefault(UkeyTestShortExts, ``).(string)
if ok {
h.SetTestShortExtentsByPath(str)
log.WithField(UkeyTestShortExts, str).Info(`Updated`)
} else {
log.WithField(UkeyTestShortExts, str).Warn(`Failed update, type assertion failed`)
}
}

// uconfigManage do the work for uconfig
Expand Down
66 changes: 61 additions & 5 deletions services/inputhost/exthost.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ import (

"github.com/pborman/uuid"
"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/services/inputhost/load"
"github.com/uber/cherami-thrift/.generated/go/admin"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-thrift/.generated/go/controller"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/uber/cherami-thrift/.generated/go/store"
"github.com/uber/tchannel-go/thrift"
)

type (
Expand Down Expand Up @@ -191,7 +190,48 @@ var (
msgAckTimeout = 1 * time.Minute
)

func newExtConnection(destUUID string, pathCache *inPathCache, extUUID string, numReplicas int, loadReporterFactory common.LoadReporterDaemonFactory, logger bark.Logger, tClients common.ClientFactory, shutdownWG *sync.WaitGroup, limitsEnabled bool) *extHost {
// getMaxSequenceNumberWithTestOverride calculates a random extent length, with or without a test override provided by dynamic configuration
func getMaxSequenceNumberWithTestOverride(pathCache *inPathCache, logger bark.Logger) int64 {
var shortExtents int64
override := pathCache.inputHost.GetTestShortExtentsByPath()
logFn := func() bark.Logger {
return logger
}

if override != `` {
shortExtents = common.OverrideValueByPrefix(
logFn,
pathCache.destinationPath,
strings.Split(override, `,`),
0,
`TestShortExtents`)
}

length := common.GetRandInt64(int64(extentRolloverSeqnumMin), int64(extentRolloverSeqnumMax)) // 50% chance with the test override, 100% with no test override
if shortExtents > 0 {
if common.GetRandInt64(0, 1) == 0 { // 50% chance
if common.GetRandInt64(0, 1) == 0 { // 50% chance
length = 0 // Empty extent with 25% probability with the test override
} else {
length = common.GetRandInt64(1, 20000) // 25% chance of very short extent with the test override
}
}
logger.WithField(`extentLength`, length).Info(`Overriding extent length for testing`)
}

return length
}

func newExtConnection(
destUUID string,
pathCache *inPathCache,
extUUID string,
numReplicas int,
loadReporterFactory common.LoadReporterDaemonFactory,
logger bark.Logger,
tClients common.ClientFactory,
shutdownWG *sync.WaitGroup,
limitsEnabled bool) *extHost {
conn := &extHost{
streams: make(map[storeHostPort]*replicaInfo),
extUUID: extUUID,
Expand All @@ -211,7 +251,7 @@ func newExtConnection(destUUID string, pathCache *inPathCache, extUUID string, n
shutdownWG: shutdownWG,
forceUnloadCh: make(chan struct{}),
limitsEnabled: limitsEnabled,
maxSequenceNumber: common.GetRandInt64(int64(extentRolloverSeqnumMin), int64(extentRolloverSeqnumMax)),
maxSequenceNumber: getMaxSequenceNumberWithTestOverride(pathCache, logger),
extMetrics: load.NewExtentMetrics(),
dstMetrics: pathCache.dstMetrics,
hostMetrics: pathCache.hostMetrics,
Expand Down Expand Up @@ -549,6 +589,7 @@ func (conn *extHost) writeAckToPubConn(putMsgAckCh chan *cherami.PutMessageAck,
}

func (conn *extHost) sendMessage(pr *inPutMessage, extSendTimer *common.Timer, watermark *int64) {
sequenceNumber, err := int64(0), error(nil)
// make sure we can satisfy the rate, if needed
if conn.limitsEnabled {
if ok, _ := conn.GetExtTokenBucketValue().TryConsume(1); !ok {
Expand Down Expand Up @@ -591,7 +632,22 @@ func (conn *extHost) sendMessage(pr *inPutMessage, extSendTimer *common.Timer, w
return
}

sequenceNumber, err := conn.sendMessageToReplicas(pr, extSendTimer, watermark)
if conn.maxSequenceNumber == 0 { // MaxSequenceNumber may be zero when we are testing short extents
conn.logger.Info(`inputhost: exthost: sealing and closing due to testing zero maxSequenceNumber`)
putMsgAck := &cherami.PutMessageAck{
ID: common.StringPtr(pr.putMsg.GetID()),
UserContext: pr.putMsg.GetUserContext(),
Status: common.CheramiStatusPtr(cherami.Status_FAILED),
Message: common.StringPtr(`test extent is randomly chosen to be empty`),
}
conn.writeAckToPubConn(pr.putMsgAckCh, putMsgAck)
go conn.sealExtent()
time.Sleep(time.Second) // Give the seal signal a moment to propagate
go conn.close()
return
}

sequenceNumber, err = conn.sendMessageToReplicas(pr, extSendTimer, watermark)
if err != nil {
// For now, lets reply Status_FAILED immediately and
// close the connection if we got an error.
Expand Down
13 changes: 12 additions & 1 deletion services/inputhost/inputhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
minimumAllowedMessageDelaySecondsTest = 3

// extentRolloverSeqnum{Min,Max} is the range within which extents would be sealed
// triggering a roll-over to a new exstent. the code picks a random number in this
// triggering a roll-over to a new extent. the code picks a random number in this
// range (currently, betweetn 10 million and 20 million) and will seal at this
// sequence number proactively so that we don't have a very large extent.
extentRolloverSeqnumMin, extentRolloverSeqnumMax = 10000000, 20000000
Expand Down Expand Up @@ -109,6 +109,7 @@ type (
hostMetrics *load.HostMetrics
lastLoadReportedTime int64 // unix nanos when the last load report was sent
nodeStatus atomic.Value // status of the node
testShortExtentsByPath string // Override to make some paths randomly have extremely short or zero-length extents
common.SCommon
}

Expand Down Expand Up @@ -963,6 +964,16 @@ func (h *InputHost) SetNodeStatus(status controller.NodeStatus) {
h.nodeStatus.Store(status)
}

// SetTestShortExtentsByPath sets path override that enables testing short extents
func (h *InputHost) SetTestShortExtentsByPath(override string) {
h.testShortExtentsByPath = override
}

// GetTestShortExtentsByPath gets path override that enables testing short extents
func (h *InputHost) GetTestShortExtentsByPath() (override string) {
return h.testShortExtentsByPath
}

// Shutdown shutsdown all the InputHost cleanly
func (h *InputHost) Shutdown() {
// make sure we have atleast loaded everything
Expand Down

0 comments on commit 49e4b82

Please sign in to comment.