Upgrade on input
* Report as GOING_DOWN
* Start draining everything
* Fix a couple of bugs on controllerhost

Tested manually by curl'ing the endpoint
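
The commit message notes manual testing by curl'ing the endpoint; a minimal Go sketch of the same check is below. The admin address and the /upgrade path are assumptions rather than values from this commit, and since UpgradeHandler exits the process after draining, the client may simply see the connection close.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical inputhost admin address and upgrade path; adjust for the real deployment.
	resp, err := http.Get("http://localhost:6280/upgrade")
	if err != nil {
		// UpgradeHandler calls os.Exit(0) after draining, so an aborted connection is expected.
		fmt.Println("request ended:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```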
Aravind Srinivasan committed Apr 12, 2017
1 parent 6566802 commit ce98f19
Showing 8 changed files with 93 additions and 17 deletions.
5 changes: 5 additions & 0 deletions common/convert.go
@@ -206,6 +206,11 @@ func RolePtr(role controller.Role) *controller.Role {
return &role
}

// NodeStatusPtr makes a copy and returns the pointer to a NodeStatus.
func NodeStatusPtr(status controller.NodeStatus) *controller.NodeStatus {
return &status
}

// NodeMetricsPtr makes a copy and returns the pointer to
// a NodeMetrics.
func NodeMetricsPtr(nodeMetrics controller.NodeMetrics) *controller.NodeMetrics {
6 changes: 6 additions & 0 deletions common/loadreporter.go
@@ -46,6 +46,7 @@ type (
// LoadReporterDaemon is used for periodically reporting load to controller
LoadReporterDaemon interface {
Daemon
GetReporter() LoadReporter
}

// LoadReporterDaemonFactory is used to create a daemon task for reporting load to controller
@@ -154,6 +155,11 @@ func (d *loadReporterDaemonImpl) Stop() {
d.logger.Info("Load reporter stopped.")
}

// GetReporter returns the underlying load reporter
func (d *loadReporterDaemonImpl) GetReporter() LoadReporter {
return d.reporter
}

func (d *loadReporterDaemonImpl) reporterPump() {

defer d.shutdownWG.Done()
6 changes: 6 additions & 0 deletions common/mockloadreporterdaemon.go
@@ -40,3 +40,9 @@ func (m *MockLoadReporterDaemon) Stop() {
m.Called()
return
}

// GetReporter is the mock implementation for GetReporter function on common.LoadReporterDaemon
func (m *MockLoadReporterDaemon) GetReporter() LoadReporter {
m.Called()
return nil
}
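
The mock's GetReporter goes through m.Called() and always returns nil, which suggests the struct embeds testify's mock.Mock like the other methods. A hypothetical test usage under that assumption (the common import path is also assumed) looks like:

```go
package common_test

import (
	"testing"

	"github.com/uber/cherami-server/common" // assumed import path
)

func TestMockGetReporter(t *testing.T) {
	d := new(common.MockLoadReporterDaemon)
	d.On("GetReporter").Return() // register the expectation; the mock ignores return values

	if r := d.GetReporter(); r != nil {
		t.Fatalf("expected nil reporter from the mock, got %v", r)
	}
	d.AssertExpectations(t)
}
```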
2 changes: 1 addition & 1 deletion services/controllerhost/controllerhost.go
@@ -555,7 +555,7 @@ func (mcp *Mcp) ReportNodeMetric(ctx thrift.Context, request *c.ReportNodeMetric
if metrics.IsSetOutgoingBytesCounter() {
loadMetrics.Put(hostID, load.EmptyTag, load.BytesOutPerSec, metrics.GetOutgoingBytesCounter(), timestamp)
}
if metrics.IsSetNodeStatus() && request.IsSetRole() {
if metrics.IsSetNodeStatus() && request.IsSetRole() && metrics.GetNodeStatus() == c.NodeStatus_GOING_DOWN {
switch request.GetRole() {
case c.Role_IN:
context.failureDetector.ReportHostGoingDown(common.InputServiceName, hostID)
4 changes: 4 additions & 0 deletions services/controllerhost/placement.go
@@ -265,6 +265,10 @@ func (p *DistancePlacement) getHealthyHosts(service string) ([]*common.HostInfo,
}
result = append(result, h)
}

if len(result) == 0 {
return nil, errNoHosts
}
return result, nil
}

60 changes: 44 additions & 16 deletions services/inputhost/inputhost.go
@@ -23,6 +23,7 @@ package inputhost
import (
"fmt"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
@@ -74,6 +75,12 @@ const (
// connWGTimeout is the timeout to wait after we send the drain command and
// before we start the drain
connWGTimeout = 5 * time.Second

// defaultUpgradeTimeout is the timeout to wait for draining during an upgrade
defaultUpgradeTimeout = 10 * time.Second

// drainAllUUID is the UUID used during drainAll
drainAllUUID = "D3A9C5DC-AE62-4465-9898-7FE71BD1FCA"
)

var (
@@ -103,7 +110,8 @@ type (
extMsgsLimitPerSecond int32
connMsgsLimitPerSecond int32
hostMetrics *load.HostMetrics
lastLoadReportedTime int64 // unix nanos when the last load report was sent
lastLoadReportedTime int64 // unix nanos when the last load report was sent
nodeStatus atomic.Value // status of the node
common.SCommon
}

@@ -845,15 +853,7 @@ func (h *InputHost) DrainExtent(ctx thrift.Context, request *admin.DrainExtentsR
return
}

// Report is the implementation for reporting host specific load to controller
func (h *InputHost) Report(reporter common.LoadReporter) {

now := time.Now().UnixNano()
intervalSecs := (now - h.lastLoadReportedTime) / int64(time.Second)
if intervalSecs < 1 {
return
}

func (h *InputHost) reportHostMetric(reporter common.LoadReporter, intervalSecs int64) int64 {
msgsInPerSec := h.hostMetrics.GetAndReset(load.HostMetricMsgsIn) / intervalSecs
// We just report the delta for the bytes in counter. so get the value and
// reset it.
@@ -864,13 +864,26 @@ func (h *InputHost) Report(reporter common.LoadReporter) {
NumberOfConnections: common.Int64Ptr(h.hostMetrics.Get(load.HostMetricNumOpenConns)),
IncomingMessagesCounter: common.Int64Ptr(msgsInPerSec),
IncomingBytesCounter: common.Int64Ptr(bytesInSinceLastReport),
NodeStatus: common.NodeStatusPtr(h.GetNodeStatus()),
}

reporter.ReportHostMetric(hostMetrics)
return *(hostMetrics.NumberOfConnections)
}

// Report is the implementation for reporting host specific load to controller
func (h *InputHost) Report(reporter common.LoadReporter) {

now := time.Now().UnixNano()
intervalSecs := (now - h.lastLoadReportedTime) / int64(time.Second)
if intervalSecs < 1 {
return
}

numConns := h.reportHostMetric(reporter, intervalSecs)
h.lastLoadReportedTime = now

// Also update the metrics reporter to make sure the connection gauge is incremented
numConns := *(hostMetrics.NumberOfConnections)
h.m3Client.UpdateGauge(metrics.PubConnectionStreamScope, metrics.InputhostPubConnection, numConns)
}

@@ -943,6 +956,16 @@ func (h *InputHost) GetNumConnections() int {
return int(h.hostMetrics.Get(load.HostMetricNumOpenConns))
}

// GetNodeStatus returns the current status of this host
func (h *InputHost) GetNodeStatus() controller.NodeStatus {
return h.nodeStatus.Load().(controller.NodeStatus)
}

// SetNodeStatus sets the status of this host
func (h *InputHost) SetNodeStatus(status controller.NodeStatus) {
h.nodeStatus.Store(status)
}
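
GetNodeStatus performs an unchecked type assertion on the atomic.Value, which is why NewInputHost (later in this diff) stores NodeStatus_UP before the load reporter starts. A standalone sketch of that behaviour, using a stand-in type instead of controller.NodeStatus, is:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type nodeStatus int32 // stand-in for controller.NodeStatus

const statusUP nodeStatus = 0

func main() {
	var v atomic.Value
	// Load returns nil until the first Store, so a bare v.Load().(nodeStatus) would panic here.
	if v.Load() == nil {
		fmt.Println("no status stored yet")
	}
	v.Store(statusUP) // mirrors bs.SetNodeStatus(controller.NodeStatus_UP) in NewInputHost
	fmt.Println("status:", v.Load().(nodeStatus))
}
```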

// Shutdown shuts down the InputHost cleanly
func (h *InputHost) Shutdown() {
// make sure we have at least loaded everything
@@ -983,11 +1006,15 @@ func (h *InputHost) RegisterWSHandler() *http.ServeMux {

// UpgradeHandler implements the upgrade end point
func (h *InputHost) UpgradeHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintf(w, "Upgrade endpoint called on inputhost\n")
// perform upgrade here
// 1. Report the node as going down to controller
// 2. go drain everything in the pathCache
h.logger.Info("Upgrade endpoint called on inputhost")
h.SetNodeStatus(controller.NodeStatus_GOING_DOWN)
reporter := h.loadReporter.GetReporter()
// report the node as going down to the controller
go h.reportHostMetric(reporter, 1)
// start draining everything
h.drainAll()
// at this point, we have marked ourselves as down and drained everything. Exit since we are no longer useful
os.Exit(0)
}

// NewInputHost is the constructor for BIn
@@ -1026,6 +1053,7 @@ func NewInputHost(serviceName string, sVice common.SCommon, mClient metadata.TCh
// manage uConfig, register handlerFunc and verifyFunc for uConfig values
bs.dConfigClient = sVice.GetDConfigClient()
bs.dynamicConfigManage()
bs.SetNodeStatus(controller.NodeStatus_UP)
return &bs, []thrift.TChanServer{cherami.NewTChanBInServer(&bs), admin.NewTChanInputHostAdminServer(&bs)}
}

15 changes: 15 additions & 0 deletions services/inputhost/inputhost_util.go
@@ -21,6 +21,7 @@
package inputhost

import (
"sync"
"time"

"github.com/uber-common/bark"
@@ -209,6 +210,20 @@ func (h *InputHost) unloadAll() {
h.pathMutex.Unlock()
}

func (h *InputHost) drainAll() {
h.pathMutex.RLock()
defer h.pathMutex.RUnlock()
var drainWG sync.WaitGroup
for _, pathCache := range h.pathCache {
drainWG.Add(1)
go pathCache.drain(&drainWG)
}

if ok := common.AwaitWaitGroup(&drainWG, defaultUpgradeTimeout); !ok {
h.logger.Warn("inputhost: drain all timed out")
}
}
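
drainAll bounds the wait with common.AwaitWaitGroup and defaultUpgradeTimeout; that helper's implementation is not part of this diff. A sketch of the usual wait-with-timeout pattern it stands for, under that assumption, is:

```go
package main

import (
	"sync"
	"time"
)

// awaitWaitGroup waits for wg up to timeout and reports whether the wait finished in time.
// This is only an illustration of the pattern; common.AwaitWaitGroup itself is not shown here.
func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false // timed out; the waiter goroutine finishes on its own later
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); time.Sleep(50 * time.Millisecond) }()
	_ = awaitWaitGroup(&wg, time.Second)
}
```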

// updateExtTokenBucket update the token bucket for the extents msgs limit rate per second
func (h *InputHost) updateExtTokenBucket(connLimit int32) {
h.pathMutex.RLock()
12 changes: 12 additions & 0 deletions services/inputhost/pathCache.go
@@ -565,3 +565,15 @@ func (pathCache *inPathCache) drainExtent(extUUID string, updateUUID string, dra
pathCache.RUnlock()
}
}

func (pathCache *inPathCache) drain(drainWG *sync.WaitGroup) {
defer drainWG.Done() // for this routine
pathCache.RLock()
defer pathCache.RUnlock()

// drain all extents
for extUUID := range pathCache.extentCache {
drainWG.Add(1) // one for each extent
go pathCache.drainExtent(string(extUUID), drainAllUUID, drainWG, defaultUpgradeTimeout)
}
}
