Skip to content

Commit

Permalink
Use namespace Id for replication APIs (#2914)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 27, 2022
1 parent a7e3520 commit 3dbabab
Show file tree
Hide file tree
Showing 20 changed files with 626 additions and 289 deletions.
550 changes: 361 additions & 189 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/adminservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

152 changes: 105 additions & 47 deletions api/token/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions common/rpc/interceptor/namespace_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -160,6 +161,31 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromRequest(req interfa
return nil, ErrNamespaceNotSet
}
return nil, nil
// TODO (alex): remove 3 below cases in 1.18+ together with `namespace` field in corresponding protos.
case *adminservice.GetWorkflowExecutionRawHistoryV2Request:
if request.GetNamespaceId() != "" {
return ni.namespaceRegistry.GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
}
if namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
}
return ni.namespaceRegistry.GetNamespace(namespaceName)
case *adminservice.ReapplyEventsRequest:
if request.GetNamespaceId() != "" {
return ni.namespaceRegistry.GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
}
if namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
}
return ni.namespaceRegistry.GetNamespace(namespaceName)
case *adminservice.RefreshWorkflowTasksRequest:
if request.GetNamespaceId() != "" {
return ni.namespaceRegistry.GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
}
if namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
}
return ni.namespaceRegistry.GetNamespace(namespaceName)
default:
// All other APIs.
if namespaceName.IsEmpty() {
Expand Down
7 changes: 3 additions & 4 deletions common/xdc/nDCHistoryResender.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,10 @@ func (n *NDCHistoryResenderImpl) getHistory(

logger := log.With(n.logger, tag.WorkflowRunID(runID))

namespaceEntry, err := n.namespaceRegistry.GetNamespaceByID(namespaceID)
ns, err := n.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
logger.Error("error getting namespace", tag.Error(err))
return nil, err
}
namespace := namespaceEntry.Name()

ctx, cancel := rpc.NewContextFromParentWithTimeoutAndHeaders(ctx, resendContextTimeout)
defer cancel()
Expand All @@ -281,7 +279,8 @@ func (n *NDCHistoryResenderImpl) getHistory(
}

response, err := adminClient.GetWorkflowExecutionRawHistoryV2(ctx, &adminservice.GetWorkflowExecutionRawHistoryV2Request{
Namespace: namespace.String(),
Namespace: ns.Name().String(),
NamespaceId: namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
Expand Down
9 changes: 6 additions & 3 deletions common/xdc/nDCHistoryResender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ func (s *nDCHistoryResenderSuite) TestSendSingleWorkflowHistory() {
s.mockAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(
gomock.Any(),
&adminservice.GetWorkflowExecutionRawHistoryV2Request{
Namespace: s.namespace.String(),
Namespace: s.namespace.String(),
NamespaceId: s.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
Expand All @@ -191,7 +192,8 @@ func (s *nDCHistoryResenderSuite) TestSendSingleWorkflowHistory() {
s.mockAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(
gomock.Any(),
&adminservice.GetWorkflowExecutionRawHistoryV2Request{
Namespace: s.namespace.String(),
Namespace: s.namespace.String(),
NamespaceId: s.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
Expand Down Expand Up @@ -345,7 +347,8 @@ func (s *nDCHistoryResenderSuite) TestGetHistory() {
NextPageToken: nextTokenOut,
}
s.mockAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(gomock.Any(), &adminservice.GetWorkflowExecutionRawHistoryV2Request{
Namespace: s.namespace.String(),
Namespace: s.namespace.String(),
NamespaceId: s.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
Expand Down

0 comments on commit 3dbabab

Please sign in to comment.