Skip to content

Commit

Permalink
Merge branch 'master' into ss/sanitize
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan committed May 17, 2019
2 parents bec6257 + 8f12e6d commit bfc9b6a
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 75 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions Gopkg.toml
Expand Up @@ -88,7 +88,7 @@ ignored = ["github.com/uber/cadence/.gen"]

[[constraint]]
name = "github.com/uber-go/tally"
version = "3.3.7"
version = "3.3.9"

[[constraint]]
name = "github.com/uber/ringpop-go"
Expand Down Expand Up @@ -150,7 +150,3 @@ ignored = ["github.com/uber/cadence/.gen"]
[[constraint]]
name = "github.com/valyala/fastjson"
version = "1.4.1"

[[override]]
name = "github.com/m3db/prometheus_client_golang"
revision = "8ae269d24972b8695572fa6b2e3718b5ea82d6b4"
66 changes: 2 additions & 64 deletions common/service/config/metrics.go
Expand Up @@ -21,13 +21,10 @@
package config

import (
"net/http"
"strings"
"time"

"github.com/cactus/go-statsd-client/statsd"
prom "github.com/m3db/prometheus_client_golang/prometheus"
"github.com/m3db/prometheus_client_golang/prometheus/promhttp"
"github.com/uber-go/tally"
"github.com/uber-go/tally/prometheus"
tallystatsdreporter "github.com/uber-go/tally/statsd"
Expand Down Expand Up @@ -121,9 +118,9 @@ func (c *Metrics) newStatsdScope(logger log.Logger) tally.Scope {
// newPrometheusScope returns a new prometheus scope with
// a default reporting interval of a second
func (c *Metrics) newPrometheusScope(logger log.Logger) tally.Scope {
reporter, err := NewPrometheusReporter(
c.Prometheus,
reporter, err := c.Prometheus.NewReporter(
prometheus.ConfigurationOptions{
Registry: prom.NewRegistry(),
OnError: func(err error) {
logger.Warn("error in prometheus reporter", tag.Error(err))
},
Expand All @@ -141,62 +138,3 @@ func (c *Metrics) newPrometheusScope(logger log.Logger) tally.Scope {
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

// NewPrometheusReporter - creates a prometheus reporter
// N.B - copy of the NewReporter method in tally - https://github.com/uber-go/tally/blob/master/prometheus/config.go#L77
// as the above method does not allow setting a separate registry per root
// which is necessary when we are running multiple roles within a same process
func NewPrometheusReporter(
	config *prometheus.Configuration,
	configOpts prometheus.ConfigurationOptions,
) (prometheus.Reporter, error) {
	opts := prometheus.Options{
		OnRegisterError: configOpts.OnError,
	}

	// Translate the configured timer type into the tally option; any other
	// value falls through and leaves the tally default in place.
	switch config.TimerType {
	case "summary":
		opts.DefaultTimerType = prometheus.SummaryTimerType
	case "histogram":
		opts.DefaultTimerType = prometheus.HistogramTimerType
	}

	if len(config.DefaultHistogramBuckets) > 0 {
		buckets := make([]float64, 0, len(config.DefaultHistogramBuckets))
		for _, bucket := range config.DefaultHistogramBuckets {
			buckets = append(buckets, bucket.Upper)
		}
		opts.DefaultHistogramBuckets = buckets
	}

	if len(config.DefaultSummaryObjectives) > 0 {
		objectives := make(map[float64]float64, len(config.DefaultSummaryObjectives))
		for _, objective := range config.DefaultSummaryObjectives {
			objectives[objective.Percentile] = objective.AllowedError
		}
		opts.DefaultSummaryObjectives = objectives
	}

	// A dedicated registry per reporter is the whole point of this copy:
	// it keeps each root's metrics isolated within the shared process.
	registry := prom.NewRegistry()
	opts.Registerer = registry

	reporter := prometheus.NewReporter(opts)

	metricsPath := strings.TrimSpace(config.HandlerPath)
	if metricsPath == "" {
		metricsPath = "/metrics"
	}

	listenAddr := strings.TrimSpace(config.ListenAddress)
	if listenAddr == "" {
		// No dedicated listen address: expose the handler on the
		// process-wide default mux.
		http.Handle(metricsPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
		return reporter, nil
	}

	// Dedicated address: serve on a private mux in a background goroutine,
	// routing any server error through the configured callback.
	mux := http.NewServeMux()
	mux.Handle(metricsPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	go func() {
		if err := http.ListenAndServe(listenAddr, mux); err != nil {
			configOpts.OnError(err)
		}
	}()

	return reporter, nil
}
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Expand Up @@ -187,6 +187,7 @@ var keys = map[Key]string{
WorkerArchiverConcurrency: "worker.ArchiverConcurrency",
WorkerArchivalsPerIteration: "worker.ArchivalsPerIteration",
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerTimeLimitPerArchivalIteration: "worker.TimeLimitPerArchivalIteration",
WorkerThrottledLogRPS: "worker.throttledLogRPS",
ScannerPersistenceMaxQPS: "worker.scannerPersistenceMaxQPS",
}
Expand Down Expand Up @@ -482,6 +483,8 @@ const (
WorkerArchivalsPerIteration
// WorkerDeterministicConstructionCheckProbability controls the probability of running a deterministic construction check for any given archival
WorkerDeterministicConstructionCheckProbability
// WorkerTimeLimitPerArchivalIteration controls the time limit of each iteration of archival workflow
WorkerTimeLimitPerArchivalIteration
// WorkerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
WorkerThrottledLogRPS
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
Expand Down
1 change: 1 addition & 0 deletions service/worker/archiver/client_worker.go
Expand Up @@ -73,6 +73,7 @@ type (
ArchiverConcurrency dynamicconfig.IntPropertyFn
ArchivalsPerIteration dynamicconfig.IntPropertyFn
DeterministicConstructionCheckProbability dynamicconfig.FloatPropertyFn
TimeLimitPerArchivalIteration dynamicconfig.DurationPropertyFn
}

contextKey int
Expand Down
6 changes: 6 additions & 0 deletions service/worker/archiver/util.go
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/dgryski/go-farm"
"github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -111,6 +112,11 @@ func IsLast(tags map[string]string) bool {
return ok && last == "true"
}

// MaxArchivalIterationTimeout returns the max allowed timeout for a single iteration of archival workflow
func MaxArchivalIterationTimeout() time.Duration {
	// An iteration may use at most half the workflow's start-to-close
	// timeout, leaving headroom for the rest of the run.
	const iterationShare = 2
	return workflowStartToCloseTimeout / iterationShare
}

func modifyBlobForConstCheck(historyBlob *HistoryBlob, existingTags map[string]string) {
historyBlob.Header.UploadCluster = common.StringPtr(existingTags["upload_cluster"])
historyBlob.Header.UploadDateTime = common.StringPtr(existingTags["upload_date_time"])
Expand Down
11 changes: 10 additions & 1 deletion service/worker/archiver/workflow.go
Expand Up @@ -21,6 +21,8 @@
package archiver

import (
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -31,6 +33,7 @@ import (
// dynamicConfigResult carries the dynamic-config values resolved once per
// workflow run (inside a workflow.SideEffect, see archivalWorkflowHelper)
// so that replays see the same values deterministically.
type dynamicConfigResult struct {
ArchiverConcurrency int
ArchivalsPerIteration int
// TimelimitPerIteration is the per-iteration time budget handed to the
// pump; capped at MaxArchivalIterationTimeout when resolved.
TimelimitPerIteration time.Duration
}

func archivalWorkflow(ctx workflow.Context, carryover []ArchiveRequest) error {
Expand Down Expand Up @@ -62,9 +65,15 @@ func archivalWorkflowHelper(
_ = workflow.SideEffect(
ctx,
func(ctx workflow.Context) interface{} {
timeLimit := config.TimeLimitPerArchivalIteration()
maxTimeLimit := MaxArchivalIterationTimeout()
if timeLimit > maxTimeLimit {
timeLimit = maxTimeLimit
}
return dynamicConfigResult{
ArchiverConcurrency: config.ArchiverConcurrency(),
ArchivalsPerIteration: config.ArchivalsPerIteration(),
TimelimitPerIteration: timeLimit,
}
}).Get(&dcResult)
requestCh := workflow.NewBufferedChannel(ctx, dcResult.ArchivalsPerIteration)
Expand All @@ -75,7 +84,7 @@ func archivalWorkflowHelper(
archiver.Start()
signalCh := workflow.GetSignalChannel(ctx, signalName)
if pump == nil {
pump = NewPump(ctx, logger, metricsClient, carryover, workflowStartToCloseTimeout/2, dcResult.ArchivalsPerIteration, requestCh, signalCh)
pump = NewPump(ctx, logger, metricsClient, carryover, dcResult.TimelimitPerIteration, dcResult.ArchivalsPerIteration, requestCh, signalCh)
}
pumpResult := pump.Run()
metricsClient.AddCounter(metrics.ArchiverArchivalWorkflowScope, metrics.ArchiverNumPumpedRequestsCount, int64(len(pumpResult.PumpedHashes)))
Expand Down
5 changes: 3 additions & 2 deletions service/worker/archiver/workflow_test.go
Expand Up @@ -63,8 +63,9 @@ func (s *workflowSuite) SetupTest() {
workflowTestArchiver = &MockArchiver{}
workflowTestPump = &PumpMock{}
workflowTestConfig = &Config{
ArchiverConcurrency: dynamicconfig.GetIntPropertyFn(0),
ArchivalsPerIteration: dynamicconfig.GetIntPropertyFn(0),
ArchiverConcurrency: dynamicconfig.GetIntPropertyFn(0),
ArchivalsPerIteration: dynamicconfig.GetIntPropertyFn(0),
TimeLimitPerArchivalIteration: dynamicconfig.GetDurationPropertyFn(MaxArchivalIterationTimeout()),
}
}

Expand Down
1 change: 1 addition & 0 deletions service/worker/service.go
Expand Up @@ -102,6 +102,7 @@ func NewConfig(params *service.BootstrapParams) *Config {
ArchiverConcurrency: dc.GetIntProperty(dynamicconfig.WorkerArchiverConcurrency, 50),
ArchivalsPerIteration: dc.GetIntProperty(dynamicconfig.WorkerArchivalsPerIteration, 1000),
DeterministicConstructionCheckProbability: dc.GetFloat64Property(dynamicconfig.WorkerDeterministicConstructionCheckProbability, 0.002),
TimeLimitPerArchivalIteration: dc.GetDurationProperty(dynamicconfig.WorkerTimeLimitPerArchivalIteration, archiver.MaxArchivalIterationTimeout()),
},
IndexerCfg: &indexer.Config{
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 1000),
Expand Down

0 comments on commit bfc9b6a

Please sign in to comment.