Skip to content

Commit

Permalink
Improve mem efficiency when mutating workflow (#2706)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 13, 2022
1 parent ca4fa9e commit 25e2cd7
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,24 +469,29 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
WorkflowID: input.ExecutionInfo.GetWorkflowId(),
RunID: input.ExecutionState.GetRunId(),

UpsertActivityInfos: make(map[int64]*commonpb.DataBlob),
UpsertTimerInfos: make(map[string]*commonpb.DataBlob),
UpsertChildExecutionInfos: make(map[int64]*commonpb.DataBlob),
UpsertRequestCancelInfos: make(map[int64]*commonpb.DataBlob),
UpsertSignalInfos: make(map[int64]*commonpb.DataBlob),
UpsertActivityInfos: make(map[int64]*commonpb.DataBlob, len(input.UpsertActivityInfos)),
DeleteActivityInfos: input.DeleteActivityInfos,

ExecutionInfo: input.ExecutionInfo,
ExecutionState: input.ExecutionState,
UpsertTimerInfos: make(map[string]*commonpb.DataBlob, len(input.UpsertTimerInfos)),
DeleteTimerInfos: input.DeleteTimerInfos,

DeleteActivityInfos: input.DeleteActivityInfos,
DeleteTimerInfos: input.DeleteTimerInfos,
UpsertChildExecutionInfos: make(map[int64]*commonpb.DataBlob, len(input.UpsertChildExecutionInfos)),
DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
DeleteRequestCancelInfos: input.DeleteRequestCancelInfos,
DeleteSignalInfos: input.DeleteSignalInfos,
DeleteSignalRequestedIDs: input.DeleteSignalRequestedIDs,

UpsertRequestCancelInfos: make(map[int64]*commonpb.DataBlob, len(input.UpsertRequestCancelInfos)),
DeleteRequestCancelInfos: input.DeleteRequestCancelInfos,

UpsertSignalInfos: make(map[int64]*commonpb.DataBlob, len(input.UpsertSignalInfos)),
DeleteSignalInfos: input.DeleteSignalInfos,

UpsertSignalRequestedIDs: input.UpsertSignalRequestedIDs,
ClearBufferedEvents: input.ClearBufferedEvents,
DeleteSignalRequestedIDs: input.DeleteSignalRequestedIDs,

NewBufferedEvents: nil,
ClearBufferedEvents: input.ClearBufferedEvents,

ExecutionInfo: input.ExecutionInfo,
ExecutionState: input.ExecutionState,

Tasks: tasks,

Expand All @@ -511,27 +516,31 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
}
result.UpsertActivityInfos[key] = blob
}

for key, info := range input.UpsertTimerInfos {
blob, err := m.serializer.TimerInfoToBlob(info, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
result.UpsertTimerInfos[key] = blob
}

for key, info := range input.UpsertChildExecutionInfos {
blob, err := m.serializer.ChildExecutionInfoToBlob(info, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
result.UpsertChildExecutionInfos[key] = blob
}

for key, info := range input.UpsertRequestCancelInfos {
blob, err := m.serializer.RequestCancelInfoToBlob(info, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
result.UpsertRequestCancelInfos[key] = blob
}

for key, info := range input.UpsertSignalInfos {
blob, err := m.serializer.SignalInfoToBlob(info, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
Expand All @@ -546,6 +555,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
return nil, err
}
}

result.LastWriteVersion, err = getCurrentBranchLastWriteVersion(input.ExecutionInfo.VersionHistories)
if err != nil {
return nil, err
Expand All @@ -572,11 +582,11 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot( // unexport
WorkflowID: input.ExecutionInfo.GetWorkflowId(),
RunID: input.ExecutionState.GetRunId(),

ActivityInfos: make(map[int64]*commonpb.DataBlob),
TimerInfos: make(map[string]*commonpb.DataBlob),
ChildExecutionInfos: make(map[int64]*commonpb.DataBlob),
RequestCancelInfos: make(map[int64]*commonpb.DataBlob),
SignalInfos: make(map[int64]*commonpb.DataBlob),
ActivityInfos: make(map[int64]*commonpb.DataBlob, len(input.ActivityInfos)),
TimerInfos: make(map[string]*commonpb.DataBlob, len(input.TimerInfos)),
ChildExecutionInfos: make(map[int64]*commonpb.DataBlob, len(input.ChildExecutionInfos)),
RequestCancelInfos: make(map[int64]*commonpb.DataBlob, len(input.RequestCancelInfos)),
SignalInfos: make(map[int64]*commonpb.DataBlob, len(input.SignalInfos)),

ExecutionInfo: input.ExecutionInfo,
ExecutionState: input.ExecutionState,
Expand Down

0 comments on commit 25e2cd7

Please sign in to comment.