Skip to content

Commit

Permalink
sql: support for getTimerIndexTasks pagination (#1224)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Nov 13, 2018
1 parent e8dbfab commit e69b36d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 10 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ THRIFTRW_SRCS = \
idl/github.com/uber/cadence/admin.thrift \

PROGS = cadence
TEST_ARG ?= -race -v -timeout 15m
TEST_ARG ?= -race -v -timeout 20m
BUILD := ./build
TOOLS_CMD_ROOT=./cmd/tools
INTEG_TEST_ROOT=./host
Expand Down Expand Up @@ -97,15 +97,15 @@ test: dep-ensured bins
@rm -f test
@rm -f test.log
@for dir in $(TEST_DIRS); do \
go test -timeout 15m -race -coverprofile=$@ "$$dir" | tee -a test.log; \
go test -timeout 20m -race -coverprofile=$@ "$$dir" | tee -a test.log; \
done;

# need to run xdc tests with race detector off because of ringpop bug causing data race issue
test_xdc: dep-ensured bins
@rm -f test
@rm -f test.log
@for dir in $(INTEG_TEST_XDC_ROOT); do \
go test -timeout 15m -coverprofile=$@ "$$dir" | tee -a test.log; \
go test -timeout 20m -coverprofile=$@ "$$dir" | tee -a test.log; \
done;

cover_profile: clean bins_nothrift
Expand Down
54 changes: 49 additions & 5 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package sql

import (
"database/sql"
"fmt"
"math"
"time"

"database/sql"
"encoding/json"

"github.com/jmoiron/sqlx"
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -476,8 +479,9 @@ LIMIT ?`
`
FROM timer_tasks WHERE
shard_id = ? AND
visibility_timestamp >= ? AND
visibility_timestamp < ?`
((visibility_timestamp >= ? AND task_id >= ?) OR visibility_timestamp > ?) AND
visibility_timestamp < ?
ORDER BY visibility_timestamp,task_id LIMIT ?`
completeTimerTaskSQLQuery = `DELETE FROM timer_tasks WHERE shard_id = ? AND visibility_timestamp = ? AND task_id = ?`
rangeCompleteTimerTaskSQLQuery = `DELETE FROM timer_tasks WHERE shard_id = ? AND visibility_timestamp >= ? AND visibility_timestamp < ?`
lockAndCheckNextEventIDSQLQuery = `SELECT next_event_id FROM executions WHERE
Expand Down Expand Up @@ -1412,18 +1416,58 @@ func (m *sqlExecutionManager) CompleteReplicationTask(request *p.CompleteReplica
return nil
}

type timerTaskPageToken struct {
TaskID int64
Timestamp time.Time
}

func (t *timerTaskPageToken) serialize() ([]byte, error) {
return json.Marshal(t)
}

func (t *timerTaskPageToken) deserialize(payload []byte) error {
return json.Unmarshal(payload, t)
}

func (m *sqlExecutionManager) GetTimerIndexTasks(request *p.GetTimerIndexTasksRequest) (*p.GetTimerIndexTasksResponse, error) {
pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
if len(request.NextPageToken) > 0 {
if err := pageToken.deserialize(request.NextPageToken); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
}
}
}

var resp p.GetTimerIndexTasksResponse

if err := m.db.Select(&resp.Timers, getTimerTasksSQLQuery,
m.shardID,
request.MinTimestamp,
request.MaxTimestamp); err != nil && err != sql.ErrNoRows {
pageToken.Timestamp,
pageToken.TaskID,
pageToken.Timestamp,
request.MaxTimestamp,
request.BatchSize+1); err != nil && err != sql.ErrNoRows {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetTimerTasks operation failed. Select failed. Error: %v", err),
}
}

if len(resp.Timers) > request.BatchSize {
pageToken = &timerTaskPageToken{
TaskID: resp.Timers[request.BatchSize].TaskID,
Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
}
resp.Timers = resp.Timers[:request.BatchSize]
nextToken, err := pageToken.serialize()
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
}
}
resp.NextPageToken = nextToken
}

return &resp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion schema/mysql/v56/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ CREATE TABLE events (
batch_version BIGINT,
range_id INT NOT NULL,
tx_id INT NOT NULL,
data BLOB NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(64) NOT NULL,
PRIMARY KEY (domain_id, workflow_id, run_id, first_event_id)
);
Expand Down
2 changes: 1 addition & 1 deletion schema/mysql/v57/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ CREATE TABLE buffered_events (
workflow_id VARCHAR(255) NOT NULL,
run_id CHAR(64) NOT NULL,
--
data BLOB NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(64) NOT NULL,
PRIMARY KEY (id)
);
Expand Down

0 comments on commit e69b36d

Please sign in to comment.