Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'md-mgr' of ssh://github.com/uber/cherami-server into md…
Browse files Browse the repository at this point in the history
…-mgr
  • Loading branch information
venkat1109 committed Jan 31, 2017
2 parents 7264041 + e797db5 commit 35f5d6d
Show file tree
Hide file tree
Showing 15 changed files with 736 additions and 75 deletions.
30 changes: 20 additions & 10 deletions clients/metadata/metadata_cassandra.go
Expand Up @@ -687,19 +687,19 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
}

// Note: if we add a new updatable property here, we also need to update the metadataReconciler in replicator to do reconcilation
if updateRequest.Status == nil {
if !updateRequest.IsSetStatus() {
updateRequest.Status = common.InternalDestinationStatusPtr(existing.GetStatus())
}
if updateRequest.ConsumedMessagesRetention == nil {
if !updateRequest.IsSetConsumedMessagesRetention() {
updateRequest.ConsumedMessagesRetention = common.Int32Ptr(existing.GetConsumedMessagesRetention())
}
if updateRequest.UnconsumedMessagesRetention == nil {
if !updateRequest.IsSetUnconsumedMessagesRetention() {
updateRequest.UnconsumedMessagesRetention = common.Int32Ptr(existing.GetUnconsumedMessagesRetention())
}
if updateRequest.OwnerEmail == nil {
if !updateRequest.IsSetOwnerEmail() {
updateRequest.OwnerEmail = common.StringPtr(existing.GetOwnerEmail())
}
if updateRequest.ChecksumOption == nil {
if !updateRequest.IsSetChecksumOption() {
updateRequest.ChecksumOption = common.InternalChecksumOptionPtr(existing.GetChecksumOption())
}
batch := s.session.NewBatch(gocql.LoggedBatch) // Consider switching to unlogged
Expand Down Expand Up @@ -755,11 +755,21 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
time.Now(),
marshalRequest(updateRequest))

existing.Status = common.InternalDestinationStatusPtr(updateRequest.GetStatus())
existing.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention())
existing.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention())
existing.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail())
existing.ChecksumOption = common.InternalChecksumOptionPtr(updateRequest.GetChecksumOption())
if updateRequest.IsSetStatus() {
existing.Status = common.InternalDestinationStatusPtr(updateRequest.GetStatus())
}
if updateRequest.IsSetConsumedMessagesRetention() {
existing.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention())
}
if updateRequest.IsSetUnconsumedMessagesRetention() {
existing.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention())
}
if updateRequest.IsSetOwnerEmail() {
existing.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail())
}
if updateRequest.IsSetChecksumOption() {
existing.ChecksumOption = common.InternalChecksumOptionPtr(updateRequest.GetChecksumOption())
}
return existing, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/cli/main.go
Expand Up @@ -53,7 +53,7 @@ func main() {
})
app.Name = "cherami"
app.Usage = "A command-line tool for cherami users"
app.Version = "1.1.5"
app.Version = "1.1.6"
app.Flags = []cli.Flag{
cli.BoolTFlag{
Name: "hyperbahn",
Expand Down
1 change: 0 additions & 1 deletion common/cache/lru.go
Expand Up @@ -63,7 +63,6 @@ func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})

}

// Get retrieves the value stored under the given key
Expand Down
194 changes: 194 additions & 0 deletions common/metrics/testreporter.go
@@ -0,0 +1,194 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package metrics

import (
"fmt"
"sync"
"time"
)

type (
// TestReporter is the reporter used to dump metric to console for stress runs
TestReporter struct {
tags map[string]string
}

testStopWatch struct {
metricName string
reporter *TestReporter
startTime time.Time
elasped time.Duration
}
)

type HandlerFn func(metricName string, baseTags, tags map[string]string, value int64)

var handlers = make(map[string]map[string]HandlerFn) // Key1 - metricName; Key2 - "filterTag:filterVal"
var handlerMutex sync.RWMutex

