diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index 7c6d900c..026eb934 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -687,19 +687,19 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR } // Note: if we add a new updatable property here, we also need to update the metadataReconciler in replicator to do reconcilation - if updateRequest.Status == nil { + if !updateRequest.IsSetStatus() { updateRequest.Status = common.InternalDestinationStatusPtr(existing.GetStatus()) } - if updateRequest.ConsumedMessagesRetention == nil { + if !updateRequest.IsSetConsumedMessagesRetention() { updateRequest.ConsumedMessagesRetention = common.Int32Ptr(existing.GetConsumedMessagesRetention()) } - if updateRequest.UnconsumedMessagesRetention == nil { + if !updateRequest.IsSetUnconsumedMessagesRetention() { updateRequest.UnconsumedMessagesRetention = common.Int32Ptr(existing.GetUnconsumedMessagesRetention()) } - if updateRequest.OwnerEmail == nil { + if !updateRequest.IsSetOwnerEmail() { updateRequest.OwnerEmail = common.StringPtr(existing.GetOwnerEmail()) } - if updateRequest.ChecksumOption == nil { + if !updateRequest.IsSetChecksumOption() { updateRequest.ChecksumOption = common.InternalChecksumOptionPtr(existing.GetChecksumOption()) } batch := s.session.NewBatch(gocql.LoggedBatch) // Consider switching to unlogged @@ -755,11 +755,21 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR time.Now(), marshalRequest(updateRequest)) - existing.Status = common.InternalDestinationStatusPtr(updateRequest.GetStatus()) - existing.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention()) - existing.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention()) - existing.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail()) - existing.ChecksumOption = common.InternalChecksumOptionPtr(updateRequest.GetChecksumOption()) + if updateRequest.IsSetStatus() { + existing.Status = common.InternalDestinationStatusPtr(updateRequest.GetStatus()) + } + if updateRequest.IsSetConsumedMessagesRetention() { + existing.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention()) + } + if updateRequest.IsSetUnconsumedMessagesRetention() { + existing.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention()) + } + if updateRequest.IsSetOwnerEmail() { + existing.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail()) + } + if updateRequest.IsSetChecksumOption() { + existing.ChecksumOption = common.InternalChecksumOptionPtr(updateRequest.GetChecksumOption()) + } return existing, nil } diff --git a/cmd/tools/cli/main.go b/cmd/tools/cli/main.go index 7247593d..8e86ba87 100644 --- a/cmd/tools/cli/main.go +++ b/cmd/tools/cli/main.go @@ -53,7 +53,7 @@ func main() { }) app.Name = "cherami" app.Usage = "A command-line tool for cherami users" - app.Version = "1.1.5" + app.Version = "1.1.6" app.Flags = []cli.Flag{ cli.BoolTFlag{ Name: "hyperbahn", diff --git a/common/cache/lru.go b/common/cache/lru.go index 31ab14dd..c5be766b 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -63,7 +63,6 @@ func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache { return New(maxSize, &Options{ InitialCapacity: initialCapacity, }) - } // Get retrieves the value stored under the given key diff --git a/common/metrics/testreporter.go 
b/common/metrics/testreporter.go new file mode 100644 index 00000000..7ab9ca89 --- /dev/null +++ b/common/metrics/testreporter.go @@ -0,0 +1,194 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package metrics + +import ( + "fmt" + "sync" + "time" +) + +type ( + // TestReporter is a Reporter that dispatches metrics to handler functions registered by tests + TestReporter struct { + tags map[string]string + } + + testStopWatch struct { + metricName string + reporter *TestReporter + startTime time.Time + elasped time.Duration + } +) + +type HandlerFn func(metricName string, baseTags, tags map[string]string, value int64) + +var handlers = make(map[string]map[string]HandlerFn) // Key1 - metricName; Key2 - "filterTag:filterVal" +var handlerMutex sync.RWMutex + +// NewTestReporter creates an instance of Reporter that dispatches metrics to handlers registered by tests +func NewTestReporter(tags map[string]string) Reporter { + reporter := &TestReporter{ + tags: make(map[string]string), + } + + if tags != nil { + mergeDictoRight(tags, reporter.tags) + } + + return reporter +} + +// InitMetrics is used to initialize the metrics map with the respective type +func (r *TestReporter) InitMetrics(metricMap map[MetricName]MetricType) { + // This is a no-op for the test reporter, as it already has a static list of metrics to work with +} + +// GetChildReporter creates the child reporter for this parent reporter +func (r *TestReporter) GetChildReporter(tags map[string]string) Reporter { + + sr := &TestReporter{ + tags: make(map[string]string), + } + + // copy the parent tags as well + mergeDictoRight(r.GetTags(), sr.GetTags()) + + if tags != nil { + mergeDictoRight(tags, sr.tags) + } + + return sr +} + +// GetTags returns the tags for this reporter object +func (r *TestReporter) GetTags() map[string]string { + return r.tags +} + +// IncCounter reports a counter metric to any registered handlers +func (r *TestReporter) IncCounter(name string, tags map[string]string, delta int64) { + r.executeHandler(name, tags, delta) +} + +// UpdateGauge reports a gauge metric to any registered handlers +func (r *TestReporter) UpdateGauge(name string, tags map[string]string, value int64) { + r.executeHandler(name, tags, value) +} + +func (r *TestReporter) executeHandler(name string, tags map[string]string, value int64) { + handlerMutex.RLock() + _, ok0 := handlers[``] + _, ok1 := handlers[name] + if ok0 || ok1 { + if allHandler2, ok2 := handlers[``][``]; ok2 { // Global handler + allHandler2(name, r.tags, tags,
value) + } + if allHandler3, ok3 := handlers[name][``]; ok3 { // Handler for all metrics named 'name' + allHandler3(name, r.tags, tags, value) + } + + // TODO: technically, this is wrong, as we don't have the local tags overriding the struct tags, but this + // has no practical effect in our current use of metrics, since we never override + for _, q := range []map[string]string{r.tags, tags} { + for filterTag, filterTagVal := range q { + key2 := filterTag + `:` + filterTagVal + if handler4, ok4 := handlers[``][key2]; ok4 { // Handler for this tag, any name + handler4(name, r.tags, tags, value) + } + if handler5, ok5 := handlers[name][key2]; ok5 { // Handler for specifically this name and tag + handler5(name, r.tags, tags, value) + } + } + } + } + handlerMutex.RUnlock() +} + +// RegisterHandler registers a handler (closure) that receives updates for a particular gauge or counter based on the metric name and +// the name/value of one of the metric's tags. If the filterTag/Val are both empty, all updates to that metric will +// trigger the handler. If metricName is empty, all metrics matching the tag filter will pass through your function. +// A nil handler unregisters the handler for the given filter parameters +// +// Dev notes: +// * It is advisable to defer a call to unregister your handler when your test ends +// * Your handler can be called concurrently. Capture your own sync.Mutex if you must serialize +// * Counters report the delta; you must maintain the cumulative value of your counter if it is important +// * Your handler executes synchronously with the metrics code; DO NOT BLOCK +func RegisterHandler(metricName, filterTag, filterTagVal string, handler HandlerFn) { + defer handlerMutex.Unlock() + handlerMutex.Lock() + if _, ok := handlers[metricName]; !ok { + handlers[metricName] = make(map[string]HandlerFn) + } + + key2 := filterTag + `:` + filterTagVal + if key2 == `:` { + key2 = `` + } + + if handler == nil { + delete(handlers[metricName], key2) + if len(handlers[metricName]) == 0 { + delete(handlers, metricName) + } + return + } + + if hf, ok2 := handlers[metricName][key2]; ok2 { + panic(fmt.Sprintf("Metrics handler %v (for '%s'/'%s') should have been unregistered", hf, metricName, key2)) + } + + handlers[metricName][key2] = handler +} + +func newTestStopWatch(metricName string, reporter *TestReporter) *testStopWatch { + watch := &testStopWatch{ + metricName: metricName, + reporter: reporter, + } + + return watch +} + +func (w *testStopWatch) Start() { + w.startTime = time.Now() +} + +func (w *testStopWatch) Stop() time.Duration { + w.elasped = time.Since(w.startTime) + + return w.elasped +} + +// StartTimer returns a Stopwatch which when stopped will record the elapsed time +func (r *TestReporter) StartTimer(name string, tags map[string]string) Stopwatch { + w := newTestStopWatch(name, r) + w.Start() + return w +} + +// RecordTimer should be used for measuring latency when you cannot start the stop watch.
+func (r *TestReporter) RecordTimer(name string, tags map[string]string, d time.Duration) { + // Record the time as counter of time in milliseconds + // not implemented +} diff --git a/common/util.go b/common/util.go index 6655f192..a46e591b 100644 --- a/common/util.go +++ b/common/util.go @@ -563,6 +563,20 @@ func NewMetricReporterWithHostname(cfg configure.CommonServiceConfig) metrics.Re return reporter } +//NewTestMetricsReporter creates a test reporter that allows registration of handler functions +func NewTestMetricsReporter() metrics.Reporter { + hostName, e := os.Hostname() + lcLg := GetDefaultLogger() + if e != nil { + lcLg.WithFields(bark.Fields{TagErr: e}).Fatal("Error getting hostname") + } + + reporter := metrics.NewTestReporter(map[string]string{ + metrics.HostnameTagName: hostName, + }) + return reporter +} + //GetLocalClusterInfo gets the zone and tenancy from the given deployment func GetLocalClusterInfo(deployment string) (zone string, tenancy string) { parts := strings.Split(deployment, "_") @@ -599,6 +613,15 @@ func (r *cliHelper) GetDefaultOwnerEmail() string { // GetCanonicalZone is the implementation of the corresponding method func (r *cliHelper) GetCanonicalZone(zone string) (cZone string, err error) { var ok bool + if len(zone) == 0 { + return "", errors.New("Invalid Zone Name") + } + + // If canonical zone list is empty, then any zone is valid + if len(r.cZones) == 0 { + return zone, nil + } + if cZone, ok = r.cZones[zone]; !ok { return "", errors.New("Invalid Zone Name") } diff --git a/services/frontendhost/frontend.go b/services/frontendhost/frontend.go index ad1475e5..eb23ac32 100644 --- a/services/frontendhost/frontend.go +++ b/services/frontendhost/frontend.go @@ -238,11 +238,21 @@ func convertCreateDestRequestToInternal(createRequest *c.CreateDestinationReques func convertUpdateDestRequestToInternal(updateRequest *c.UpdateDestinationRequest, destUUID string) *shared.UpdateDestinationRequest { internalUpdateRequest := shared.NewUpdateDestinationRequest() internalUpdateRequest.DestinationUUID = common.StringPtr(destUUID) - internalUpdateRequest.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus(updateRequest.GetStatus())) - internalUpdateRequest.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention()) - internalUpdateRequest.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention()) - internalUpdateRequest.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail()) - internalUpdateRequest.ChecksumOption = common.InternalChecksumOptionPtr(shared.ChecksumOption(updateRequest.GetChecksumOption())) + if updateRequest.IsSetStatus() { + internalUpdateRequest.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus(updateRequest.GetStatus())) + } + if updateRequest.IsSetConsumedMessagesRetention() { + internalUpdateRequest.ConsumedMessagesRetention = common.Int32Ptr(updateRequest.GetConsumedMessagesRetention()) + } + if updateRequest.IsSetUnconsumedMessagesRetention() { + internalUpdateRequest.UnconsumedMessagesRetention = common.Int32Ptr(updateRequest.GetUnconsumedMessagesRetention()) + } + if updateRequest.IsSetOwnerEmail() { + internalUpdateRequest.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail()) + } + if updateRequest.IsSetChecksumOption() { + internalUpdateRequest.ChecksumOption = common.InternalChecksumOptionPtr(shared.ChecksumOption(updateRequest.GetChecksumOption())) + } return internalUpdateRequest } diff --git a/services/outputhost/cgcache.go 
b/services/outputhost/cgcache.go index 7a8926d3..f611b31d 100644 --- a/services/outputhost/cgcache.go +++ b/services/outputhost/cgcache.go @@ -21,6 +21,7 @@ package outputhost import ( + "sync/atomic" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/uber/tchannel-go/thrift" "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/cache" "github.com/uber/cherami-server/common/dconfig" "github.com/uber/cherami-server/common/metrics" "github.com/uber/cherami-server/services/outputhost/load" @@ -147,6 +149,12 @@ type ( // lastDisconnectTime is the time the last consumer got disconnected lastDisconnectTime time.Time + // dlqMerging indicates whether any DLQ extent is presently being merged. ATOMIC OPERATIONS ONLY + dlqMerging int32 + + // checkSingleCGVisibleCache reduces ListExtentStats calls + checkSingleCGVisibleCache cache.Cache // Key is the extent UUID, value is whether it is single-CG-visible + // sessionID is the 16 bit session identifier for this host sessionID uint16 @@ -256,6 +264,7 @@ func newConsumerGroupCache(destPath string, cgDesc shared.ConsumerGroupDescripti cfgMgr: h.cfgMgr, } + cgCache.checkSingleCGVisibleCache = cache.NewLRUWithInitialCapacity(1, 1 << 6) cgCache.consumerM3Client = metrics.NewClientWithTags(h.m3Client, metrics.Outputhost, cgCache.getConsumerGroupTags()) cgCache.loadReporter = cgCache.loadReporterFactory.CreateReporter(consGroupLoadReportingInterval, cgCache, cgLogger) cgCache.loadReporter.Start() @@ -287,7 +296,7 @@ func (cgCache *consumerGroupCache) getConsumerGroupTags() map[string]string { } // loadExtentCache loads the extent cache, if it doesn't already exist for this consumer group -func (cgCache *consumerGroupCache) loadExtentCache(destType shared.DestinationType, cge *metadata.ConsumerGroupExtent) (err error) { +func (cgCache *consumerGroupCache) loadExtentCache(destType shared.DestinationType, cge *metadata.ConsumerGroupExtent) { extUUID := cge.GetExtentUUID() extLogger := cgCache.logger.WithField(common.TagExt, extUUID) if extCache, exists := cgCache.extentCache[extUUID]; !exists { @@ -435,8 +444,8 @@ func (cgCache *consumerGroupCache) manageConsumerGroupCache() { } } -// refreshCgCache contacts metadata to get all the open extents and then -// loads them if needed +// refreshCgCache contacts metadata to get all the open extents and then loads them if needed +// Note that this function is and must be run under the cgCache.extMutex lock func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error { // First check any status changes to the destination or consumer group @@ -488,7 +497,7 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error { MaxResults: common.Int32Ptr(defaultMaxResults), } - var nExtents = 0 + var nSingleCGExtents = 0 for { cgRes, errRCGES := cgCache.metaClient.ReadConsumerGroupExtents(ctx, cgReq) @@ -500,11 +509,10 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error { // Now we have the cgCache. check and load extent for all extents for _, cge := range cgRes.GetExtents() { - nExtents++ - errR := cgCache.loadExtentCache(dstDesc.GetType(), cge) - if errR != nil { - return errR + if cgCache.checkSingleCGVisible(ctx, cge) { + nSingleCGExtents++ } + cgCache.loadExtentCache(dstDesc.GetType(), cge) } if len(cgRes.GetNextPageToken()) == 0 { @@ -514,9 +522,54 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error { } } + // DEVNOTE: The term 'merging' here is a reference to the DLQ Merge feature/operation. 
These extents are + // 'served' just like any other, but they are visible to this one consumer group, hence 'SingleCG' + newDLQMerging := int32(nSingleCGExtents) + oldDLQMerging := atomic.SwapInt32(&cgCache.dlqMerging, newDLQMerging) + if oldDLQMerging != newDLQMerging { + if newDLQMerging > 0 { + cgCache.logger.WithField(`nSingleCGExtents`, nSingleCGExtents).Info("Merging DLQ Extent(s)") + } else { + cgCache.logger.Info("Done merging DLQ Extent(s)") + } + } + return nil } + +// checkSingleCGVisible determines if an extent under a certain destination is visible only to one consumer group. It is a cached call, so the +// metadata client will only be invoked once per destination+extent +func (cgCache *consumerGroupCache) checkSingleCGVisible(ctx thrift.Context, cge *metadata.ConsumerGroupExtent) (singleCgVisible bool) { + // Note that this same extent can be loaded by either a consumer group of the DLQ destination (i.e. for inspection, + // with consumer group visibility = nil), or as an extent being merged into the original consumer group (i.e. for DLQ + // merge, with consumer group visibility = this CG). This is why this cache isn't global + key := cge.GetExtentUUID() + val := cgCache.checkSingleCGVisibleCache.Get(key) + if val != nil { + return val.(bool) + } + + req := &metadata.ReadExtentStatsRequest{ + DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()), + ExtentUUID: common.StringPtr(cge.GetExtentUUID()), + } + ext, err := cgCache.metaClient.ReadExtentStats(ctx, req) + if err == nil { + if ext != nil && ext.GetExtentStats() != nil { + singleCgVisible = ext.GetExtentStats().GetConsumerGroupVisibility() != `` + if singleCgVisible { + cgCache.logger.WithFields(bark.Fields{common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Info("Consuming single CG visible extent") + } + } + cgCache.checkSingleCGVisibleCache.Put(key, singleCgVisible) + } else { // Note that we don't cache val when we have an error + cgCache.logger.WithFields(bark.Fields{common.TagErr: err, common.TagExt: common.FmtExt(cge.GetExtentUUID())}).Error("ReadExtentStats failed on refresh") + } + + return +} + // getDynamicCgConfig gets the configuration object for this host func (cgCache *consumerGroupCache) getDynamicCgConfig() (OutputCgConfig, error) { dCfgIface, err := cgCache.cfgMgr.Get(common.OutputServiceName, `*`, `*`, `*`) diff --git a/services/outputhost/consconnection.go b/services/outputhost/consconnection.go index c24a83b5..85e79756 100644 --- a/services/outputhost/consconnection.go +++ b/services/outputhost/consconnection.go @@ -312,13 +312,16 @@ func (conn *consConnection) writeMsgsStream() { conn.reSentMsgs++ case msg := <-conn.msgsCh: conn.createMsgAndWriteToClientUtil(msg, &unflushedWrites, &localCredits, flushThreshold, conn.msgCacheCh) - case <-flushTicker.C: + case f := <-flushTicker.C: if unflushedWrites > 0 { if err := conn.flushToClient(unflushedWrites); err == nil { unflushedWrites = 0 } } - conn.cgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGMessageCacheSize, int64(len(conn.msgsCh))) + + if f.Second() == 0 && f.Nanosecond() < int(flushTimeout*2) { // Record every minute or so + conn.cgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGMessageCacheSize, int64(len(conn.msgsCh))) + } case updateUUID := <-conn.reconfigureClientCh: conn.logger.WithField(`updateUUID`, updateUUID).Debug(`reconfiguring client with updateUUID`) cmd := createReconfigureCmd(updateUUID) diff --git a/services/outputhost/messagecache.go 
b/services/outputhost/messagecache.go index 895394e1..4d7eec21 100644 --- a/services/outputhost/messagecache.go +++ b/services/outputhost/messagecache.go @@ -21,8 +21,10 @@ package outputhost import ( + "sync/atomic" "fmt" "strings" + "sync" "time" "github.com/uber-common/bark" @@ -1073,10 +1075,10 @@ func (msgCache *cgMsgCache) isStalled() bool { var m3St m3HealthState var smartRetryDisabled bool - if strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) { + if atomic.LoadInt32(&msgCache.cgCache.dlqMerging) > 0 || strings.Contains(msgCache.GetOwnerEmail(), SmartRetryDisableString) { smartRetryDisabled = true } - + now := common.Now() // Determines that no progress has been made in the recent past; this is half the lock timeout to be @@ -1164,8 +1166,8 @@ func (msgCache *cgMsgCache) isStalled() bool { msgCache.logMessageCacheHealth() } - // Assign M3 state, preferring progressing over idle. It is possible for both progressing and idle to be true if there are - // no redeliveries are happening + // Assign M3 state, preferring progressing over idle. It is possible for both + // progressing and idle to be true if there are no redeliveries happening switch { case stalled: m3St = stateStalled @@ -1175,6 +1177,8 @@ func (msgCache *cgMsgCache) isStalled() bool { m3St = stateIdle } + // These metrics are reported to customers, and they don't reflect the possibility of smart retry being disabled. + // It's a feedback mechanism to the customer to give an idea of the health of their consumer group msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGHealthState, int64(m3St)) msgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGOutstandingDeliveries, int64(msgCache.countStateDelivered+msgCache.countStateEarlyNACK)) @@ -1185,6 +1189,7 @@ func (msgCache *cgMsgCache) isStalled() bool { stalled = false } + // These metrics are reported to the controller, and represent the true state of smart retry if stalled { msgCache.cgCache.cgMetrics.Set(load.CGMetricSmartRetryOn, 1) } else { @@ -1210,12 +1215,10 @@ func (msgCache *cgMsgCache) logStateMachineHealth() { }).Info(`state machine health`) } -var logPumpHealthOnce bool +var logPumpHealthOnce sync.Once func (msgCache *cgMsgCache) logPumpHealth() { - if !logPumpHealthOnce { - logPumpHealthOnce = true - + logPumpHealthOnce.Do(func() { msgCache.lclLg.WithFields(bark.Fields{ `MsgsRedeliveryChCap`: cap(msgCache.msgsRedeliveryCh), `PriorityMsgsRedeliveryChCap`: cap(msgCache.priorityMsgsRedeliveryCh), @@ -1228,7 +1231,7 @@ func (msgCache *cgMsgCache) logPumpHealth() { `CreditRequestChCap`: cap(msgCache.creditRequestCh), `RedeliveryTickerCap`: cap(msgCache.redeliveryTicker.C), }).Info(`cache pump health capacities`) - } + }) msgCache.lclLg.WithFields(bark.Fields{ `MsgsRedeliveryChLen`: len(msgCache.msgsRedeliveryCh), diff --git a/services/outputhost/outputhost_test.go b/services/outputhost/outputhost_test.go index 4316cf88..6baf5867 100644 --- a/services/outputhost/outputhost_test.go +++ b/services/outputhost/outputhost_test.go @@ -177,6 +177,7 @@ func (s *OutputHostSuite) TestOutputHostReadMessage() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) cgDesc := shared.NewConsumerGroupDescription() 
cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New()) @@ -239,6 +240,7 @@ func (s *OutputHostSuite) TestOutputHostAckMessage() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) cgUUID := uuid.New() extUUID := uuid.New() @@ -355,6 +357,7 @@ func (s *OutputHostSuite) TestOutputHostReconfigure() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Twice() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) // 1. Make sure we setup the ReadConsumerGroup metadata cgDesc := shared.NewConsumerGroupDescription() @@ -478,6 +481,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) cgDesc := shared.NewConsumerGroupDescription() cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New()) @@ -546,6 +550,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch_NoMsg() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil) + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) cgDesc := shared.NewConsumerGroupDescription() cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New()) @@ -595,6 +600,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch_SomeMsgAvailable() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil) + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) cgDesc := shared.NewConsumerGroupDescription() cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New()) @@ -667,6 +673,7 @@ func (s *OutputHostSuite) TestOutputCgUnload() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Twice() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) // 1. Make sure we setup the ReadConsumerGroup metadata cgDesc := shared.NewConsumerGroupDescription() @@ -759,6 +766,7 @@ func (s *OutputHostSuite) TestOutputAckMgrReset() { destDesc.DestinationUUID = common.StringPtr(destUUID) destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED) s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Twice() + s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`)) // 1. 
Make sure we setup the ReadConsumerGroup metadata cgDesc := shared.NewConsumerGroupDescription() diff --git a/test/integration/base.go b/test/integration/base.go index 0af60781..df9ebd3c 100644 --- a/test/integration/base.go +++ b/test/integration/base.go @@ -197,7 +197,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { } cfg := cfgMap[common.StoreServiceName][i].ServiceConfig[common.StoreServiceName] - reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()) + reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName) sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient) log.Infof("store ringHosts: %v", cfg.GetRingHosts()) @@ -221,7 +221,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { for i := 0; i < clusterSz[common.InputServiceName]; i++ { hostID := uuid.New() cfg := cfgMap[common.InputServiceName][i].ServiceConfig[common.InputServiceName] - reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()) + reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName) sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient) log.Infof("input ringHosts: %v", cfg.GetRingHosts()) @@ -236,7 +236,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { for i := 0; i < clusterSz[common.FrontendServiceName]; i++ { hostID := uuid.New() cfg := cfgMap[common.FrontendServiceName][i].ServiceConfig[common.FrontendServiceName] - reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()) + reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.FrontendServiceName) sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient) @@ -250,7 +250,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { for i := 0; i < clusterSz[common.OutputServiceName]; i++ { hostID := uuid.New() cfg := cfgMap[common.OutputServiceName][i].ServiceConfig[common.OutputServiceName] - reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()) + reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.OutputServiceName) sCommon := common.NewService(common.OutputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient) log.Infof("output ringHosts: %v", cfg.GetRingHosts()) @@ -267,7 +267,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { cfg := cfgMap[common.ControllerServiceName][i] log.Infof("ctrlr ringHosts: %v", cfg.ServiceConfig[common.ControllerServiceName].RingHosts) serviceName := common.ControllerServiceName - reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()) + reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName) sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient) ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 
1b342599..baf0ca5c 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -26,6 +26,7 @@ import ( "net" "strconv" "sync" + "sync/atomic" "time" "github.com/uber-common/bark" @@ -34,6 +35,7 @@ import ( "github.com/stretchr/testify/suite" client "github.com/uber/cherami-client-go/client/cherami" "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/metrics" "github.com/uber/cherami-server/services/controllerhost" "github.com/uber/cherami-server/services/storehost" "github.com/uber/cherami-thrift/.generated/go/cherami" @@ -52,6 +54,9 @@ type NetIntegrationSuiteParallelA struct { type NetIntegrationSuiteParallelC struct { testBase } +type NetIntegrationSuiteParallelD struct { + testBase +} type NetIntegrationSuiteSerial struct { testBase } @@ -74,6 +79,12 @@ func TestNetIntegrationSuiteParallelC(t *testing.T) { t.Parallel() suite.Run(t, s) } +func TestNetIntegrationSuiteParallelD(t *testing.T) { + s := new(NetIntegrationSuiteParallelD) + s.testBase.SetupSuite(t) + t.Parallel() + suite.Run(t, s) +} // Disabled, since it is apparently impossible to get this test to run without racing with the parallel tests func XXXTestNetIntegrationSuiteSerial(t *testing.T) { @@ -1244,6 +1255,320 @@ operationsLoop: } +func (s *NetIntegrationSuiteParallelD) TestSmartRetryDisableDuringDLQMerge() { + const ( + destPath = `/test.runner.SmartRetry/SRDDDM` // This path ensures that throttling is limited for this test + cgPath = `/test.runner.SmartRetry/SRDDDMCG` + metricsName = `_test.runner.SmartRetry_SRDDDM` + cgMaxDeliveryCount = 1 + cgLockTimeout = 1 + cnsmrPrefetch = 10 + publisherPubInterval = time.Second / 5 + publisherPubSlowInterval = publisherPubInterval * 10 // This is 10x slower + DLQPublishClearTime = cgLockTimeout * time.Second * 2 + DLQMergeMessageTargetCount = 2 + DLQMessageStart = 10 + DLQMessageSpacing = 6 + mergeAssumedCompleteTime = cgLockTimeout * (cgMaxDeliveryCount + 1) * 2 * time.Second * 2 // +1 for initial delivery, *2 for dlqInhibit, *2 for fudge + testTimeout = time.Second * 180 + ) + + const ( + // Operation/Phase IDS + produceDLQ = iota + smartRetryProvoke1 + mergeOp + smartRetryProvoke3 + smartRetryProvoke4 + done + PhaseCount + ) + + const ( + stateStalled = iota + stateIdle + stateProgressing + ) + + var dlqMutex sync.RWMutex + phase := produceDLQ + var currentHealth = stateIdle + var dlqDeliveryCount int + var dlqDeliveryTime time.Time + phaseOnce := make([]sync.Once, PhaseCount) + + // ll - local log + ll := func(fmtS string, rest ...interface{}) { + common.GetDefaultLogger().WithFields(bark.Fields{`phase`: phase}).Infof(fmtS, rest...) + //fmt.Printf(`p`+strconv.Itoa(phase)+` `+fmtS+"\n", rest...) + } + + // lll - local log with lock (for race on access to phase) + lll := func(fmtS string, rest ...interface{}) { + dlqMutex.Lock() + ll(fmtS, rest...) 
+ dlqMutex.Unlock() + } + + // == Metrics == + + getCurrentHealth := func() int { + dlqMutex.RLock() + r := currentHealth + dlqMutex.RUnlock() + return r + } + + getDLQDeliveryCount := func() int { + dlqMutex.RLock() + r := dlqDeliveryCount + dlqMutex.RUnlock() + return r + } + + getDLQDeliveryTime := func() time.Time { + dlqMutex.RLock() + r := dlqDeliveryTime + dlqMutex.RUnlock() + return r + } + + defer metrics.RegisterHandler(`outputhost.healthstate.cg`, `destination`, metricsName, nil) + metrics.RegisterHandler(`outputhost.healthstate.cg`, `destination`, metricsName, + func(metricName string, baseTags, tags map[string]string, value int64) { + dlqMutex.Lock() + last := currentHealth + currentHealth = int(value) + if last != currentHealth { + ll("Metric %s: %d", metricName, value) + } + dlqMutex.Unlock() + }) + + defer metrics.RegisterHandler(`outputhost.message.sent-dlq.cg`, `destination`, metricsName, nil) + metrics.RegisterHandler(`outputhost.message.sent-dlq.cg`, `destination`, metricsName, + func(metricName string, baseTags, tags map[string]string, value int64) { + dlqMutex.Lock() + dlqDeliveryCount += int(value) + ll("Metric %s: %d (+%d)", metricName, dlqDeliveryCount, value) + if value > 0 { + dlqDeliveryTime = time.Now() + } + dlqMutex.Unlock() + }) + + // == Merge == + + merge := func() { + fe := s.GetFrontend() + s.NotNil(fe) + mergeReq := cherami.NewMergeDLQForConsumerGroupRequest() + mergeReq.DestinationPath = common.StringPtr(destPath) + mergeReq.ConsumerGroupName = common.StringPtr(cgPath) + + time.Sleep(DLQPublishClearTime) + // Merge DLQ + err := fe.MergeDLQForConsumerGroup(nil, mergeReq) + s.NoError(err) + } + + // == Setup == + + // Create the client + ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort) + portNum, _ := strconv.Atoi(port) + cheramiClient, _ := client.NewClient("cherami-test", ipaddr, portNum, nil) + + // Create the destination to publish messages + crReq := cherami.NewCreateDestinationRequest() + crReq.Path = common.StringPtr(destPath) + crReq.Type = cherami.DestinationTypePtr(cherami.DestinationType_PLAIN) + crReq.ConsumedMessagesRetention = common.Int32Ptr(60) + crReq.UnconsumedMessagesRetention = common.Int32Ptr(120) + crReq.OwnerEmail = common.StringPtr("lhcIntegration@uber.com") + + desDesc, _ := cheramiClient.CreateDestination(crReq) + s.NotNil(desDesc) + + // we should see the destination now in Cassandra + rReq := cherami.NewReadDestinationRequest() + rReq.Path = common.StringPtr(destPath) + readDesc, _ := cheramiClient.ReadDestination(rReq) + s.NotNil(readDesc) + + // ==WRITE== + + // Create the publisher + cPublisherReq := &client.CreatePublisherRequest{ + Path: destPath, + } + + publisherTest := cheramiClient.CreatePublisher(cPublisherReq) + s.NotNil(publisherTest) + + err := publisherTest.Open() + s.NoError(err) + + // Publish messages continuously in a goroutine to ensure a steady supply of 'good' messages so + // that smart retry will not affect us.
+ var curPubInterval = int64(publisherPubInterval) + closeCh := make(chan struct{}) + defer close(closeCh) + go func() { + i := 0 + defer lll("DONE PUBLISHING %v MESSAGES", i) + defer publisherTest.Close() + myIntrvl := atomic.LoadInt64(&curPubInterval) + ticker := time.NewTicker(time.Duration(myIntrvl)) + for { + select { + case <-ticker.C: + data := []byte(fmt.Sprintf("msg_%d", i+1)) + receipt := publisherTest.Publish(&client.PublisherMessage{Data: data}) + s.NoError(receipt.Error) + i++ + + if atomic.LoadInt64(&curPubInterval) != myIntrvl { // Adjust publish speed as requested + ticker.Stop() + myIntrvl = atomic.LoadInt64(&curPubInterval) + ticker = time.NewTicker(time.Duration(myIntrvl)) + lll("publisher changed interval to %v seconds", common.UnixNanoTime(myIntrvl).ToSeconds()) + } + case <-closeCh: + return + } + } + }() + + // ==READ== + + // Create the consumer group + cgReq := cherami.NewCreateConsumerGroupRequest() + cgReq.ConsumerGroupName = common.StringPtr(cgPath) + cgReq.DestinationPath = common.StringPtr(destPath) + cgReq.LockTimeoutInSeconds = common.Int32Ptr(cgLockTimeout) // this is the message redelivery timeout + cgReq.MaxDeliveryCount = common.Int32Ptr(cgMaxDeliveryCount) + cgReq.OwnerEmail = common.StringPtr("consumer_integration_test@uber.com") + + cgDesc, err := cheramiClient.CreateConsumerGroup(cgReq) + s.NoError(err) + s.NotNil(cgDesc) + + cConsumerReq := &client.CreateConsumerRequest{ + Path: destPath, + ConsumerGroupName: cgPath, + ConsumerName: "consumerName", + PrefetchCount: cnsmrPrefetch, + Options: &client.ClientOptions{Timeout: time.Second * 30}, // this is the thrift context timeout + } + + consumerTest := cheramiClient.CreateConsumer(cConsumerReq) + s.NotNil(consumerTest) + + // Open the consumer channel + delivery := make(chan client.Delivery, 1) + delivery, err = consumerTest.Open(delivery) + s.NoError(err) + + beforeMergeDLQDeliveryCount := -1 + testStartTime := time.Now() + + // Read the messages in a loop. +readLoop: + for msgCount := 0; ; { + select { + case msg := <-delivery: + msgCount++ + msgID, _ := strconv.Atoi(string(msg.GetMessage().GetPayload().GetData()[4:])) + var ack, poison, merged bool + + msgDecorator := ` ` + if msgID > DLQMessageStart && msgID%DLQMessageSpacing == 0 { + msgDecorator = `* ` // DLQ Message + poison = true + if msgID < DLQMessageStart+beforeMergeDLQDeliveryCount*DLQMessageSpacing { + merged = true + msgDecorator = `*M` // DLQ Message, and was likely merged + } + } + + lll("msgId %3d %s dlqDvlry=%3d (merged %2d, last %-12s), health = %d", + msgID, + msgDecorator, + getDLQDeliveryCount(), + beforeMergeDLQDeliveryCount, + common.UnixNanoTime(time.Since(getDLQDeliveryTime())).ToSecondsFmt(), + getCurrentHealth()) + + if time.Since(testStartTime) > testTimeout { + s.Fail("This test should complete quickly") + break + } + + switch phase { + case produceDLQ: // Normal consumption with some selected 'poison' message. 
This is dilute poison going to the DLQ + if !poison { + ack = true + } + s.NotEqual(getCurrentHealth(), stateStalled) + if getDLQDeliveryCount() >= DLQMergeMessageTargetCount { // Produced enough DLQ, move on + phase++ + } + case smartRetryProvoke1: // Provoke smart retry by NACKing everything + if getCurrentHealth() == stateStalled { + phase++ + } + case mergeOp: // Now in smart retry, perform the merge, wait for it to come into effect; transition to healthy state by acking + beforeMergeDLQDeliveryCount = getDLQDeliveryCount() + phaseOnce[phase].Do(func() { go merge() }) // Perform the merge once, asynchronously + + if merged { + s.True(poison) // This is just an assertion/sanity check + if getCurrentHealth() == stateProgressing { + phase++ + } + } else { + // ACK all messages until we see something get merged; this halts DLQ production + ack = true + } + case smartRetryProvoke3: + phaseOnce[phase].Do(func() { lll("smartRetryProvoke3") }) + + // We will transition from healthy to stalled by NACKing in this phase + s.False(getCurrentHealth() == stateIdle) + if getCurrentHealth() == stateStalled { + phase++ + } + case smartRetryProvoke4: + phaseOnce[phase].Do(func() { + lll("smartRetryProvoke4") + atomic.StoreInt64(&curPubInterval, int64(publisherPubSlowInterval)) // Slow down the publisher so that we can finish faster + }) + + // Since we are merging, the indicated state is stalled, but smart retry is disabled until the merge completes + s.Equal(getCurrentHealth(), stateStalled, `While NACKING, indicated state should be stalled`) + + if getDLQDeliveryCount() > beforeMergeDLQDeliveryCount*2 && // Verify that we've published enough to DLQ since the merge + time.Since(getDLQDeliveryTime()) > mergeAssumedCompleteTime { // Verify that we are done DLQ publishing (i.e.
transitioned to normal smart retry behavior) + phase++ + } + case done: + phaseOnce[phase].Do(func() { lll("done") }) + ack = true + if getCurrentHealth() == stateProgressing { // Verify that we can get back to fully normal health + break readLoop + } + } + + if ack { + msg.Ack() + } else { + msg.Nack() + } + } + } +} + func (s *NetIntegrationSuiteParallelA) _TestSmartRetry() { destPath := "/test.runner.SmartRetry/TestSmartRetry" cgPath := "/test.runner.SmartRetry/TestSmartRetryCG" diff --git a/tools/admin/lib.go b/tools/admin/lib.go index 34344fcb..4823637b 100644 --- a/tools/admin/lib.go +++ b/tools/admin/lib.go @@ -52,7 +52,8 @@ func CreateDestination(c *cli.Context, cliHelper common.CliHelper) { // UpdateDestination updates properties of a destination func UpdateDestination(c *cli.Context) { cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.UpdateDestination(c, cClient) + mClient := toolscommon.GetMClient(c, adminToolService) + toolscommon.UpdateDestination(c, cClient, mClient) } // CreateConsumerGroup creates a consumer group diff --git a/tools/cli/lib.go b/tools/cli/lib.go index 340093e0..a7c1be3e 100644 --- a/tools/cli/lib.go +++ b/tools/cli/lib.go @@ -44,8 +44,9 @@ func CreateDestination(c *cli.Context, cliHelper scommon.CliHelper) { // UpdateDestination updates the destination func UpdateDestination(c *cli.Context) { + mClient := common.GetMClient(c, serviceName) cClient := common.GetCClient(c, serviceName) - common.UpdateDestination(c, cClient) + common.UpdateDestination(c, cClient, mClient) } // CreateConsumerGroup creates the CG diff --git a/tools/common/lib.go b/tools/common/lib.go index ba88f869..bb831433 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -69,17 +69,20 @@ const ( DestinationType = "DST" // ConsumerGroupType is the name for entity type for consumer group in listEntityOps ConsumerGroupType = "CG" + // MinUnconsumedMessagesRetentionForMultiZoneDest is the minimum unconsumed retention allowed + MinUnconsumedMessagesRetentionForMultiZoneDest = 3 * 24 * 3600 ) const ( - strNotEnoughArgs = "Not enough arguments. Try \"--help\"" - strNoChange = "Update must update something. Try \"--help\"" - strCGSpecIncorrectArgs = "Incorrect consumer group specification. Use \"\" or \" \"" - strDestStatus = "Destination status must be \"enabled\", \"disabled\", \"sendonly\", or \"recvonly\"" - strCGStatus = "Consumer group status must be \"enabled\", or \"disabled\"" - strWrongDestZoneConfig = "Format of destination zone config is wrong, should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"" - strWrongReplicaCount = "Replica count must be within 1 to 3" - strWrongZoneConfigCount = "Multi zone destination must have at least 2 zone configs" + strNotEnoughArgs = "Not enough arguments. Try \"--help\"" + strNoChange = "Update must update something. Try \"--help\"" + strCGSpecIncorrectArgs = "Incorrect consumer group specification. Use \"\" or \" \"" + strDestStatus = "Destination status must be \"enabled\", \"disabled\", \"sendonly\", or \"recvonly\"" + strCGStatus = "Consumer group status must be \"enabled\", or \"disabled\"" + strWrongDestZoneConfig = "Format of destination zone config is wrong, should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". 
For example: \"zone1,true,true,3\"" + strWrongReplicaCount = "Replica count must be within 1 to 3" + strWrongZoneConfigCount = "Multi zone destination must have at least 2 zone configs" + strUnconsumedRetentionTooSmall = "Unconsumed retention period for multi zone destination should be at least 3 days" ) var uuidRegex, _ = regexp.Compile(`^[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}$`) @@ -228,6 +231,13 @@ func CreateDestination(c *cli.Context, cClient ccli.Client, cliHelper common.Cli ExitIfError(errors.New(strWrongZoneConfigCount)) } + // don't allow short unconsumed message retention for multi_zone destination + // this is a prevention mechanism to prevent messages from being deleted in source zone in case there's some + // issue with cross zone replication(for example, network down between zones) + if len(zoneConfigs.Configs) > 1 && unconsumedMessagesRetention < MinUnconsumedMessagesRetentionForMultiZoneDest { + ExitIfError(errors.New(strUnconsumedRetentionTooSmall)) + } + desc, err := cClient.CreateDestination(&cherami.CreateDestinationRequest{ Path: &path, Type: &dType, @@ -244,7 +254,7 @@ func CreateDestination(c *cli.Context, cClient ccli.Client, cliHelper common.Cli } // UpdateDestination update destination based on cli -func UpdateDestination(c *cli.Context, cClient ccli.Client) { +func UpdateDestination(c *cli.Context, cClient ccli.Client, mClient mcli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -266,6 +276,19 @@ func UpdateDestination(c *cli.Context, cClient ccli.Client) { } } + // don't allow short unconsumed message retention for multi_zone destination + // this is a prevention mechanism to prevent messages from being deleted in source zone in case there's some + // issue with cross zone replication(for example, network down between zones) + if c.IsSet(`unconsumed_messages_retention`) && int32(c.Int(`unconsumed_messages_retention`)) < MinUnconsumedMessagesRetentionForMultiZoneDest { + desc, err := mClient.ReadDestination(&metadata.ReadDestinationRequest{ + Path: &path, + }) + ExitIfError(err) + if desc.GetIsMultiZone() { + ExitIfError(errors.New(strUnconsumedRetentionTooSmall)) + } + } + setCount := 0 request := &cherami.UpdateDestinationRequest{ @@ -403,17 +426,19 @@ func UnloadConsumerGroup(c *cli.Context, mClient mcli.Client) { } type destDescJSONOutputFields struct { - Path string `json:"path"` - UUID string `json:"uuid"` - Status shared.DestinationStatus `json:"status"` - Type shared.DestinationType `json:"type"` - ChecksumOption string `json:"checksum_option"` - OwnerEmail string `json:"owner_email"` - ConsumedMessagesRetention int32 `json:"consumed_messages_retention"` - UnconsumedMessagesRetention int32 `json:"unconsumed_messages_retention"` - DLQCGUUID string `json:"dlq_cg_uuid"` - DLQPurgeBefore int64 `json:"dlq_purge_before"` - DLQMergeBefore int64 `json:"dlq_merge_before"` + Path string `json:"path"` + UUID string `json:"uuid"` + Status shared.DestinationStatus `json:"status"` + Type shared.DestinationType `json:"type"` + ChecksumOption string `json:"checksum_option"` + OwnerEmail string `json:"owner_email"` + ConsumedMessagesRetention int32 `json:"consumed_messages_retention"` + UnconsumedMessagesRetention int32 `json:"unconsumed_messages_retention"` + DLQCGUUID string `json:"dlq_cg_uuid"` + DLQPurgeBefore int64 `json:"dlq_purge_before"` + DLQMergeBefore int64 `json:"dlq_merge_before"` + IsMultiZone bool `json:"is_multi_zone"` + ZoneConfigs []*shared.DestinationZoneConfig 
`json:"zone_configs"` } func printDest(dest *shared.DestinationDescription) { @@ -428,6 +453,8 @@ func printDest(dest *shared.DestinationDescription) { DLQCGUUID: dest.GetDLQConsumerGroupUUID(), DLQPurgeBefore: dest.GetDLQPurgeBefore(), DLQMergeBefore: dest.GetDLQMergeBefore(), + IsMultiZone: dest.GetIsMultiZone(), + ZoneConfigs: dest.GetZoneConfigs(), } switch dest.GetChecksumOption() { @@ -670,18 +697,20 @@ func PurgeDLQForConsumerGroup(c *cli.Context, cClient ccli.Client) { } type destJSONOutputFields struct { - DestinationName string `json:"destination_name"` - DestinationUUID string `json:"destination_uuid"` - Status shared.DestinationStatus `json:"status"` - Type shared.DestinationType `json:"type"` - OwnerEmail string `json:"owner_email"` - TotalExts int `json:"total_ext"` - OpenExts int `json:"open"` - SealedExts int `json:"sealed"` - ConsumedExts int `json:"consumed"` - DeletedExts int `json:"Deleted"` - ConsumedMessagesRetention int32 `json:"consumed_messages_retention"` - UnconsumedMessagesRetention int32 `json:"unconsumed_messages_retention"` + DestinationName string `json:"destination_name"` + DestinationUUID string `json:"destination_uuid"` + Status shared.DestinationStatus `json:"status"` + Type shared.DestinationType `json:"type"` + OwnerEmail string `json:"owner_email"` + TotalExts int `json:"total_ext"` + OpenExts int `json:"open"` + SealedExts int `json:"sealed"` + ConsumedExts int `json:"consumed"` + DeletedExts int `json:"Deleted"` + ConsumedMessagesRetention int32 `json:"consumed_messages_retention"` + UnconsumedMessagesRetention int32 `json:"unconsumed_messages_retention"` + IsMultiZone bool `json:"is_multi_zone"` + ZoneConfigs []*shared.DestinationZoneConfig `json:"zone_configs"` } func matchDestStatus(status string, wantStatus shared.DestinationStatus) bool { @@ -851,6 +880,8 @@ func ListDestinations(c *cli.Context, mClient mcli.Client) { DeletedExts: nDeleted, ConsumedMessagesRetention: desc.GetConsumedMessagesRetention(), UnconsumedMessagesRetention: desc.GetUnconsumedMessagesRetention(), + IsMultiZone: desc.GetIsMultiZone(), + ZoneConfigs: desc.GetZoneConfigs(), } destsInfo[status] = append(destsInfo[status], outputDest) }
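For reference, below is a minimal sketch (not part of the patch) of how a test can drive the handler registry introduced in common/metrics/testreporter.go; the integration test above uses the same pattern. The package name, the helper countDLQSends, and the destination tag value passed in are illustrative assumptions; metrics.RegisterHandler, the HandlerFn signature, and the `outputhost.message.sent-dlq.cg` / `destination` names are taken from this diff.

package outputhost_test // illustrative package; any test package that imports the metrics package works

import (
	"sync"

	"github.com/uber/cherami-server/common/metrics"
)

// countDLQSends registers a handler for the outputhost sent-to-DLQ counter, filtered on the
// `destination` tag. Counters report deltas, so the handler keeps a running sum; the returned
// stop function unregisters the handler (nil handler) and reports the accumulated total.
func countDLQSends(destTag string) (stop func() int64) {
	var mu sync.Mutex // the handler may be invoked concurrently
	var total int64
	metrics.RegisterHandler(`outputhost.message.sent-dlq.cg`, `destination`, destTag,
		func(metricName string, baseTags, tags map[string]string, value int64) {
			mu.Lock()
			total += value
			mu.Unlock()
		})
	return func() int64 {
		metrics.RegisterHandler(`outputhost.message.sent-dlq.cg`, `destination`, destTag, nil)
		mu.Lock()
		defer mu.Unlock()
		return total
	}
}

A test would call stop := countDLQSends(path), exercise the consumer group, then assert on the value returned by stop(), which also unregisters the handler as the dev notes above advise.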