Skip to content

Commit

Permalink
Fix scheduled task precision (#3591)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Nov 17, 2022
1 parent be30bab commit ee1abe1
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 114 deletions.
200 changes: 153 additions & 47 deletions common/persistence/cassandra/mutable_state_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package cassandra
import (
"context"
"fmt"
"time"

"go.temporal.io/api/serviceerror"

Expand Down Expand Up @@ -68,7 +69,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetHistoryImmediateTasksQuery = `SELECT task_data, task_encoding ` +
templateGetHistoryImmediateTasksQuery = `SELECT task_id, task_data, task_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand All @@ -79,7 +80,7 @@ const (
`and task_id >= ? ` +
`and task_id < ?`

templateGetHistoryScheduledTasksQuery = `SELECT task_data, task_encoding ` +
templateGetHistoryScheduledTasksQuery = `SELECT visibility_ts, task_id, task_data, task_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ?` +
Expand All @@ -99,7 +100,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetTransferTasksQuery = `SELECT transfer, transfer_encoding ` +
templateGetTransferTasksQuery = `SELECT task_id, transfer, transfer_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand All @@ -120,7 +121,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetVisibilityTasksQuery = `SELECT visibility_task_data, visibility_task_encoding ` +
templateGetVisibilityTasksQuery = `SELECT task_id, visibility_task_data, visibility_task_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand All @@ -141,7 +142,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetReplicationTasksQuery = `SELECT replication, replication_encoding ` +
templateGetReplicationTasksQuery = `SELECT task_id, replication, replication_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -195,7 +196,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetTimerTasksQuery = `SELECT timer, timer_encoding ` +
templateGetTimerTasksQuery = `SELECT visibility_ts, task_id, timer, timer_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ?` +
Expand Down Expand Up @@ -383,7 +384,12 @@ func (d *MutableStateTaskStore) getTransferTask(
if err := query.Scan(&data, &encoding); err != nil {
return nil, gocql.ConvertError("GetTransferTask", err)
}
return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
return &p.InternalGetHistoryTaskResponse{
InternalHistoryTask: p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
},
}, nil
}

func (d *MutableStateTaskStore) getTransferTasks(
Expand All @@ -405,12 +411,17 @@ func (d *MutableStateTaskStore) getTransferTasks(
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.InternalGetHistoryTasksResponse{}
var taskID int64
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))
for iter.Scan(&taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
})

taskID = 0
data = nil
encoding = ""
}
Expand Down Expand Up @@ -468,7 +479,7 @@ func (d *MutableStateTaskStore) getTimerTask(
) (*p.InternalGetHistoryTaskResponse, error) {
shardID := request.ShardID
taskID := request.TaskKey.TaskID
visibilityTs := request.TaskKey.FireTime // TODO: do we need to convert the timestamp?
visibilityTs := p.UnixMilliseconds(request.TaskKey.FireTime)
query := d.Session.Query(templateGetTimerTaskQuery,
shardID,
rowTypeTimerTask,
Expand All @@ -485,7 +496,12 @@ func (d *MutableStateTaskStore) getTimerTask(
return nil, gocql.ConvertError("GetTimerTask", err)
}

return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
return &p.InternalGetHistoryTaskResponse{
InternalHistoryTask: p.InternalHistoryTask{
Key: tasks.NewKey(time.Unix(0, visibilityTs), taskID),
Blob: *p.NewDataBlob(data, encoding),
},
}, nil
}

func (d *MutableStateTaskStore) getTimerTasks(
Expand All @@ -507,12 +523,19 @@ func (d *MutableStateTaskStore) getTimerTasks(
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.InternalGetHistoryTasksResponse{}
var timestamp time.Time
var taskID int64
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))
for iter.Scan(&timestamp, &taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewKey(timestamp, taskID),
Blob: *p.NewDataBlob(data, encoding),
})

timestamp = time.Time{}
taskID = 0
data = nil
encoding = ""
}
Expand Down Expand Up @@ -588,7 +611,12 @@ func (d *MutableStateTaskStore) getReplicationTask(
return nil, gocql.ConvertError("GetReplicationTask", err)
}

return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
return &p.InternalGetHistoryTaskResponse{
InternalHistoryTask: p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
},
}, nil
}

func (d *MutableStateTaskStore) getReplicationTasks(
Expand Down Expand Up @@ -758,7 +786,12 @@ func (d *MutableStateTaskStore) getVisibilityTask(
if err := query.Scan(&data, &encoding); err != nil {
return nil, gocql.ConvertError("GetVisibilityTask", err)
}
return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
return &p.InternalGetHistoryTaskResponse{
InternalHistoryTask: p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
},
}, nil
}

func (d *MutableStateTaskStore) getVisibilityTasks(
Expand All @@ -780,12 +813,17 @@ func (d *MutableStateTaskStore) getVisibilityTasks(
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.InternalGetHistoryTasksResponse{}
var taskID int64
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))
for iter.Scan(&taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
})

taskID = 0
data = nil
encoding = ""
}
Expand Down Expand Up @@ -844,12 +882,17 @@ func (d *MutableStateTaskStore) populateGetReplicationTasksResponse(
iter := query.Iter()

response := &p.InternalGetHistoryTasksResponse{}
var taskID int64
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))
for iter.Scan(&taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
})

