Skip to content

Commit

Permalink
Replace persistence call for ReadHistoryBranch with rpc (#4293)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberthu committed May 10, 2023
1 parent 51249b6 commit 3c49b56
Show file tree
Hide file tree
Showing 4 changed files with 569 additions and 38 deletions.
58 changes: 49 additions & 9 deletions service/frontend/adminHandler.go
Expand Up @@ -1696,7 +1696,7 @@ func (adh *AdminHandler) DeleteWorkflowExecution(
} else if executionInfo.GetCloseTime() != nil {
closeTime = executionInfo.GetCloseTime()
} else {
completionEvent, err := adh.getWorkflowCompletionEvent(ctx, shardID, resp.State)
completionEvent, err := adh.getWorkflowCompletionEvent(ctx, namespaceID, shardID, resp.State)
if err != nil {
warnMsg := "Unable to load workflow completion event, will skip deleting visibility record"
adh.logger.Warn(warnMsg, tag.Error(err))
Expand Down Expand Up @@ -1913,6 +1913,7 @@ func (adh *AdminHandler) startRequestProfile(operation string) (metrics.Handler,

func (adh *AdminHandler) getWorkflowCompletionEvent(
ctx context.Context,
namespaceID namespace.ID,
shardID int32,
mutableState *persistencespb.WorkflowMutableState,
) (*historypb.HistoryEvent, error) {
Expand All @@ -1928,19 +1929,21 @@ func (adh *AdminHandler) getWorkflowCompletionEvent(
return nil, err
}

resp, err := adh.persistenceExecutionManager.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
ShardID: shardID,
BranchToken: currentVersionHistory.GetBranchToken(),
MinEventID: executionInfo.CompletionEventBatchId,
MaxEventID: completionEventID + 1,
PageSize: 1,
})
historyEvents, err := adh.readHistoryBranch(
ctx,
namespaceID,
shardID,
currentVersionHistory.GetBranchToken(),
executionInfo.CompletionEventBatchId,
completionEventID+1,
1,
)
if err != nil {
return nil, err
}

// find history event from batch and return back single event to caller
for _, e := range resp.HistoryEvents {
for _, e := range historyEvents {
if e.EventId == completionEventID && e.Version == version {
return e, nil
}
Expand All @@ -1949,6 +1952,43 @@ func (adh *AdminHandler) getWorkflowCompletionEvent(
return nil, serviceerror.NewInternal("Unable to find closed event for workflow")
}

func (adh *AdminHandler) readHistoryBranch(
ctx context.Context,
namespaceID namespace.ID,
shardID int32,
branchToken []byte,
minEventId int64,
maxEventID int64,
pageSize int,
) ([]*historypb.HistoryEvent, error) {
if adh.config.readEventsFromHistory(adh.metricsHandler) {
resp, err := adh.historyClient.ReadHistoryBranch(ctx, &historyservice.ReadHistoryBranchRequest{
NamespaceId: namespaceID.String(),
ShardId: shardID,
BranchToken: branchToken,
MinEventId: minEventId,
MaxEventId: maxEventID,
PageSize: int64(pageSize),
})
if err != nil {
return nil, err
}
return resp.HistoryEvents, nil
} else {
resp, err := adh.persistenceExecutionManager.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
ShardID: shardID,
BranchToken: branchToken,
MinEventID: minEventId,
MaxEventID: maxEventID,
PageSize: pageSize,
})
if err != nil {
return nil, err
}
return resp.HistoryEvents, nil
}
}

func (adh *AdminHandler) StreamWorkflowReplicationMessages(
targetCluster adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
) (retError error) {
Expand Down
107 changes: 106 additions & 1 deletion service/frontend/adminHandler_test.go
Expand Up @@ -1346,7 +1346,8 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_LoadMutableStateFailed()
s.NoError(err)
}

func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBackend() {
// TODO: remove deprecated unit test after all FE->History calls migrated from DB->RPC
func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBackend_Deprecated() {
execution := commonpb.WorkflowExecution{
WorkflowId: "workflowID",
RunId: uuid.New(),
Expand Down Expand Up @@ -1446,3 +1447,107 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBacke
_, err = s.handler.DeleteWorkflowExecution(context.Background(), request)
s.NoError(err)
}

func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBackend() {
s.handler.config.ReadEventsFromHistoryFraction = dynamicconfig.GetFloatPropertyFn(1.0)

execution := commonpb.WorkflowExecution{
WorkflowId: "workflowID",
RunId: uuid.New(),
}

request := &adminservice.DeleteWorkflowExecutionRequest{
Namespace: s.namespace.String(),
Execution: &execution,
}

s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true).AnyTimes()

// test delete open records
branchToken := []byte("branchToken")
version := int64(100)
mutableState := &persistencespb.WorkflowMutableState{
ExecutionState: &persistencespb.WorkflowExecutionState{
CreateRequestId: uuid.New(),
RunId: execution.RunId,
State: enums.WORKFLOW_EXECUTION_STATE_RUNNING,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
},
NextEventId: 12,
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
CompletionEventBatchId: 10,
StartTime: timestamp.TimePtr(time.Now()),
VersionHistories: &historyspb.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*historyspb.VersionHistory{
{
BranchToken: branchToken,
Items: []*historyspb.VersionHistoryItem{
{EventId: 11, Version: version},
},
},
},
},
},
}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: mutableState}, nil)
s.mockHistoryClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), &historyservice.DeleteWorkflowVisibilityRecordRequest{
NamespaceId: s.namespaceID.String(),
Execution: &execution,
WorkflowStartTime: mutableState.ExecutionInfo.StartTime,
}).Return(&historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil)
s.mockExecutionMgr.EXPECT().DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil)
s.mockExecutionMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil)
s.mockExecutionMgr.EXPECT().DeleteHistoryBranch(gomock.Any(), gomock.Any()).Times(len(mutableState.ExecutionInfo.VersionHistories.Histories))

