Skip to content

Commit

Permalink
Reduce unnecessary data type conversion between mutable state & persi…
Browse files Browse the repository at this point in the history
…stence (#1409)

* Remove unnecessary conversion between slice and map for the following type:
  * activity
  * timer
  * child workflow
  * request cancel
  * signal
  • Loading branch information
wxing1292 committed Mar 26, 2021
1 parent 5ab080a commit 9176647
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 394 deletions.
43 changes: 43 additions & 0 deletions common/convert/convert.go
Expand Up @@ -88,3 +88,46 @@ func Int32ToString(v int32) string {
func Uint16ToString(v uint16) string {
return strconv.FormatUint(uint64(v), 10)
}

func Int64SetToSlice(
inputs map[int64]struct{},
) []int64 {
outputs := make([]int64, len(inputs))
i := 0
for item := range inputs {
outputs[i] = item
i++
}
return outputs
}

func Int64SliceToSet(
inputs []int64,
) map[int64]struct{} {
outputs := make(map[int64]struct{}, len(inputs))
for _, item := range inputs {
outputs[item] = struct{}{}
}
return outputs
}

func StringSetToSlice(
inputs map[string]struct{},
) []string {
outputs := make([]string, len(inputs))
i := 0
for item := range inputs {
outputs[i] = item
i++
}
return outputs
}
func StringSliceToSet(
inputs []string,
) map[string]struct{} {
outputs := make(map[string]struct{}, len(inputs))
for _, item := range inputs {
outputs[item] = struct{}{}
}
return outputs
}
63 changes: 32 additions & 31 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Expand Up @@ -35,6 +35,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/convert"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/serialization"
Expand Down Expand Up @@ -951,8 +952,8 @@ func createOrUpdateCurrentExecution(

func updateActivityInfos(
batch gocql.Batch,
activityInfos []*persistencespb.ActivityInfo,
deleteIDs []int64,
activityInfos map[int64]*persistencespb.ActivityInfo,
deleteIDs map[int64]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -978,7 +979,7 @@ func updateActivityInfos(
rowTypeExecutionTaskID)
}

for _, deleteID := range deleteIDs {
for deleteID := range deleteIDs {
batch.Query(templateDeleteActivityInfoQuery,
deleteID,
shardID,
Expand Down Expand Up @@ -1013,7 +1014,7 @@ func deleteBufferedEvents(

func resetActivityInfos(
batch gocql.Batch,
activityInfos []*persistencespb.ActivityInfo,
activityInfos map[int64]*persistencespb.ActivityInfo,
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1040,8 +1041,8 @@ func resetActivityInfos(

func updateTimerInfos(
batch gocql.Batch,
timerInfos []*persistencespb.TimerInfo,
deleteInfos []string,
timerInfos map[string]*persistencespb.TimerInfo,
deleteInfos map[string]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1066,7 +1067,7 @@ func updateTimerInfos(
rowTypeExecutionTaskID)
}

for _, deleteInfoID := range deleteInfos {
for deleteInfoID := range deleteInfos {
batch.Query(templateDeleteTimerInfoQuery,
deleteInfoID,
shardID,
Expand All @@ -1083,7 +1084,7 @@ func updateTimerInfos(

func resetTimerInfos(
batch gocql.Batch,
timerInfos []*persistencespb.TimerInfo,
timerInfos map[string]*persistencespb.TimerInfo,
shardID int32,
namespaceID string,
workflowID string,
Expand Down Expand Up @@ -1111,8 +1112,8 @@ func resetTimerInfos(

func updateChildExecutionInfos(
batch gocql.Batch,
childExecutionInfos []*persistencespb.ChildExecutionInfo,
deleteIDs []int64,
childExecutionInfos map[int64]*persistencespb.ChildExecutionInfo,
deleteIDs map[int64]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1138,7 +1139,7 @@ func updateChildExecutionInfos(
rowTypeExecutionTaskID)
}

for _, deleteID := range deleteIDs {
for deleteID := range deleteIDs {
batch.Query(templateDeleteChildExecutionInfoQuery,
deleteID,
shardID,
Expand All @@ -1154,7 +1155,7 @@ func updateChildExecutionInfos(

func resetChildExecutionInfos(
batch gocql.Batch,
childExecutionInfos []*persistencespb.ChildExecutionInfo,
childExecutionInfos map[int64]*persistencespb.ChildExecutionInfo,
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1180,8 +1181,8 @@ func resetChildExecutionInfos(

func updateRequestCancelInfos(
batch gocql.Batch,
requestCancelInfos []*persistencespb.RequestCancelInfo,
deleteIDs []int64,
requestCancelInfos map[int64]*persistencespb.RequestCancelInfo,
deleteIDs map[int64]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1207,7 +1208,7 @@ func updateRequestCancelInfos(
rowTypeExecutionTaskID)
}

for _, deleteID := range deleteIDs {
for deleteID := range deleteIDs {
batch.Query(templateDeleteRequestCancelInfoQuery,
deleteID,
shardID,
Expand All @@ -1223,7 +1224,7 @@ func updateRequestCancelInfos(

func resetRequestCancelInfos(
batch gocql.Batch,
requestCancelInfos []*persistencespb.RequestCancelInfo,
requestCancelInfos map[int64]*persistencespb.RequestCancelInfo,
shardID int32,
namespaceID string,
workflowID string,
Expand Down Expand Up @@ -1252,8 +1253,8 @@ func resetRequestCancelInfos(

func updateSignalInfos(
batch gocql.Batch,
signalInfos []*persistencespb.SignalInfo,
deleteIDs []int64,
signalInfos map[int64]*persistencespb.SignalInfo,
deleteIDs map[int64]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1279,7 +1280,7 @@ func updateSignalInfos(
rowTypeExecutionTaskID)
}

for _, deleteID := range deleteIDs {
for deleteID := range deleteIDs {
batch.Query(templateDeleteSignalInfoQuery,
deleteID,
shardID,
Expand All @@ -1295,7 +1296,7 @@ func updateSignalInfos(

func resetSignalInfos(
batch gocql.Batch,
signalInfos []*persistencespb.SignalInfo,
signalInfos map[int64]*persistencespb.SignalInfo,
shardID int32,
namespaceID string,
workflowID string,
Expand Down Expand Up @@ -1323,8 +1324,8 @@ func resetSignalInfos(

func updateSignalsRequested(
batch gocql.Batch,
signalReqIDs []string,
deleteSignalReqIDs []string,
signalReqIDs map[string]struct{},
deleteSignalReqIDs map[string]struct{},
shardID int32,
namespaceID string,
workflowID string,
Expand All @@ -1333,7 +1334,7 @@ func updateSignalsRequested(

if len(signalReqIDs) > 0 {
batch.Query(templateUpdateSignalRequestedQuery,
signalReqIDs,
convert.StringSetToSlice(signalReqIDs),
shardID,
rowTypeExecution,
namespaceID,
Expand All @@ -1345,7 +1346,7 @@ func updateSignalsRequested(

if len(deleteSignalReqIDs) > 0 {
batch.Query(templateDeleteWorkflowExecutionSignalRequestedQuery,
deleteSignalReqIDs,
convert.StringSetToSlice(deleteSignalReqIDs),
shardID,
rowTypeExecution,
namespaceID,
Expand All @@ -1358,15 +1359,15 @@ func updateSignalsRequested(

func resetSignalRequested(
batch gocql.Batch,
signalRequested []string,
signalRequested map[string]struct{},
shardID int32,
namespaceID string,
workflowID string,
runID string,
) {

batch.Query(templateResetSignalRequestedQuery,
signalRequested,
convert.StringSetToSlice(signalRequested),
shardID,
rowTypeExecution,
namespaceID,
Expand Down Expand Up @@ -1414,7 +1415,7 @@ func updateBufferedEvents(
}

func resetActivityInfoMap(
activityInfos []*persistencespb.ActivityInfo,
activityInfos map[int64]*persistencespb.ActivityInfo,
) (map[int64][]byte, enumspb.EncodingType, error) {

encoding := enumspb.ENCODING_TYPE_UNSPECIFIED
Expand All @@ -1433,7 +1434,7 @@ func resetActivityInfoMap(
}

func resetTimerInfoMap(
timerInfos []*persistencespb.TimerInfo,
timerInfos map[string]*persistencespb.TimerInfo,
) (map[string][]byte, enumspb.EncodingType, error) {

tMap := make(map[string][]byte)
Expand All @@ -1454,7 +1455,7 @@ func resetTimerInfoMap(
}

func resetChildExecutionInfoMap(
childExecutionInfos []*persistencespb.ChildExecutionInfo,
childExecutionInfos map[int64]*persistencespb.ChildExecutionInfo,
) (map[int64][]byte, enumspb.EncodingType, error) {

cMap := make(map[int64][]byte)
Expand All @@ -1472,7 +1473,7 @@ func resetChildExecutionInfoMap(
}

func resetRequestCancelInfoMap(
requestCancelInfos []*persistencespb.RequestCancelInfo,
requestCancelInfos map[int64]*persistencespb.RequestCancelInfo,
) (map[int64][]byte, enumspb.EncodingType, error) {

rcMap := make(map[int64][]byte)
Expand All @@ -1493,7 +1494,7 @@ func resetRequestCancelInfoMap(
}

func resetSignalInfoMap(
signalInfos []*persistencespb.SignalInfo,
signalInfos map[int64]*persistencespb.SignalInfo,
) (map[int64][]byte, enumspb.EncodingType, error) {

sMap := make(map[int64][]byte)
Expand Down
36 changes: 18 additions & 18 deletions common/persistence/dataInterfaces.go
Expand Up @@ -539,18 +539,18 @@ type (
ExecutionState *persistencespb.WorkflowExecutionState
NextEventID int64

UpsertActivityInfos []*persistencespb.ActivityInfo
DeleteActivityInfos []int64
UpsertTimerInfos []*persistencespb.TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*persistencespb.ChildExecutionInfo
DeleteChildExecutionInfos []int64
UpsertRequestCancelInfos []*persistencespb.RequestCancelInfo
DeleteRequestCancelInfos []int64
UpsertSignalInfos []*persistencespb.SignalInfo
DeleteSignalInfos []int64
UpsertSignalRequestedIDs []string
DeleteSignalRequestedIDs []string
UpsertActivityInfos map[int64]*persistencespb.ActivityInfo
DeleteActivityInfos map[int64]struct{}
UpsertTimerInfos map[string]*persistencespb.TimerInfo
DeleteTimerInfos map[string]struct{}
UpsertChildExecutionInfos map[int64]*persistencespb.ChildExecutionInfo
DeleteChildExecutionInfos map[int64]struct{}
UpsertRequestCancelInfos map[int64]*persistencespb.RequestCancelInfo
DeleteRequestCancelInfos map[int64]struct{}
UpsertSignalInfos map[int64]*persistencespb.SignalInfo
DeleteSignalInfos map[int64]struct{}
UpsertSignalRequestedIDs map[string]struct{}
DeleteSignalRequestedIDs map[string]struct{}
NewBufferedEvents []*historypb.HistoryEvent
ClearBufferedEvents bool

Expand All @@ -569,12 +569,12 @@ type (
ExecutionState *persistencespb.WorkflowExecutionState
NextEventID int64

ActivityInfos []*persistencespb.ActivityInfo
TimerInfos []*persistencespb.TimerInfo
ChildExecutionInfos []*persistencespb.ChildExecutionInfo
RequestCancelInfos []*persistencespb.RequestCancelInfo
SignalInfos []*persistencespb.SignalInfo
SignalRequestedIDs []string
ActivityInfos map[int64]*persistencespb.ActivityInfo
TimerInfos map[string]*persistencespb.TimerInfo
ChildExecutionInfos map[int64]*persistencespb.ChildExecutionInfo
RequestCancelInfos map[int64]*persistencespb.RequestCancelInfo
SignalInfos map[int64]*persistencespb.SignalInfo
SignalRequestedIDs map[string]struct{}

TransferTasks []Task
ReplicationTasks []Task
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/persistence-tests/executionManagerTest.go
Expand Up @@ -829,8 +829,8 @@ func (s *ExecutionManagerSuite) TestUpsertWorkflowActivity() {
NextEventID: info.NextEventId,
Condition: nextEventID,
Checksum: csum,
UpsertActivityInfos: []*persistencespb.ActivityInfo{
{
UpsertActivityInfos: map[int64]*persistencespb.ActivityInfo{
100: {
Version: 0,
ScheduleId: 100,
TaskQueue: "test-activity-tasktlist-1",
Expand All @@ -855,8 +855,8 @@ func (s *ExecutionManagerSuite) TestUpsertWorkflowActivity() {
NextEventID: info.NextEventId,
Condition: nextEventID,
Checksum: csum,
UpsertActivityInfos: []*persistencespb.ActivityInfo{
{
UpsertActivityInfos: map[int64]*persistencespb.ActivityInfo{
100: {
Version: 0,
ScheduleId: 100,
TaskQueue: "test-activity-tasktlist-2",
Expand Down

0 comments on commit 9176647

Please sign in to comment.