taskID = 0
data = nil
encoding = ""
}
Expand Down Expand Up @@ -889,51 +932,114 @@ func (d *MutableStateTaskStore) getHistoryTask(
if err := query.Scan(&data, &encoding); err != nil {
return nil, gocql.ConvertError("GetHistoryTask", err)
}
return &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil

task := p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
}
if request.TaskCategory.Type() == tasks.CategoryTypeScheduled {
task.Key.FireTime = time.Unix(0, ts)
}
return &p.InternalGetHistoryTaskResponse{
InternalHistoryTask: task,
}, nil
}

func (d *MutableStateTaskStore) getHistoryTasks(
ctx context.Context,
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
switch request.TaskCategory.Type() {
case tasks.CategoryTypeImmediate:
return d.getHistoryImmedidateTasks(ctx, request)
case tasks.CategoryTypeScheduled:
return d.getHistoryScheduledTasks(ctx, request)
default:
panic(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory.Type().String()))
}
}

func (d *MutableStateTaskStore) getHistoryImmedidateTasks(
ctx context.Context,
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
// execution manager should already validated the request
// Reading history tasks need to be quorum level consistent, otherwise we could lose task

var query gocql.Query
if request.TaskCategory.Type() == tasks.CategoryTypeImmediate {
query = d.Session.Query(templateGetHistoryImmediateTasksQuery,
request.ShardID,
request.TaskCategory.ID(),
rowTypeHistoryTaskNamespaceID,
rowTypeHistoryTaskWorkflowID,
rowTypeHistoryTaskRunID,
defaultVisibilityTimestamp,
request.InclusiveMinTaskKey.TaskID,
request.ExclusiveMaxTaskKey.TaskID,
).WithContext(ctx)
} else {
minTimestamp := p.UnixMilliseconds(request.InclusiveMinTaskKey.FireTime)
maxTimestamp := p.UnixMilliseconds(request.ExclusiveMaxTaskKey.FireTime)
query = d.Session.Query(templateGetHistoryScheduledTasksQuery,
request.ShardID,
request.TaskCategory.ID(),
rowTypeHistoryTaskNamespaceID,
rowTypeHistoryTaskWorkflowID,
rowTypeHistoryTaskRunID,
minTimestamp,
maxTimestamp,
).WithContext(ctx)
query := d.Session.Query(templateGetHistoryImmediateTasksQuery,
request.ShardID,
request.TaskCategory.ID(),
rowTypeHistoryTaskNamespaceID,
rowTypeHistoryTaskWorkflowID,
rowTypeHistoryTaskRunID,
defaultVisibilityTimestamp,
request.InclusiveMinTaskKey.TaskID,
request.ExclusiveMaxTaskKey.TaskID,
).WithContext(ctx)

iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.InternalGetHistoryTasksResponse{}
var taskID int64
var data []byte
var encoding string

for iter.Scan(&taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewImmediateKey(taskID),
Blob: *p.NewDataBlob(data, encoding),
})

taskID = 0
data = nil
encoding = ""
}
if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
}

if err := iter.Close(); err != nil {
return nil, gocql.ConvertError("GetHistoryImmediateTasks", err)
}

return response, nil
}

func (d *MutableStateTaskStore) getHistoryScheduledTasks(
ctx context.Context,
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
// execution manager should already validated the request
// Reading history tasks need to be quorum level consistent, otherwise we could lose task

minTimestamp := p.UnixMilliseconds(request.InclusiveMinTaskKey.FireTime)
maxTimestamp := p.UnixMilliseconds(request.ExclusiveMaxTaskKey.FireTime)
query := d.Session.Query(templateGetHistoryScheduledTasksQuery,
request.ShardID,
request.TaskCategory.ID(),
rowTypeHistoryTaskNamespaceID,
rowTypeHistoryTaskWorkflowID,
rowTypeHistoryTaskRunID,
minTimestamp,
maxTimestamp,
).WithContext(ctx)

iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.InternalGetHistoryTasksResponse{}
var timestamp time.Time
var taskID int64
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))
for iter.Scan(&timestamp, &taskID, &data, &encoding) {
response.Tasks = append(response.Tasks, p.InternalHistoryTask{
Key: tasks.NewKey(timestamp, taskID),
Blob: *p.NewDataBlob(data, encoding),
})

timestamp = time.Time{}
taskID = 0
data = nil
encoding = ""
}
Expand All @@ -942,7 +1048,7 @@ func (d *MutableStateTaskStore) getHistoryTasks(
}

if err := iter.Close(); err != nil {
return nil, gocql.ConvertError("GetHistoryTasks", err)
return nil, gocql.ConvertError("GetHistoryScheduledTasks", err)
}

return response, nil
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ const (

const numItemsInGarbageInfo = 3

const ScheduledTaskMinPrecision = time.Millisecond

type (
// InvalidPersistenceRequestError represents invalid request to persistence
InvalidPersistenceRequestError struct {
Expand Down
Loading

0 comments on commit ee1abe1

Please sign in to comment.