diff --git a/common/persistence/cassandra/mutable_state_task_store.go b/common/persistence/cassandra/mutable_state_task_store.go index 61187c73e24..b2ee58068e4 100644 --- a/common/persistence/cassandra/mutable_state_task_store.go +++ b/common/persistence/cassandra/mutable_state_task_store.go @@ -27,6 +27,7 @@ package cassandra import ( "context" "fmt" + "time" "go.temporal.io/api/serviceerror" @@ -68,7 +69,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ? ` - templateGetHistoryImmediateTasksQuery = `SELECT task_data, task_encoding ` + + templateGetHistoryImmediateTasksQuery = `SELECT task_id, task_data, task_encoding ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -79,7 +80,7 @@ const ( `and task_id >= ? ` + `and task_id < ?` - templateGetHistoryScheduledTasksQuery = `SELECT task_data, task_encoding ` + + templateGetHistoryScheduledTasksQuery = `SELECT visibility_ts, task_id, task_data, task_encoding ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ?` + @@ -99,7 +100,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ? ` - templateGetTransferTasksQuery = `SELECT transfer, transfer_encoding ` + + templateGetTransferTasksQuery = `SELECT task_id, transfer, transfer_encoding ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -120,7 +121,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ? ` - templateGetVisibilityTasksQuery = `SELECT visibility_task_data, visibility_task_encoding ` + + templateGetVisibilityTasksQuery = `SELECT task_id, visibility_task_data, visibility_task_encoding ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -141,7 +142,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ? ` - templateGetReplicationTasksQuery = `SELECT replication, replication_encoding ` + + templateGetReplicationTasksQuery = `SELECT task_id, replication, replication_encoding ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -195,7 +196,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ? ` - templateGetTimerTasksQuery = `SELECT timer, timer_encoding ` + + templateGetTimerTasksQuery = `SELECT visibility_ts, task_id, timer, timer_encoding ` + `FROM executions ` + `WHERE shard_id = ? 
` + `and type = ?` + @@ -383,7 +384,12 @@ func (d *MutableStateTaskStore) getTransferTask( if err := query.Scan(&data, &encoding); err != nil { return nil, gocql.ConvertError("GetTransferTask", err) } - return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil + return &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }, + }, nil } func (d *MutableStateTaskStore) getTransferTasks( @@ -405,12 +411,17 @@ func (d *MutableStateTaskStore) getTransferTasks( iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} + var taskID int64 var data []byte var encoding string - for iter.Scan(&data, &encoding) { - response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding)) + for iter.Scan(&taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + taskID = 0 data = nil encoding = "" } @@ -468,7 +479,7 @@ func (d *MutableStateTaskStore) getTimerTask( ) (*p.InternalGetHistoryTaskResponse, error) { shardID := request.ShardID taskID := request.TaskKey.TaskID - visibilityTs := request.TaskKey.FireTime // TODO: do we need to convert the timestamp? + visibilityTs := p.UnixMilliseconds(request.TaskKey.FireTime) query := d.Session.Query(templateGetTimerTaskQuery, shardID, rowTypeTimerTask, @@ -485,7 +496,12 @@ return nil, gocql.ConvertError("GetTimerTask", err) } - return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil + return &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewKey(time.Unix(0, visibilityTs*int64(time.Millisecond)), taskID), + Blob: *p.NewDataBlob(data, encoding), + }, + }, nil } func (d *MutableStateTaskStore) getTimerTasks( @@ -507,12 +523,19 @@ iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} + var timestamp time.Time + var taskID int64 var data []byte var encoding string - for iter.Scan(&data, &encoding) { - response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding)) + for iter.Scan(&timestamp, &taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewKey(timestamp, taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + timestamp = time.Time{} + taskID = 0 data = nil encoding = "" } @@ -588,7 +611,12 @@ func (d *MutableStateTaskStore) getReplicationTask( return nil, gocql.ConvertError("GetReplicationTask", err) } - return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil + return &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }, + }, nil } func (d *MutableStateTaskStore) getReplicationTasks( @@ -758,7 +786,12 @@ func (d *MutableStateTaskStore) getVisibilityTask( if err := query.Scan(&data, &encoding); err != nil { return nil, gocql.ConvertError("GetVisibilityTask", err) } - return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil + return &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }, + }, nil }
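Aside for context: the Key reconstruction in the hunks above matters because Cassandra timestamp columns only keep millisecond precision, so a fire time written with nanosecond precision comes back truncated, and two tasks whose fire times differ only below 1ms collapse onto the same millisecond with task ID as the tie breaker. A minimal, self-contained sketch of that effect (the key type here is illustrative; it only assumes tasks.Key orders by fire time, then task ID):

package main

import (
	"fmt"
	"time"
)

// key mirrors the (FireTime, TaskID) ordering assumed for tasks.Key; illustrative only.
type key struct {
	fireTime time.Time
	taskID   int64
}

func (k key) less(other key) bool {
	if !k.fireTime.Equal(other.fireTime) {
		return k.fireTime.Before(other.fireTime)
	}
	return k.taskID < other.taskID
}

func main() {
	base := time.Now().Truncate(time.Millisecond)
	a := key{fireTime: base.Add(10 * time.Nanosecond), taskID: 100}
	b := key{fireTime: base.Add(20 * time.Nanosecond), taskID: 50}

	// In memory the nanosecond fire times still differ, so a sorts first.
	fmt.Println(a.less(b)) // true

	// After a round trip through a millisecond-precision store, both fire
	// times collapse onto the same millisecond and task ID breaks the tie.
	aStored := key{fireTime: a.fireTime.Truncate(time.Millisecond), taskID: a.taskID}
	bStored := key{fireTime: b.fireTime.Truncate(time.Millisecond), taskID: b.taskID}
	fmt.Println(aStored.less(bStored)) // false: task ID 100 > 50 once timestamps tie
}

This is exactly the scenario the TestGetTimerTasksOrdered test later in this diff exercises, with 10ns/20ns offsets and task IDs 100/50.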
func (d *MutableStateTaskStore) getVisibilityTasks( @@ -780,12 +813,17 @@ func (d *MutableStateTaskStore) getVisibilityTasks( iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} + var taskID int64 var data []byte var encoding string - for iter.Scan(&data, &encoding) { - response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding)) + for iter.Scan(&taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + taskID = 0 data = nil encoding = "" } @@ -844,12 +882,17 @@ func (d *MutableStateTaskStore) populateGetReplicationTasksResponse( iter := query.Iter() response := &p.InternalGetHistoryTasksResponse{} + var taskID int64 var data []byte var encoding string - for iter.Scan(&data, &encoding) { - response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding)) + for iter.Scan(&taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + taskID = 0 data = nil encoding = "" } @@ -889,51 +932,114 @@ func (d *MutableStateTaskStore) getHistoryTask( if err := query.Scan(&data, &encoding); err != nil { return nil, gocql.ConvertError("GetHistoryTask", err) } - return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil + + task := p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + } + if request.TaskCategory.Type() == tasks.CategoryTypeScheduled { + task.Key.FireTime = time.Unix(0, ts*int64(time.Millisecond)) + } + return &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: task, + }, nil } func (d *MutableStateTaskStore) getHistoryTasks( ctx context.Context, request *p.GetHistoryTasksRequest, +) (*p.InternalGetHistoryTasksResponse, error) { + switch request.TaskCategory.Type() { + case tasks.CategoryTypeImmediate: + return d.getHistoryImmediateTasks(ctx, request) + case tasks.CategoryTypeScheduled: + return d.getHistoryScheduledTasks(ctx, request) + default: + panic(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory.Type().String())) + } +} + +func (d *MutableStateTaskStore) getHistoryImmediateTasks( + ctx context.Context, + request *p.GetHistoryTasksRequest, ) (*p.InternalGetHistoryTasksResponse, error) { // execution manager should have already validated the request // Reading history tasks needs to be quorum level consistent, otherwise we could lose tasks - var query gocql.Query - if request.TaskCategory.Type() == tasks.CategoryTypeImmediate { - query = d.Session.Query(templateGetHistoryImmediateTasksQuery, - request.ShardID, - request.TaskCategory.ID(), - rowTypeHistoryTaskNamespaceID, - rowTypeHistoryTaskWorkflowID, - rowTypeHistoryTaskRunID, - defaultVisibilityTimestamp, - request.InclusiveMinTaskKey.TaskID, - request.ExclusiveMaxTaskKey.TaskID, - ).WithContext(ctx) - } else { - minTimestamp := p.UnixMilliseconds(request.InclusiveMinTaskKey.FireTime) - maxTimestamp := p.UnixMilliseconds(request.ExclusiveMaxTaskKey.FireTime) - query = d.Session.Query(templateGetHistoryScheduledTasksQuery, - request.ShardID, - request.TaskCategory.ID(), - rowTypeHistoryTaskNamespaceID, - rowTypeHistoryTaskWorkflowID, - rowTypeHistoryTaskRunID, - minTimestamp, - maxTimestamp, - ).WithContext(ctx) + query := d.Session.Query(templateGetHistoryImmediateTasksQuery, + request.ShardID, + request.TaskCategory.ID(), +
rowTypeHistoryTaskNamespaceID, + rowTypeHistoryTaskWorkflowID, + rowTypeHistoryTaskRunID, + defaultVisibilityTimestamp, + request.InclusiveMinTaskKey.TaskID, + request.ExclusiveMaxTaskKey.TaskID, + ).WithContext(ctx) + + iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() + + response := &p.InternalGetHistoryTasksResponse{} + var taskID int64 + var data []byte + var encoding string + + for iter.Scan(&taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + + taskID = 0 + data = nil + encoding = "" + } + if len(iter.PageState()) > 0 { + response.NextPageToken = iter.PageState() } + if err := iter.Close(); err != nil { + return nil, gocql.ConvertError("GetHistoryImmediateTasks", err) + } + + return response, nil +} + +func (d *MutableStateTaskStore) getHistoryScheduledTasks( + ctx context.Context, + request *p.GetHistoryTasksRequest, +) (*p.InternalGetHistoryTasksResponse, error) { + // execution manager should have already validated the request + // Reading history tasks needs to be quorum level consistent, otherwise we could lose tasks + + minTimestamp := p.UnixMilliseconds(request.InclusiveMinTaskKey.FireTime) + maxTimestamp := p.UnixMilliseconds(request.ExclusiveMaxTaskKey.FireTime) + query := d.Session.Query(templateGetHistoryScheduledTasksQuery, + request.ShardID, + request.TaskCategory.ID(), + rowTypeHistoryTaskNamespaceID, + rowTypeHistoryTaskWorkflowID, + rowTypeHistoryTaskRunID, + minTimestamp, + maxTimestamp, + ).WithContext(ctx) + iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter() response := &p.InternalGetHistoryTasksResponse{} + var timestamp time.Time + var taskID int64 var data []byte var encoding string - for iter.Scan(&data, &encoding) { - response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding)) + for iter.Scan(&timestamp, &taskID, &data, &encoding) { + response.Tasks = append(response.Tasks, p.InternalHistoryTask{ + Key: tasks.NewKey(timestamp, taskID), + Blob: *p.NewDataBlob(data, encoding), + }) + timestamp = time.Time{} + taskID = 0 data = nil encoding = "" } @@ -942,7 +1048,7 @@ func (d *MutableStateTaskStore) getHistoryTasks( } if err := iter.Close(); err != nil { - return nil, gocql.ConvertError("GetHistoryTasks", err) + return nil, gocql.ConvertError("GetHistoryScheduledTasks", err) } return response, nil diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 691c90c4dc3..a155b8271d5 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -107,6 +107,8 @@ const ( const numItemsInGarbageInfo = 3 +const ScheduledTaskMinPrecision = time.Millisecond + type ( // InvalidPersistenceRequestError represents invalid request to persistence InvalidPersistenceRequestError struct { diff --git a/common/persistence/execution_manager.go b/common/persistence/execution_manager.go index 83477d73622..6b7229554fc 100644 --- a/common/persistence/execution_manager.go +++ b/common/persistence/execution_manager.go @@ -744,10 +744,14 @@ func (m *executionManagerImpl) GetHistoryTask( return nil, err } - task, err := m.serializer.DeserializeTask(request.TaskCategory, resp.Task) + task, err := m.serializer.DeserializeTask(request.TaskCategory, resp.Blob) if err != nil { return nil, err } + if resp.Key.FireTime != tasks.DefaultFireTime { + task.SetVisibilityTime(resp.Key.FireTime) + } + task.SetTaskID(resp.Key.TaskID) return
&GetHistoryTaskResponse{ Task: task, }, nil @@ -770,17 +774,23 @@ func (m *executionManagerImpl) GetHistoryTasks( return nil, err } - tasks := make([]tasks.Task, 0, len(resp.Tasks)) - for _, blob := range resp.Tasks { - task, err := m.serializer.DeserializeTask(request.TaskCategory, blob) + historyTasks := make([]tasks.Task, 0, len(resp.Tasks)) + for _, internalTask := range resp.Tasks { + task, err := m.serializer.DeserializeTask(request.TaskCategory, internalTask.Blob) if err != nil { return nil, err } - tasks = append(tasks, task) + + if internalTask.Key.FireTime != tasks.DefaultFireTime { + task.SetVisibilityTime(internalTask.Key.FireTime) + } + task.SetTaskID(internalTask.Key.TaskID) + + historyTasks = append(historyTasks, task) } return &GetHistoryTasksResponse{ - Tasks: tasks, + Tasks: historyTasks, NextPageToken: resp.NextPageToken, }, nil } @@ -824,17 +834,23 @@ func (m *executionManagerImpl) GetReplicationTasksFromDLQ( } category := tasks.CategoryReplication - tasks := make([]tasks.Task, 0, len(resp.Tasks)) - for _, blob := range resp.Tasks { - task, err := m.serializer.DeserializeTask(category, blob) + dlqTasks := make([]tasks.Task, 0, len(resp.Tasks)) + for _, internalTask := range resp.Tasks { + task, err := m.serializer.DeserializeTask(category, internalTask.Blob) if err != nil { return nil, err } - tasks = append(tasks, task) + + if internalTask.Key.FireTime != tasks.DefaultFireTime { + task.SetVisibilityTime(internalTask.Key.FireTime) + } + task.SetTaskID(internalTask.Key.TaskID) + + dlqTasks = append(dlqTasks, task) } return &GetHistoryTasksResponse{ - Tasks: tasks, + Tasks: dlqTasks, NextPageToken: resp.NextPageToken, }, nil } diff --git a/common/persistence/persistence-tests/cassandra_test.go b/common/persistence/persistence-tests/cassandra_test.go index aa25819fb0f..0488d05b76b 100644 --- a/common/persistence/persistence-tests/cassandra_test.go +++ b/common/persistence/persistence-tests/cassandra_test.go @@ -44,14 +44,14 @@ func TestCassandraMetadataPersistenceV2(t *testing.T) { suite.Run(t, s) } -func TestQueuePersistence(t *testing.T) { +func TestCassandraQueuePersistence(t *testing.T) { s := new(QueuePersistenceSuite) s.TestBase = NewTestBaseWithCassandra(&TestBaseOptions{}) s.TestBase.Setup(nil) suite.Run(t, s) } -func TestClusterMetadataPersistence(t *testing.T) { +func TestCassandraClusterMetadataPersistence(t *testing.T) { s := new(ClusterMetadataManagerSuite) s.TestBase = NewTestBaseWithCassandra(&TestBaseOptions{}) s.TestBase.Setup(nil) diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 43eea9a515c..4a8aed19485 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -488,11 +488,11 @@ type ( } InternalGetHistoryTaskResponse struct { - Task commonpb.DataBlob + InternalHistoryTask } InternalGetHistoryTasksResponse struct { - Tasks []commonpb.DataBlob + Tasks []InternalHistoryTask NextPageToken []byte } diff --git a/common/persistence/sql/execution_tasks.go b/common/persistence/sql/execution_tasks.go index 5b79930672a..c1047039798 100644 --- a/common/persistence/sql/execution_tasks.go +++ b/common/persistence/sql/execution_tasks.go @@ -32,7 +32,6 @@ import ( "math" "time" - commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" p "go.temporal.io/server/common/persistence" @@ -155,7 +154,10 @@ func (m *sqlExecutionStore) getHistoryImmediateTask( immedidateTaskRow := rows[0] resp := &p.InternalGetHistoryTaskResponse{ - Task: 
*p.NewDataBlob(immedidateTaskRow.Data, immedidateTaskRow.DataEncoding), + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(immedidateTaskRow.TaskID), + Blob: *p.NewDataBlob(immedidateTaskRow.Data, immedidateTaskRow.DataEncoding), + }, } return resp, nil } @@ -197,14 +199,17 @@ func (m *sqlExecutionStore) getHistoryImmediateTasks( } } resp := &p.InternalGetHistoryTasksResponse{ - Tasks: make([]commonpb.DataBlob, len(rows)), + Tasks: make([]p.InternalHistoryTask, len(rows)), } if len(rows) == 0 { return resp, nil } for i, row := range rows { - resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + resp.Tasks[i] = p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + } } if len(rows) == request.BatchSize { resp.NextPageToken = getImmediateTaskNextPageToken( @@ -312,7 +317,10 @@ func (m *sqlExecutionStore) getHistoryScheduledTask( scheduledTaskRow := rows[0] resp := &p.InternalGetHistoryTaskResponse{ - Task: *p.NewDataBlob(scheduledTaskRow.Data, scheduledTaskRow.DataEncoding), + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewKey(scheduledTaskRow.VisibilityTimestamp, scheduledTaskRow.TaskID), + Blob: *p.NewDataBlob(scheduledTaskRow.Data, scheduledTaskRow.DataEncoding), + }, } return resp, nil } @@ -353,9 +361,12 @@ func (m *sqlExecutionStore) getHistoryScheduledTasks( ) } - resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]commonpb.DataBlob, len(rows))} - for i, row := range rows { - resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]p.InternalHistoryTask, 0, len(rows))} + for _, row := range rows { + resp.Tasks = append(resp.Tasks, p.InternalHistoryTask{ + Key: tasks.NewKey(row.VisibilityTimestamp, row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + }) } if len(resp.Tasks) == request.BatchSize { @@ -442,8 +453,12 @@ func (m *sqlExecutionStore) getTransferTask( transferRow := rows[0] resp := &p.InternalGetHistoryTaskResponse{ - Task: *p.NewDataBlob(transferRow.Data, transferRow.DataEncoding), + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(transferRow.TaskID), + Blob: *p.NewDataBlob(transferRow.Data, transferRow.DataEncoding), + }, } + return resp, nil } @@ -468,14 +483,17 @@ func (m *sqlExecutionStore) getTransferTasks( } } resp := &p.InternalGetHistoryTasksResponse{ - Tasks: make([]commonpb.DataBlob, len(rows)), + Tasks: make([]p.InternalHistoryTask, len(rows)), } if len(rows) == 0 { return resp, nil } for i, row := range rows { - resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + resp.Tasks[i] = p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + } } if len(rows) == request.BatchSize { resp.NextPageToken = getImmediateTaskNextPageToken( @@ -536,8 +554,12 @@ func (m *sqlExecutionStore) getTimerTask( timerRow := rows[0] resp := &p.InternalGetHistoryTaskResponse{ - Task: *p.NewDataBlob(timerRow.Data, timerRow.DataEncoding), + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewKey(timerRow.VisibilityTimestamp, timerRow.TaskID), + Blob: *p.NewDataBlob(timerRow.Data, timerRow.DataEncoding), + }, } + return resp, nil } @@ -564,9 +586,12 @@ func (m *sqlExecutionStore) getTimerTasks( return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetTimerTasks operation failed. Select failed. 
Error: %v", err)) } - resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]commonpb.DataBlob, len(rows))} - for i, row := range rows { - resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]p.InternalHistoryTask, 0, len(rows))} + for _, row := range rows { + resp.Tasks = append(resp.Tasks, p.InternalHistoryTask{ + Key: tasks.NewKey(row.VisibilityTimestamp, row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + }) } if len(resp.Tasks) == request.BatchSize { @@ -634,7 +659,12 @@ func (m *sqlExecutionStore) getReplicationTask( } replicationRow := rows[0] - resp := &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(replicationRow.Data, replicationRow.DataEncoding)} + resp := &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(replicationRow.TaskID), + Blob: *p.NewDataBlob(replicationRow.Data, replicationRow.DataEncoding), + }, + } return resp, nil } @@ -698,9 +728,12 @@ func (m *sqlExecutionStore) populateGetReplicationTasksResponse( return &p.InternalGetHistoryTasksResponse{}, nil } - var tasks = make([]commonpb.DataBlob, len(rows)) + var replicationTasks = make([]p.InternalHistoryTask, len(rows)) for i, row := range rows { - tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + replicationTasks[i] = p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + } } var nextPageToken []byte if len(rows) == batchSize { @@ -710,7 +743,7 @@ func (m *sqlExecutionStore) populateGetReplicationTasksResponse( ) } return &p.InternalGetHistoryTasksResponse{ - Tasks: tasks, + Tasks: replicationTasks, NextPageToken: nextPageToken, }, nil } @@ -724,9 +757,12 @@ func (m *sqlExecutionStore) populateGetReplicationDLQTasksResponse( return &p.InternalGetHistoryTasksResponse{}, nil } - var tasks = make([]commonpb.DataBlob, len(rows)) + var dlqTasks = make([]p.InternalHistoryTask, len(rows)) for i, row := range rows { - tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + dlqTasks[i] = p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + } } var nextPageToken []byte if len(rows) == batchSize { @@ -736,7 +772,7 @@ func (m *sqlExecutionStore) populateGetReplicationDLQTasksResponse( ) } return &p.InternalGetHistoryTasksResponse{ - Tasks: tasks, + Tasks: dlqTasks, NextPageToken: nextPageToken, }, nil } @@ -872,7 +908,12 @@ func (m *sqlExecutionStore) getVisibilityTask( } visibilityRow := rows[0] - resp := &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(visibilityRow.Data, visibilityRow.DataEncoding)} + resp := &p.InternalGetHistoryTaskResponse{ + InternalHistoryTask: p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(visibilityRow.TaskID), + Blob: *p.NewDataBlob(visibilityRow.Data, visibilityRow.DataEncoding), + }, + } return resp, nil } @@ -897,14 +938,17 @@ func (m *sqlExecutionStore) getVisibilityTasks( } } resp := &p.InternalGetHistoryTasksResponse{ - Tasks: make([]commonpb.DataBlob, len(rows)), + Tasks: make([]p.InternalHistoryTask, len(rows)), } if len(rows) == 0 { return resp, nil } for i, row := range rows { - resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + resp.Tasks[i] = p.InternalHistoryTask{ + Key: tasks.NewImmediateKey(row.TaskID), + Blob: *p.NewDataBlob(row.Data, row.DataEncoding), + } } if len(rows) == request.BatchSize { resp.NextPageToken = getImmediateTaskNextPageToken( diff --git 
a/common/persistence/tests/execution_mutable_state_task.go b/common/persistence/tests/execution_mutable_state_task.go index 6cd34894ac8..b551d08e0de 100644 --- a/common/persistence/tests/execution_mutable_state_task.go +++ b/common/persistence/tests/execution_mutable_state_task.go @@ -331,6 +331,112 @@ func (s *ExecutionMutableStateTaskSuite) TestAddGetVisibilityTasks_Multiple() { s.Equal(visibilityTasks, loadedTasks) } +func (s *ExecutionMutableStateTaskSuite) TestGetTimerTasksOrdered() { + now := time.Now().Truncate(p.ScheduledTaskMinPrecision) + timerTasks := []tasks.Task{ + &tasks.UserTimerTask{ + WorkflowKey: s.WorkflowKey, + TaskID: 100, + VisibilityTimestamp: now.Add(time.Nanosecond * 10), + }, + &tasks.UserTimerTask{ + WorkflowKey: s.WorkflowKey, + TaskID: 50, + VisibilityTimestamp: now.Add(time.Nanosecond * 20), + }, + } + + err := s.ExecutionManager.AddHistoryTasks(s.Ctx, &p.AddHistoryTasksRequest{ + ShardID: s.ShardID, + RangeID: s.RangeID, + NamespaceID: s.WorkflowKey.NamespaceID, + WorkflowID: s.WorkflowKey.WorkflowID, + RunID: s.WorkflowKey.RunID, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTimer: timerTasks, + }, + }) + s.NoError(err) + + // due to persistence layer precision loss, + // the two tasks may come back in either insertion order, + // but the returned tasks must be sorted by tasks.Key + response, err := s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{ + ShardID: s.ShardID, + TaskCategory: tasks.CategoryTimer, + InclusiveMinTaskKey: tasks.NewKey(now, 0), + ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0), + BatchSize: 10, + }) + s.NoError(err) + s.Len(response.Tasks, 2) + s.Empty(response.NextPageToken) + s.True(response.Tasks[0].GetKey().CompareTo(response.Tasks[1].GetKey()) < 0) +} + +func (s *ExecutionMutableStateTaskSuite) TestGetScheduledTasksOrdered() { + now := time.Now().Truncate(p.ScheduledTaskMinPrecision) + scheduledTasks := []tasks.Task{ + tasks.NewFakeTask( + s.WorkflowKey, + fakeScheduledTaskCategory, + now.Add(time.Nanosecond*10), + ), + tasks.NewFakeTask( + s.WorkflowKey, + fakeScheduledTaskCategory, + now.Add(time.Nanosecond*20), + ), + } + scheduledTasks[0].SetTaskID(100) + scheduledTasks[1].SetTaskID(50) + + err := s.ExecutionManager.AddHistoryTasks(s.Ctx, &p.AddHistoryTasksRequest{ + ShardID: s.ShardID, + RangeID: s.RangeID, + NamespaceID: s.WorkflowKey.NamespaceID, + WorkflowID: s.WorkflowKey.WorkflowID, + RunID: s.WorkflowKey.RunID, + Tasks: map[tasks.Category][]tasks.Task{ + fakeScheduledTaskCategory: scheduledTasks, + }, + }) + s.NoError(err) + + // due to persistence layer precision loss, + // the two tasks may come back in either insertion order, + // but the returned tasks must be sorted by tasks.Key + response, err := s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{ + ShardID: s.ShardID, + TaskCategory: fakeScheduledTaskCategory, + InclusiveMinTaskKey: tasks.NewKey(now, 0), + ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0), + BatchSize: 10, + }) + s.NoError(err) + s.Len(response.Tasks, 2) + s.Empty(response.NextPageToken) + s.True(response.Tasks[0].GetKey().CompareTo(response.Tasks[1].GetKey()) < 0) + + err = s.ExecutionManager.RangeCompleteHistoryTasks(s.Ctx, &p.RangeCompleteHistoryTasksRequest{ + ShardID: s.ShardID, + TaskCategory: fakeScheduledTaskCategory, + InclusiveMinTaskKey: tasks.NewKey(now, 0), + ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0), + }) + s.NoError(err) + + response, err = s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{ + ShardID: s.ShardID, +
TaskCategory: fakeScheduledTaskCategory, + InclusiveMinTaskKey: tasks.NewKey(now, 0), + ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0), + BatchSize: 10, + }) + s.NoError(err) + s.Empty(response.Tasks) +} + func (s *ExecutionMutableStateTaskSuite) AddRandomTasks( category tasks.Category, numTasks int, @@ -340,6 +446,7 @@ func (s *ExecutionMutableStateTaskSuite) AddRandomTasks( now := time.Now().UTC() randomTasks := make([]tasks.Task, 0, numTasks) for i := 0; i != numTasks; i++ { + now = now.Truncate(p.ScheduledTaskMinPrecision) randomTasks = append(randomTasks, newTaskFn(s.WorkflowKey, currentTaskID, now)) currentTaskID += rand.Int63n(100) + 1 now = now.Add(time.Duration(rand.Int63n(1000_000_000)) + time.Millisecond) diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 8299d2a9c2d..4bf485074c7 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -59,8 +59,6 @@ type ( ) const ( - scheduledTaskPrecision = time.Millisecond - lookAheadRateLimitDelay = 3 * time.Second ) @@ -84,7 +82,7 @@ func NewScheduledQueue( ShardID: shard.GetShardID(), TaskCategory: category, InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0), - ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(scheduledTaskPrecision), 0), + ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0), BatchSize: options.BatchSize(), NextPageToken: paginationToken, } @@ -94,17 +92,6 @@ func NewScheduledQueue( return nil, nil, err } - // The rest of the code assumes task loaded is ordered by task key, which has precision of ns for time. - // However for cassandra impl, the task returned is ordered by visibilitystamp column which only has - // ms precision, which makes tasks out of order, even across multiple loads. - // So truncate task key time also to ms precision to make them ordered. - // - // This however, moves task visibility time forward for 1ms and may cause timer tasks to be skipped - // during processing. To compensate for that, add 1ms back when scheduling the task in reader.go. 
- for _, task := range resp.Tasks { - task.SetVisibilityTime(task.GetVisibilityTime().Truncate(scheduledTaskPrecision)) - } - for len(resp.Tasks) > 0 && !r.ContainsKey(resp.Tasks[0].GetKey()) { resp.Tasks = resp.Tasks[1:] } @@ -312,7 +299,7 @@ func IsTimeExpired( referenceTime time.Time, testingTime time.Time, ) bool { - referenceTime = referenceTime.Truncate(scheduledTaskPrecision) - testingTime = testingTime.Truncate(scheduledTaskPrecision) + referenceTime = referenceTime.Truncate(persistence.ScheduledTaskMinPrecision) + testingTime = testingTime.Truncate(persistence.ScheduledTaskMinPrecision) return !testingTime.After(referenceTime) } diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index ad6c370d7bb..aa6369e40f4 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -139,8 +139,6 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() { for _, key := range testTaskKeys { mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(key).AnyTimes() - mockTask.EXPECT().GetVisibilityTime().Return(key.FireTime).Times(1) - mockTask.EXPECT().SetVisibilityTime(key.FireTime.Truncate(scheduledTaskPrecision)).Times(1) mockTasks = append(mockTasks, mockTask) if r.ContainsKey(key) { @@ -155,7 +153,7 @@ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryTimer, InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0), - ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(scheduledTaskPrecision), 0), + ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0), BatchSize: testQueueOptions.BatchSize(), NextPageToken: currentPageToken, }).Return(&persistence.GetHistoryTasksResponse{ diff --git a/service/history/queues/reader.go b/service/history/queues/reader.go index 9aa07b2132c..583e2b0b11e 100644 --- a/service/history/queues/reader.go +++ b/service/history/queues/reader.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/quotas" ) @@ -484,8 +485,9 @@ func (r *ReaderImpl) submit( executable Executable, ) { now := r.timeSource.Now() - // Please check the comment in queue_scheduled.go for why adding 1ms to the fire time. - if fireTime := executable.GetKey().FireTime.Add(scheduledTaskPrecision); now.Before(fireTime) { + // Persistence layer may lose precision when persisting the task, which essentially moves + // task fire time forward. Need to account for that when submitting the task.
+ if fireTime := executable.GetKey().FireTime.Add(persistence.ScheduledTaskMinPrecision); now.Before(fireTime) { r.rescheduler.Add(executable, fireTime) return } diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index 0365c879e59..c6138faad6a 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/predicates" "go.temporal.io/server/service/history/tasks" ) @@ -436,7 +437,7 @@ func (s *readerSuite) TestSubmitTask() { futureFireTime := reader.timeSource.Now().Add(time.Minute) mockExecutable.EXPECT().GetKey().Return(tasks.NewKey(futureFireTime, rand.Int63())).Times(1) - s.mockRescheduler.EXPECT().Add(mockExecutable, futureFireTime.Add(scheduledTaskPrecision)).Times(1) + s.mockRescheduler.EXPECT().Add(mockExecutable, futureFireTime.Add(persistence.ScheduledTaskMinPrecision)).Times(1) reader.submit(mockExecutable) } diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 85c8778ad13..d69afd7e7bb 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -315,7 +315,10 @@ func (s *ContextImpl) UpdateScheduledQueueExclusiveHighReadWatermark( currentTime = s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime } - newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(time.Millisecond) + // Truncation here is just to make sure max read level has the same precision as the old logic + // in case existing code can't work correctly with precision higher than 1ms. + // Once we validate the rest of the code can work correctly with higher precision, the truncation should be removed. + newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.ScheduledTaskMinPrecision) if singleProcessorMode { // When generating scheduled tasks, the task's timestamp will be compared to the namespace's active cluster's // maxReadLevel to avoid generating a task before maxReadLevel. @@ -1356,7 +1359,9 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked( currentCluster = namespaceEntry.ActiveClusterName() } readCursorTS := s.scheduledTaskMaxReadLevelMap[currentCluster] - if ts.Before(readCursorTS) { + if ts.Truncate(persistence.ScheduledTaskMinPrecision).Before(readCursorTS) { + // make sure scheduled task timestamp is higher than max read level after truncation + // as the persistence layer may lose precision when persisting the task. // This can happen if the shard moves and the new host has time skew, or there is db write delay. // We generate a new timer ID using timerMaxReadLevel.
s.contextTaggedLogger.Debug("New timer generated is less than read level", @@ -1365,7 +1370,7 @@ tag.Timestamp(ts), tag.CursorTimestamp(readCursorTS), tag.ValueShardAllocateTimerBeforeRead) - task.SetVisibilityTime(s.scheduledTaskMaxReadLevelMap[currentCluster].Add(time.Millisecond)) + task.SetVisibilityTime(s.scheduledTaskMaxReadLevelMap[currentCluster].Add(persistence.ScheduledTaskMinPrecision)) } visibilityTs := task.GetVisibilityTime() @@ -1782,7 +1787,12 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error { maxReadTime = util.MaxTime(maxReadTime, timestamp.TimeValue(queueState.ExclusiveReaderHighWatermark.FireTime)) } - scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime.Truncate(time.Millisecond) + // we only need to make sure max read level >= persisted ack level/exclusiveReaderHighWatermark + // Add().Truncate() here is just to make sure max read level has the same precision as the old logic + // in case existing code can't work correctly with precision higher than 1ms. + // Once we validate the rest of the code can work correctly with higher precision, the code should simply be + // scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime + scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime.Add(persistence.ScheduledTaskMinPrecision).Truncate(persistence.ScheduledTaskMinPrecision) if clusterName != currentClusterName { remoteClusterInfos[clusterName] = &remoteClusterInfo{CurrentTime: maxReadTime} diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 2ebfbb4c505..2a65347eae8 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -362,7 +362,15 @@ func (t *timerQueueProcessorBase) verifyReschedulerSize() bool { func (t *timerQueueProcessorBase) submitTask( executable queues.Executable, ) { - executable.SetScheduledTime(t.timeSource.Now()) + now := t.timeSource.Now() + // Persistence layer may lose precision when persisting the task, which essentially moves + // task fire time forward. Need to account for that when submitting the task. + if fireTime := executable.GetKey().FireTime.Add(persistence.ScheduledTaskMinPrecision); now.Before(fireTime) { + t.rescheduler.Add(executable, fireTime) + return + } + + executable.SetScheduledTime(now) if !t.scheduler.TrySubmit(executable) { executable.Reschedule() }
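To make the submit-side compensation concrete, here is a condensed, runnable sketch of the check that reader.go and timerQueueProcessorBase.go now share. The types are simplified stand-ins, not the real Executable/rescheduler interfaces; the only detail carried over from the diff is that the constant mirrors persistence.ScheduledTaskMinPrecision:

package main

import (
	"fmt"
	"time"
)

// Mirrors persistence.ScheduledTaskMinPrecision introduced in this diff.
const scheduledTaskMinPrecision = time.Millisecond

// submit models the check added to reader.go and timerQueueProcessorBase.go:
// persisted fire times are only millisecond-precise, so a loaded task may look
// due up to 1ms before its originally requested fire time. Treat the task as
// ready only once fireTime+precision has passed; otherwise hand it back to the
// rescheduler with the adjusted fire time.
func submit(now, fireTime time.Time, reschedule func(time.Time), run func()) {
	if adjusted := fireTime.Add(scheduledTaskMinPrecision); now.Before(adjusted) {
		reschedule(adjusted)
		return
	}
	run()
}

func main() {
	now := time.Now()

	// Fire time nominally "now": rescheduled, because the truncated timestamp
	// could stand for an intended fire time up to 1ms later.
	submit(now, now,
		func(t time.Time) { fmt.Println("rescheduled until", t) },
		func() { fmt.Println("ran") })

	// Fire time comfortably in the past: runs immediately.
	submit(now, now.Add(-10*time.Millisecond),
		func(t time.Time) { fmt.Println("rescheduled until", t) },
		func() { fmt.Println("ran") })
}

The Add(1ms) mirrors the write path: persisted fire times are truncated down to the millisecond, so delaying submission by one precision unit guarantees a task never runs before its originally requested fire time.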