Skip to content

Commit

Permalink
Get durable archival ready for integration testing
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 20, 2022
1 parent edd3f0a commit 3de6151
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -522,6 +522,8 @@ const (
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"
// ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend
ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
35 changes: 34 additions & 1 deletion service/history/archival/archiver_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"go.uber.org/multierr"

carchiver "go.temporal.io/server/common/archiver"
Expand All @@ -43,6 +44,7 @@ import (
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/testing/mocksdk"
"go.temporal.io/server/service/history/configs"
)

func TestArchiver(t *testing.T) {
Expand Down Expand Up @@ -280,7 +282,38 @@ func TestArchiver(t *testing.T) {
rateLimiter := quotas.NewMockRateLimiter(controller)
rateLimiter.EXPECT().WaitN(gomock.Any(), 2).Return(c.RateLimiterWaitErr)

archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
// we need this channel to get the Archiver which is created asynchronously
archivers := make(chan Archiver, 1)
// we make an app here so that we can test that the Module is working as intended
app := fx.New(
fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))),
fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))),
fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))),
fx.Supply(&configs.Config{
ArchivalBackendMaxRPS: func() float64 {
return 42.0
},
}),
Module,
fx.Decorate(func(rl quotas.RateLimiter) quotas.RateLimiter {
// we need to decorate the rate limiter so that we can use the mock
// we also verify that the rate being used is equal to the one in the config
assert.Equal(t, 42.0, rl.Rate())
return rateLimiter
}),
fx.Invoke(func(a Archiver) {
// after all parameters are provided, we get the Archiver and put it in the channel
// so that we can use it in the test
archivers <- a
}),
)
require.NoError(t, app.Err())
// we need to start the app for fx.Invoke to be called, so that we can get the Archiver
require.NoError(t, app.Start(ctx))
defer func() {
require.NoError(t, app.Stop(ctx))
}()
archiver := <-archivers
_, err = archiver.Archive(ctx, &Request{
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
Expand Down
44 changes: 44 additions & 0 deletions service/history/archival/fx.go
@@ -0,0 +1,44 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archival

import (
"math"

"go.uber.org/fx"

"go.temporal.io/server/common/quotas"
"go.temporal.io/server/service/history/configs"
)

var Module = fx.Options(
fx.Provide(NewArchiver),
fx.Provide(func(config *configs.Config) quotas.RateLimiter {
return quotas.NewRateLimiter(
config.ArchivalBackendMaxRPS(),
int(math.Ceil(config.ArchivalBackendMaxRPS())),
)
}),
)
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -295,6 +295,7 @@ type Config struct {
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn
}

const (
Expand Down Expand Up @@ -525,6 +526,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0),
}

return cfg
Expand Down
2 changes: 2 additions & 0 deletions service/history/fx.go
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
Expand All @@ -68,6 +69,7 @@ var Module = fx.Options(
workflow.Module,
shard.Module,
cache.Module,
archival.Module,
fx.Provide(dynamicconfig.NewCollection),
fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly
fx.Provide(RetryableInterceptorProvider),
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/fx.go
Expand Up @@ -30,4 +30,5 @@ import (

var Module = fx.Options(
fx.Populate(&taskGeneratorProvider),
fx.Provide(NewRelocatableAttributesFetcher),
)
1 change: 1 addition & 0 deletions service/history/workflow/task_generator.go
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
Expand Down

0 comments on commit 3de6151

Please sign in to comment.