Skip to content

Commit

Permalink
Add NamespaceInvalidState and NamespaceNotFound errors (#2785)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 30, 2022
1 parent 92ff8c8 commit 6035304
Show file tree
Hide file tree
Showing 57 changed files with 190 additions and 210 deletions.
1 change: 1 addition & 0 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func (c *metricClient) finishMetricsRecording(
case *serviceerror.Canceled,
*serviceerror.DeadlineExceeded,
*serviceerror.NotFound,
*serviceerror.NamespaceNotFound,
*serviceerror.WorkflowExecutionAlreadyStarted:
// noop - not interest and too many logs
default:
Expand Down
1 change: 1 addition & 0 deletions client/matching/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (c *metricClient) finishMetricsRecording(
case *serviceerror.Canceled,
*serviceerror.DeadlineExceeded,
*serviceerror.NotFound,
*serviceerror.NamespaceNotFound,
*serviceerror.WorkflowExecutionAlreadyStarted:
// noop - not interest and too many logs
default:
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (h *historyArchiver) Archive(
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsNotFoundError(err) {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
Expand Down
3 changes: 2 additions & 1 deletion common/archiver/filestore/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/tests/testhelper"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -294,7 +295,7 @@ func (s *historyArchiverSuite) TestArchive_Skip() {
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &serviceerror.NotFound{Message: "workflow not found"}),
historyIterator.EXPECT().Next().Return(nil, serviceerror.NewNotFound("workflow not found")),
)

historyArchiver := s.newTestHistoryArchiver(historyIterator)
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/gcloud/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (h *historyArchiver) Archive(ctx context.Context, URI archiver.URI, request
part := progress.CurrentPageNumber
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsNotFoundError(err) {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/gcloud/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (h *historyArchiverSuite) TestArchive_Skip() {
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &serviceerror.NotFound{Message: "workflow not found"}),
historyIterator.EXPECT().Next().Return(nil, serviceerror.NewNotFound("workflow not found")),
)

historyArchiver := newHistoryArchiver(h.container, historyIterator, storageWrapper)
Expand Down
4 changes: 2 additions & 2 deletions common/archiver/historyIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (i *historyIterator) readHistoryBatches(firstEventID int64) ([]*historypb.H
newIterState := historyIteratorState{}
for size < targetSize {
currHistoryBatches, err := i.readHistory(firstEventID)
if _, ok := err.(*serviceerror.NotFound); ok && firstEventID != common.FirstEventID {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound && firstEventID != common.FirstEventID {
newIterState.FinishedIteration = true
return historyBatches, newIterState, nil
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (i *historyIterator) readHistoryBatches(firstEventID int64) ([]*historypb.H
// If you are here, it means the target size is met after adding the last batch of read history.
// We need to check if there's more history batches.
_, err := i.readHistory(firstEventID)
if _, ok := err.(*serviceerror.NotFound); ok && firstEventID != common.FirstEventID {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound && firstEventID != common.FirstEventID {
newIterState.FinishedIteration = true
return historyBatches, newIterState, nil
}
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/s3store/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (h *historyArchiver) Archive(
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsNotFoundError(err) {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
Expand Down
2 changes: 1 addition & 1 deletion common/archiver/s3store/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *historyArchiverSuite) TestArchive_Skip() {
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &serviceerror.NotFound{Message: "workflow not found"}),
historyIterator.EXPECT().Next().Return(nil, serviceerror.NewNotFound("workflow not found")),
)

historyArchiver := s.newTestHistoryArchiver(historyIterator)
Expand Down
2 changes: 1 addition & 1 deletion common/namespace/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (d *HandlerImpl) RegisterNamespace(
case nil:
// namespace already exists, cannot proceed
return nil, serviceerror.NewNamespaceAlreadyExists("Namespace already exists.")
case *serviceerror.NotFound:
case *serviceerror.NamespaceNotFound:
// namespace does not exists, proceeds
default:
// other err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledNotMasterClusterSuite) TestRegist
Namespace: namespace,
})
s.Error(err)
s.IsType(&serviceerror.NotFound{}, err)
s.IsType(&serviceerror.NamespaceNotFound{}, err)
s.Nil(resp)
}

Expand Down Expand Up @@ -469,7 +469,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledNotMasterClusterSuite) TestRegist
Namespace: namespace,
})
s.Error(err)
s.IsType(&serviceerror.NotFound{}, err)
s.IsType(&serviceerror.NamespaceNotFound{}, err)
s.Nil(resp)
}

Expand Down
10 changes: 4 additions & 6 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ package namespace

import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -95,7 +95,7 @@ type (

// GetNamespace reads the state for a single namespace by name or ID
// from persistent storage, returning an instance of
// serviceerror.NotFound if there is no matching Namespace.
// serviceerror.NamespaceNotFound if there is no matching Namespace.
GetNamespace(
context.Context,
*persistence.GetNamespaceRequest,
Expand Down Expand Up @@ -484,8 +484,7 @@ func (r *registry) getNamespace(name Name) (*Namespace, error) {
if id, ok := r.cacheNameToID.Get(name).(ID); ok {
return r.getNamespaceByIDLocked(id)
}
return nil, serviceerror.NewNotFound(
fmt.Sprintf("Namespace name %q not found", name))
return nil, serviceerror.NewNamespaceNotFound(name.String())
}

// getNamespaceByID retrieves the information from the cache if it exists.
Expand All @@ -499,8 +498,7 @@ func (r *registry) getNamespaceByIDLocked(id ID) (*Namespace, error) {
if ns, ok := r.cacheByID.Get(id).(*Namespace); ok {
return ns, nil
}
return nil, serviceerror.NewNotFound(
fmt.Sprintf("Namespace id %q not found", id))
return nil, serviceerror.NewNamespaceNotFound(id.String())
}

func (r *registry) publishCacheUpdate(
Expand Down
5 changes: 3 additions & 2 deletions common/namespace/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -520,7 +521,7 @@ func (s *registrySuite) TestGetTriggerListAndUpdateCache_ConcurrentAccess() {
case nil:
s.Equal(entryOld, entryNew)
waitGroup.Done()
case *serviceerror.NotFound:
case *serviceerror.NamespaceNotFound:
time.Sleep(4 * time.Second)
entryNew, err := s.registry.GetNamespaceByID(id)
s.NoError(err)
Expand Down Expand Up @@ -637,7 +638,7 @@ func (s *registrySuite) TestRemoveDeletedNamespace() {
ns1FromRegistry, err := s.registry.GetNamespace(namespace.Name(namespaceRecord1.Namespace.Info.Name))
s.Nil(ns1FromRegistry)
s.Error(err)
var notFound *serviceerror.NotFound
var notFound *serviceerror.NamespaceNotFound
s.ErrorAs(err, &notFound)
}

Expand Down
6 changes: 3 additions & 3 deletions common/namespace/replicationTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio
if resp.Namespace.Info.Id != task.GetId() {
return ErrNameUUIDCollision
}
case *serviceerror.NotFound:
case *serviceerror.NamespaceNotFound:
// no check is necessary
recordExists = false
default:
Expand All @@ -177,7 +177,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio
if resp.Namespace.Info.Name != task.Info.GetName() {
return ErrNameUUIDCollision
}
case *serviceerror.NotFound:
case *serviceerror.NamespaceNotFound:
// no check is necessary
recordExists = false
default:
Expand Down Expand Up @@ -219,7 +219,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
Name: task.Info.GetName(),
})
if err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// this can happen if the create namespace replication task is to processed.
// e.g. new cluster which does not have anything
return h.handleNamespaceCreationReplicationTask(ctx, task)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandra/metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (m *MetadataStore) GetNamespace(
if len(ID) > 0 {
identity = ID
}
return serviceerror.NewNotFound(fmt.Sprintf("Namespace %s does not exist.", identity))
return serviceerror.NewNamespaceNotFound(identity)
}
return serviceerror.NewUnavailable(fmt.Sprintf("GetNamespace operation failed. Error %v", err))
}
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandra/mutable_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func (d *MutableStateStore) assertNotCurrentExecution(
NamespaceID: namespaceID,
WorkflowID: workflowID,
}); err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
// allow bypassing no current record
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/clusterMetadataStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/serialization"
Expand Down Expand Up @@ -59,7 +60,7 @@ type (

var _ ClusterMetadataManager = (*clusterMetadataManagerImpl)(nil)

//NewClusterMetadataManagerImpl returns new ClusterMetadataManager
// NewClusterMetadataManagerImpl returns new ClusterMetadataManager
func NewClusterMetadataManagerImpl(
persistence ClusterMetadataStore,
serializer serialization.Serializer,
Expand Down Expand Up @@ -183,7 +184,7 @@ func (m *clusterMetadataManagerImpl) SaveClusterMetadata(
}

oldClusterMetadata, err := m.GetClusterMetadata(ctx, &GetClusterMetadataRequest{ClusterName: request.GetClusterName()})
if _, notFound := err.(*serviceerror.NotFound); notFound {
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
return m.persistence.SaveClusterMetadata(ctx, &InternalSaveClusterMetadataRequest{
ClusterName: request.ClusterName,
ClusterMetadata: mcm,
Expand Down
14 changes: 5 additions & 9 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,8 +1574,7 @@ func (s *ExecutionManagerSuite) TestDeleteCurrentWorkflow() {
runID0, err1 = s.GetCurrentWorkflowRunID(s.ctx, namespaceID, workflowExecution.GetWorkflowId())
s.Error(err1)
s.Empty(runID0)
_, ok := err1.(*serviceerror.NotFound)
s.True(ok)
s.IsType(&serviceerror.NotFound{}, err1)

// execution record should still be there
_, err2 = s.GetWorkflowMutableState(s.ctx, namespaceID, workflowExecution)
Expand Down Expand Up @@ -1623,12 +1622,11 @@ func (s *ExecutionManagerSuite) TestUpdateDeleteWorkflow() {
runID0, err1 = s.GetCurrentWorkflowRunID(s.ctx, namespaceID, workflowExecution.GetWorkflowId())
s.Error(err1)
s.Empty(runID0)
_, ok := err1.(*serviceerror.NotFound)
s.IsType(&serviceerror.NotFound{}, err1)
// execution record should still be there
_, err2 = s.GetWorkflowMutableState(s.ctx, namespaceID, workflowExecution)
s.Error(err2)
_, ok = err2.(*serviceerror.NotFound)
s.True(ok)
s.IsType(&serviceerror.NotFound{}, err2)
}

// TestCleanupCorruptedWorkflow test
Expand Down Expand Up @@ -1656,8 +1654,7 @@ func (s *ExecutionManagerSuite) TestCleanupCorruptedWorkflow() {
runID0, err4 := s.GetCurrentWorkflowRunID(s.ctx, namespaceID, workflowExecution.GetWorkflowId())
s.Error(err4)
s.Empty(runID0)
_, ok := err4.(*serviceerror.NotFound)
s.True(ok)
s.IsType(&serviceerror.NotFound{}, err4)

// we should still be able to load with runID
info1, err5 := s.GetWorkflowMutableState(s.ctx, namespaceID, workflowExecution)
Expand Down Expand Up @@ -1697,8 +1694,7 @@ func (s *ExecutionManagerSuite) TestCleanupCorruptedWorkflow() {
// execution record should be gone
_, err9 := s.GetWorkflowMutableState(s.ctx, namespaceID, workflowExecution)
s.Error(err9)
_, ok = err9.(*serviceerror.NotFound)
s.True(ok)
s.IsType(&serviceerror.NotFound{}, err9)
}

// TestGetCurrentWorkflow test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (m *MetadataPersistenceSuiteV2) TestGetNamespace() {
resp0, err0 := m.GetNamespace("", "does-not-exist")
m.Nil(resp0)
m.Error(err0)
m.IsType(&serviceerror.NotFound{}, err0)
m.IsType(&serviceerror.NamespaceNotFound{}, err0)
testBinaries := &namespacepb.BadBinaries{
Binaries: map[string]*namespacepb.BadBinaryInfo{
"abc": {
Expand Down Expand Up @@ -935,12 +935,12 @@ func (m *MetadataPersistenceSuiteV2) TestDeleteNamespace() {
time.Sleep(time.Second * time.Duration(i))
}
m.Error(err4)
m.IsType(&serviceerror.NotFound{}, err4)
m.IsType(&serviceerror.NamespaceNotFound{}, err4)
m.Nil(resp4)

resp5, err5 := m.GetNamespace(id, "")
m.Error(err5)
m.IsType(&serviceerror.NotFound{}, err5)
m.IsType(&serviceerror.NamespaceNotFound{}, err5)
m.Nil(resp5)

id = uuid.New()
Expand Down Expand Up @@ -976,12 +976,12 @@ func (m *MetadataPersistenceSuiteV2) TestDeleteNamespace() {

resp8, err8 := m.GetNamespace("", name)
m.Error(err8)
m.IsType(&serviceerror.NotFound{}, err8)
m.IsType(&serviceerror.NamespaceNotFound{}, err8)
m.Nil(resp8)

resp9, err9 := m.GetNamespace(id, "")
m.Error(err9)
m.IsType(&serviceerror.NotFound{}, err9)
m.IsType(&serviceerror.NamespaceNotFound{}, err9)
m.Nil(resp9)
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,7 @@ func (p *metricEmitter) updateErrorMetric(scope int, err error) {
p.metricClient.IncCounter(scope, metrics.PersistenceErrBadRequestCounter)
case *serviceerror.NamespaceAlreadyExists:
p.metricClient.IncCounter(scope, metrics.PersistenceErrNamespaceAlreadyExistsCounter)
case *serviceerror.NotFound:
case *serviceerror.NotFound, *serviceerror.NamespaceNotFound:
p.metricClient.IncCounter(scope, metrics.PersistenceErrEntityNotExistsCounter)
case *serviceerror.ResourceExhausted:
p.metricClient.IncCounter(scope, metrics.PersistenceErrBusyCounter)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *sqlMetadataManagerV2) GetNamespace(
identity = request.ID
}

return nil, serviceerror.NewNotFound(fmt.Sprintf("Namespace %s does not exist.", identity))
return nil, serviceerror.NewNamespaceNotFound(identity)
default:
return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetNamespace operation failed. Error %v", err))
}
Expand Down

0 comments on commit 6035304

Please sign in to comment.