_, err := s.handler.DeleteWorkflowExecution(context.Background(), request)
s.NoError(err)

// test delete close records
mutableState.ExecutionState.State = enums.WORKFLOW_EXECUTION_STATE_COMPLETED
mutableState.ExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED

shardID := common.WorkflowIDToHistoryShard(
s.namespaceID.String(),
execution.GetWorkflowId(),
s.handler.numberOfHistoryShards,
)
closeTime := time.Now()
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: mutableState}, nil)
s.mockHistoryClient.EXPECT().ReadHistoryBranch(gomock.Any(), &historyservice.ReadHistoryBranchRequest{
NamespaceId: s.namespaceID.String(),
ShardId: shardID,
BranchToken: branchToken,
MinEventId: mutableState.ExecutionInfo.CompletionEventBatchId,
MaxEventId: mutableState.NextEventId,
PageSize: 1,
}).Return(&historyservice.ReadHistoryBranchResponse{
HistoryEvents: []*historypb.HistoryEvent{
{
EventId: 10,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
Version: version,
EventTime: timestamp.TimePtr(closeTime.Add(-time.Millisecond)),
},
{
EventId: 11,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
Version: version,
EventTime: timestamp.TimePtr(closeTime),
},
},
}, nil)
s.mockHistoryClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), &historyservice.DeleteWorkflowVisibilityRecordRequest{
NamespaceId: s.namespaceID.String(),
Execution: &execution,
WorkflowCloseTime: timestamp.TimePtr(closeTime),
}).Return(&historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil)
s.mockExecutionMgr.EXPECT().DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil)
s.mockExecutionMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil)
s.mockExecutionMgr.EXPECT().DeleteHistoryBranch(gomock.Any(), gomock.Any()).Times(len(mutableState.ExecutionInfo.VersionHistories.Histories))

_, err = s.handler.DeleteWorkflowExecution(context.Background(), request)
s.NoError(err)
}
114 changes: 89 additions & 25 deletions service/frontend/workflow_handler.go
Expand Up @@ -4186,33 +4186,23 @@ func (wh *WorkflowHandler) getHistory(
branchToken []byte,
) (*historypb.History, []byte, error) {

var size int
isFirstPage := len(nextPageToken) == 0
shardID := common.WorkflowIDToHistoryShard(namespaceID.String(), execution.GetWorkflowId(), wh.config.NumHistoryShards)
var err error
var historyEvents []*historypb.HistoryEvent
historyEvents, size, nextPageToken, err = persistence.ReadFullPageEvents(ctx, wh.persistenceExecutionManager, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
PageSize: int(pageSize),
NextPageToken: nextPageToken,
ShardID: shardID,
})
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
// log event
wh.logger.Error("encountered data loss event", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.GetWorkflowId()), tag.WorkflowRunID(execution.GetRunId()))
return nil, nil, err
default:
historyEvents, newPageToken, err := wh.readHistoryBranch(
ctx,
metricsHandler,
namespaceID,
execution,
firstEventID,
nextEventID,
pageSize,
nextPageToken,
branchToken,
)
if err != nil {
return nil, nil, err
}

