Skip to content

Commit

Permalink
Retry namespace-not-found errors in replication
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 19, 2023
1 parent b8f2123 commit e67d575
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 7 deletions.
4 changes: 0 additions & 4 deletions service/history/replication/task_executor.go
Expand Up @@ -368,10 +368,6 @@ func (e *taskExecutorImpl) filterTask(

namespaceEntry, err := e.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
if _, ok := err.(*serviceerror.NamespaceNotFound); ok {
// Drop the task
return false, nil
}
return false, err
}

Expand Down
5 changes: 3 additions & 2 deletions service/history/replication/task_executor_test.go
Expand Up @@ -181,11 +181,12 @@ func (s *taskExecutorSuite) TestFilterTask_EnforceApply() {

func (s *taskExecutorSuite) TestFilterTask_NamespaceNotFound() {
namespaceID := namespace.ID(uuid.New())
notFoundErr := &serviceerror.NamespaceNotFound{}
s.mockNamespaceCache.EXPECT().
GetNamespaceByID(namespaceID).
Return(nil, &serviceerror.NamespaceNotFound{})
Return(nil, notFoundErr)
ok, err := s.replicationTaskExecutor.filterTask(namespaceID, false)
s.NoError(err)
s.ErrorIs(err, notFoundErr)
s.False(ok)
}

Expand Down
1 change: 0 additions & 1 deletion tests/dynamicconfig.go
Expand Up @@ -42,7 +42,6 @@ var (
dynamicconfig.FrontendRPS: 3000,
dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance: 50,
dynamicconfig.FrontendMaxNamespaceVisibilityBurstPerInstance: 50,
dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts: 1,
dynamicconfig.AdvancedVisibilityWritingMode: visibility.SecondaryVisibilityWritingModeOff,
dynamicconfig.WorkflowTaskHeartbeatTimeout: 5 * time.Second,
dynamicconfig.ReplicationTaskFetcherAggregationInterval: 200 * time.Millisecond,
Expand Down
130 changes: 130 additions & 0 deletions tests/xdc/workflow_replication_test.go
@@ -0,0 +1,130 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build !race

package xdc

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/primitives/timestamp"
)

type workflowReplicationSuite struct {
xdcBaseSuite
}

func TestWorkflowReplicationSuite(t *testing.T) {
suite.Run(t, new(workflowReplicationSuite))
}

func (s *workflowReplicationSuite) SetupSuite() {
const format = "workflow_replication_test_%s_cluster"
s.setupSuite([]string{
fmt.Sprintf(format, "active"),
fmt.Sprintf(format, "standby"),
})
}

func (s *workflowReplicationSuite) TearDownSuite() {
s.tearDownSuite()
}

func (s *workflowReplicationSuite) SetupTest() {
s.setupTest()
}

func (s *workflowReplicationSuite) TestReplicationFailure() {
// Register a namespace
ctx := context.Background()
ns := s.T().Name() + "-" + common.GenerateRandomString(8)
_, err := s.cluster1.GetFrontendClient().RegisterNamespace(ctx, &workflowservice.RegisterNamespaceRequest{
Namespace: ns,
Clusters: s.clusterReplicationConfig(),
ActiveClusterName: s.clusterNames[0],
IsGlobalNamespace: true,
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24), // required param
})
s.NoError(err)

// Create a worker and register a workflow
activeClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: ns,
})
s.NoError(err)
tq := "test-task-queue-" + s.T().Name()
worker := sdkworker.New(activeClient, tq, sdkworker.Options{})
myWorkflow := func(ctx workflow.Context) (string, error) {
return "hello", nil
}
worker.RegisterWorkflow(myWorkflow)
s.NoError(worker.Start())
defer worker.Stop()

// Execute the workflow
run, err := activeClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: tq,
}, myWorkflow)
s.NoError(err)

// Wait for the workflow to complete
var result string
err = run.Get(ctx, &result)
s.NoError(err)
s.Equal("hello", result)

// Connect to the standby cluster
standbyClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster2.GetHost().FrontendGRPCAddress(),
Namespace: ns,
})
s.NoError(err)

// Verify that the workflow eventually replicates to the standby cluster
err = backoff.ThrottleRetryContext(
ctx,
func(ctx context.Context) error {
run = standbyClient.GetWorkflow(ctx, run.GetID(), run.GetRunID())
return run.Get(ctx, &result)
},
backoff.NewExponentialRetryPolicy(time.Millisecond).
WithBackoffCoefficient(1.0).
WithExpirationInterval(10*time.Second),
backoff.IgnoreErrors(nil),
)
s.NoError(err)
s.Equal("hello", result)
}

0 comments on commit e67d575

Please sign in to comment.