// NewTestReporter create an instance of Reporter which can be used for driver to emit metric to console
func NewTestReporter(tags map[string]string) Reporter {
reporter := &TestReporter{
tags: make(map[string]string),
}

if tags != nil {
mergeDictoRight(tags, reporter.tags)
}

return reporter
}

// InitMetrics is used to initialize the metrics map with the respective type
func (r *TestReporter) InitMetrics(metricMap map[MetricName]MetricType) {
// This is a no-op for test reporter as it is already have a static list of metric to work with
}

// GetChildReporter creates the child reporter for this parent reporter
func (r *TestReporter) GetChildReporter(tags map[string]string) Reporter {

sr := &TestReporter{
tags: make(map[string]string),
}

// copy the parent tags as well
mergeDictoRight(r.GetTags(), sr.GetTags())

if tags != nil {
mergeDictoRight(tags, sr.tags)
}

return sr
}

// GetTags returns the tags for this reporter object
func (r *TestReporter) GetTags() map[string]string {
return r.tags
}

// IncCounter reports Counter metric to M3
func (r *TestReporter) IncCounter(name string, tags map[string]string, delta int64) {
r.executeHandler(name, tags, delta)
}

// UpdateGauge reports Gauge type metric
func (r *TestReporter) UpdateGauge(name string, tags map[string]string, value int64) {
r.executeHandler(name, tags, value)
}

func (r *TestReporter) executeHandler(name string, tags map[string]string, value int64) {
handlerMutex.RLock()
_, ok0 := handlers[``]
_, ok1 := handlers[name]
if ok0 || ok1 {
if allHandler2, ok2 := handlers[``][``]; ok2 { // Global handler
allHandler2(name, r.tags, tags, value)
}
if allHandler3, ok3 := handlers[name][``]; ok3 { // Handler for all metrics named 'name'
allHandler3(name, r.tags, tags, value)
}

// TODO: technically, this is wrong, as we don't have the local tags overriding the struct tags, but this
// has no practical effect in our current use of metrics, since we never override
for _, q := range []map[string]string{r.tags, tags} {
for filterTag, filterTagVal := range q {
key2 := filterTag + `:` + filterTagVal
if handler4, ok4 := handlers[``][key2]; ok4 { // Handler for this tag, any name
handler4(name, r.tags, tags, value)
}
if handler5, ok5 := handlers[name][key2]; ok5 { // Handler for specifically this name and tag
handler5(name, r.tags, tags, value)
}
}
}
}
handlerMutex.RUnlock()
}

// Register a handler (closure) that receives updates for a particular guage or counter based on the metric name and
// the name/value of one of the metric's tags. If the filterTag/Val are both empty, all updates to that metric will
// trigger the handler. If metricName is empty, all metrics matching the tag filter will pass through your function.
// A nil handler unregisters the handler for the given filter parameters
//
// Dev notes:
// * It is advisible to defer a call to unregister your handler when your test ends
// * Your handler can be called concurrently. Capture your own sync.Mutex if you must serialize
// * Counters report the delta; you must maintain the cumulative value of your counter if it is important
// * Your handler executes synchronously with the metrics code; DO NOT BLOCK
func RegisterHandler(metricName, filterTag, filterTagVal string, handler HandlerFn) {
defer handlerMutex.Unlock()
handlerMutex.Lock()
if _, ok := handlers[metricName]; !ok {
handlers[metricName] = make(map[string]HandlerFn)
}

key2 := filterTag + `:` + filterTagVal
if key2 == `:` {
key2 = ``
}

if handler == nil {
delete(handlers[metricName], key2)
if len(handlers[metricName]) == 0 {
delete(handlers, metricName)
}
return
}

if hf, ok2 := handlers[metricName][key2]; ok2 {
panic(fmt.Sprintf("Metrics handler %v (for '%s'/'%s') should have been unregistered", hf, metricName, key2))
}

handlers[metricName][key2] = handler
}