metricsHandler.Histogram(metrics.HistorySize.GetMetricName(), metrics.HistorySize.GetMetricUnit()).Record(int64(size))

isLastPage := len(nextPageToken) == 0
isLastPage := len(newPageToken) == 0
if err := wh.verifyHistoryIsComplete(
historyEvents,
firstEventID,
Expand All @@ -4228,7 +4218,7 @@ func (wh *WorkflowHandler) getHistory(
tag.Error(err))
}

if len(nextPageToken) == 0 && transientWorkflowTaskInfo != nil {
if len(newPageToken) == 0 && transientWorkflowTaskInfo != nil {
if err := wh.validateTransientWorkflowTaskEvents(nextEventID, transientWorkflowTaskInfo); err != nil {
metricsHandler.Counter(metrics.ServiceErrIncompleteHistoryCounter.GetMetricName()).Record(1)
wh.logger.Error("getHistory error",
Expand All @@ -4248,7 +4238,81 @@ func (wh *WorkflowHandler) getHistory(
executionHistory := &historypb.History{
Events: historyEvents,
}
return executionHistory, nextPageToken, nil
return executionHistory, newPageToken, nil
}

func (wh *WorkflowHandler) readHistoryBranch(
ctx context.Context,
metricsHandler metrics.Handler,
namespaceID namespace.ID,
execution commonpb.WorkflowExecution,
firstEventID int64,
nextEventID int64,
pageSize int32,
nextPageToken []byte,
branchToken []byte,
) ([]*historypb.HistoryEvent, []byte, error) {
// readFullPageEvents reads a full page of history events from history service. Due to storage format of V2 History
// it is not guaranteed that pageSize amount of data is returned. Function returns the list of history events, the
// size of data read, the next page token, and an error if present.
readFullPageEvents := func(
req *historyservice.ReadHistoryBranchRequest,
) ([]*historypb.HistoryEvent, int, []byte, error) {
var historyEvents []*historypb.HistoryEvent
size := 0
for {
response, err := wh.historyClient.ReadHistoryBranch(ctx, req)
if err != nil {
return nil, 0, nil, err
}
historyEvents = append(historyEvents, response.HistoryEvents...)
size += int(response.Size_)
if len(historyEvents) >= int(req.PageSize) || len(response.NextPageToken) == 0 {
return historyEvents, size, response.NextPageToken, nil
}
req.NextPageToken = response.NextPageToken
}
}

var size int
shardID := common.WorkflowIDToHistoryShard(namespaceID.String(), execution.GetWorkflowId(), wh.config.NumHistoryShards)
var err error
var historyEvents []*historypb.HistoryEvent
var newPageToken []byte
if wh.config.readEventsFromHistory(wh.metricsScope(ctx)) {
historyEvents, size, newPageToken, err = readFullPageEvents(&historyservice.ReadHistoryBranchRequest{
NamespaceId: namespaceID.String(),
ShardId: shardID,
BranchToken: branchToken,
MinEventId: firstEventID,
MaxEventId: nextEventID,
PageSize: int64(pageSize),
NextPageToken: nextPageToken,
})
} else {
historyEvents, size, newPageToken, err = persistence.ReadFullPageEvents(ctx, wh.persistenceExecutionManager, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
PageSize: int(pageSize),
NextPageToken: nextPageToken,
ShardID: shardID,
})
}

switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
// log event
wh.logger.Error("encountered data loss event", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.GetWorkflowId()), tag.WorkflowRunID(execution.GetRunId()))
return nil, nil, err
default:
return nil, nil, err
}
metricsHandler.Histogram(metrics.HistorySize.GetMetricName(), metrics.HistorySize.GetMetricUnit()).Record(int64(size))

return historyEvents, newPageToken, err
}

func (wh *WorkflowHandler) getHistoryReverse(
Expand Down

0 comments on commit 3c49b56

Please sign in to comment.