func newTestStopWatch(metricName string, reporter *TestReporter) *testStopWatch {
watch := &testStopWatch{
metricName: metricName,
reporter: reporter,
}

return watch
}

func (w *testStopWatch) Start() {
w.startTime = time.Now()
}

func (w *testStopWatch) Stop() time.Duration {
w.elasped = time.Since(w.startTime)

return w.elasped
}

// StartTimer returns a Stopwatch which when stopped will report the metric to M3
func (r *TestReporter) StartTimer(name string, tags map[string]string) Stopwatch {
w := newTestStopWatch(name, r)
w.Start()
return w
}

// RecordTimer should be used for measuring latency when you cannot start the stop watch.
func (r *TestReporter) RecordTimer(name string, tags map[string]string, d time.Duration) {
// Record the time as counter of time in milliseconds
// not implemented
}
23 changes: 23 additions & 0 deletions common/util.go
Expand Up @@ -563,6 +563,20 @@ func NewMetricReporterWithHostname(cfg configure.CommonServiceConfig) metrics.Re
return reporter
}

//NewTestMetricsReporter creates a test reporter that allows registration of handler functions
func NewTestMetricsReporter() metrics.Reporter {
hostName, e := os.Hostname()
lcLg := GetDefaultLogger()
if e != nil {
lcLg.WithFields(bark.Fields{TagErr: e}).Fatal("Error getting hostname")
}

reporter := metrics.NewTestReporter(map[string]string{
metrics.HostnameTagName: hostName,
})
return reporter
}

//GetLocalClusterInfo gets the zone and tenancy from the given deployment
func GetLocalClusterInfo(deployment string) (zone string, tenancy string) {
parts := strings.Split(deployment, "_")
Expand Down Expand Up @@ -599,6 +613,15 @@ func (r *cliHelper) GetDefaultOwnerEmail() string {
// GetCanonicalZone is the implementation of the corresponding method
func (r *cliHelper) GetCanonicalZone(zone string) (cZone string, err error) {
var ok bool
if len(zone) == 0 {
return "", errors.New("Invalid Zone Name")
}

// If canonical zone list is empty, then any zone is valid
if len(r.cZones) == 0 {
return zone, nil
}

if cZone, ok = r.cZones[zone]; !ok {
return "", errors.New("Invalid Zone Name")
}
Expand Down
20 changes: 15 additions & 5 deletions services/frontendhost/frontend.go
Expand Up @@ -238,11 +238,21 @@ func convertCreateDestRequestToInternal(createRequest *c.CreateDestinationReques
func convertUpdateDestRequestToInternal(updateRequest *c.UpdateDestinationRequest, destUUID string) *shared.UpdateDestinationRequest {
internalUpdateRequest := shared.NewUpdateDestinationRequest()
internalUpdateRequest.DestinationUUID = common.StringPtr(destUUID)
internalUpdateRequest.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus(updateRequest.GetStatus()))
internalUpdateRequest.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention())
internalUpdateRequest.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention())
internalUpdateRequest.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail())
internalUpdateRequest.ChecksumOption = common.InternalChecksumOptionPtr(shared.ChecksumOption(updateRequest.GetChecksumOption()))
if updateRequest.IsSetStatus() {
internalUpdateRequest.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus(updateRequest.GetStatus()))
}
if updateRequest.IsSetConsumedMessagesRetention() {
internalUpdateRequest.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention())
}
if updateRequest.IsSetUnconsumedMessagesRetention() {
internalUpdateRequest.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention())
}
if updateRequest.IsSetOwnerEmail() {
internalUpdateRequest.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail())
}
if updateRequest.IsSetChecksumOption() {
internalUpdateRequest.ChecksumOption = common.InternalChecksumOptionPtr(shared.ChecksumOption(updateRequest.GetChecksumOption()))
}
return internalUpdateRequest
}

Expand Down

0 comments on commit 35f5d6d

Please sign in to comment.