Skip to content

Commit

Permalink
Remove TTL from open_executions Cassandra visibility table (#1456)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 13, 2021
1 parent b75a7cf commit bd7df35
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 106 deletions.
89 changes: 37 additions & 52 deletions common/persistence/cassandra/cassandraVisibilityPersistence.go
Expand Up @@ -38,19 +38,13 @@ import (

// Fixed namespace values for now
const (
namespacePartition = 0
defaultCloseTTLSeconds = 86400
openExecutionTTLBuffer = int64(86400) // setting it to a day to account for shard going down
namespacePartition = 0

// ref: https://docs.datastax.com/en/dse-trblshoot/doc/troubleshooting/recoveringTtlYear2038Problem.html
maxCassandraTTL = int64(315360000) // Cassandra max support time is 2038-01-19T03:14:06+00:00. Updated this to 10 years to support until year 2028
)

const (
templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions (` +
`namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
`namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_queue) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
Expand Down Expand Up @@ -159,37 +153,22 @@ func (v *cassandraVisibilityPersistence) Close() {

func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request *p.InternalRecordWorkflowExecutionStartedRequest) error {
ttl := request.RunTimeout + openExecutionTTLBuffer
var query gocql.Query

if ttl > maxCassandraTTL {
query = v.session.Query(templateCreateWorkflowExecutionStarted,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
p.UnixNanoToDBTimestamp(request.StartTimestamp),
p.UnixNanoToDBTimestamp(request.ExecutionTimestamp),
request.WorkflowTypeName,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
)
} else {
query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
p.UnixNanoToDBTimestamp(request.StartTimestamp),
p.UnixNanoToDBTimestamp(request.ExecutionTimestamp),
request.WorkflowTypeName,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
ttl,
)
}
query := v.session.Query(templateCreateWorkflowExecutionStarted,
request.NamespaceID,
namespacePartition,
request.WorkflowID,
request.RunID,
p.UnixNanoToDBTimestamp(request.StartTimestamp),
p.UnixNanoToDBTimestamp(request.ExecutionTimestamp),
request.WorkflowTypeName,
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
)
// It is important to specify timestamp for all `open_executions` queries because
// we are using milliseconds instead of default microseconds. If custom timestamp collides with
// default timestamp, default one will always win because they are 1000 times bigger.
query = query.WithTimestamp(p.UnixNanoToDBTimestamp(request.StartTimestamp))
err := query.Exec()
return gocql.ConvertError("RecordWorkflowExecutionStarted", err)
Expand All @@ -215,12 +194,14 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosedV2(
// Next, add a row in the closed table.

// Find how long to keep the row
retention := request.RetentionSeconds
if retention == 0 {
retention = defaultCloseTTLSeconds
var retentionSeconds int64
if request.Retention != nil {
retentionSeconds = int64(request.Retention.Seconds())
} else {
retentionSeconds = maxCassandraTTL + 1
}

if retention > maxCassandraTTL {
if retentionSeconds > maxCassandraTTL {
batch.Query(templateCreateWorkflowExecutionClosed,
request.NamespaceID,
namespacePartition,
Expand Down Expand Up @@ -251,21 +232,25 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosedV2(
request.Memo.Data,
request.Memo.EncodingType.String(),
request.TaskQueue,
retention,
retentionSeconds,
)
}

// RecordWorkflowExecutionStarted is using StartTimestamp as
// the timestamp to issue query to Cassandra
// due to the fact that cross DC using mutable state creation time as workflow start time
// and visibility using event time instead of last update time (#1501)
// CloseTimestamp can be before StartTimestamp, meaning using CloseTimestamp
// can cause the deletion of open visibility record to be ignored.
queryTimeStamp := request.CloseTimestamp
if queryTimeStamp < request.StartTimestamp {
queryTimeStamp = request.StartTimestamp + time.Second.Nanoseconds()
// RecordWorkflowExecutionStarted is using StartTimestamp as the timestamp for every query in `open_executions` table.
// Due to the fact that cross DC using mutable state creation time as workflow start time and visibility using event time
// instead of last update time (https://github.com/uber/cadence/pull/1501) CloseTimestamp can be before StartTimestamp (or very close it).
// In this case, use (StartTimestamp + minWorkflowDuration) for delete operation to guarantee that it is greater than StartTimestamp
// and won't be ignored.

const minWorkflowDuration = time.Second
var batchTimestamp int64
if request.CloseTimestamp-request.StartTimestamp < minWorkflowDuration.Nanoseconds() {
batchTimestamp = request.StartTimestamp + minWorkflowDuration.Nanoseconds()
} else {
batchTimestamp = request.CloseTimestamp
}
batch = batch.WithTimestamp(p.UnixNanoToDBTimestamp(queryTimeStamp))

batch = batch.WithTimestamp(p.UnixNanoToDBTimestamp(batchTimestamp))
err := v.session.ExecuteBatch(batch)
return gocql.ConvertError("RecordWorkflowExecutionClosed", err)
}
Expand Down
136 changes: 125 additions & 11 deletions common/persistence/persistence-tests/visibilityPersistenceTest.go
Expand Up @@ -33,6 +33,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/common/persistence/cassandra"

"go.temporal.io/server/common/payload"
p "go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {
RunId: "fb15e4b5-356f-466d-8c6d-a29223e5c536",
}

startTime := time.Now().UTC().Add(time.Second * -5).UnixNano()
startTime := time.Now().UTC().UnixNano()
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{
VisibilityRequestBase: &p.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Expand All @@ -147,15 +148,15 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {
StartTimestamp: startTime,
},
})
s.Nil(err0)
s.NoError(err0)

resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime,
LatestStartTime: startTime,
})
s.Nil(err1)
s.NoError(err1)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution.WorkflowId, resp.Executions[0].Execution.WorkflowId)

Expand All @@ -166,29 +167,144 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() {
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
},
CloseTimestamp: startTime - (10 * time.Second).Nanoseconds(),
CloseTimestamp: startTime - (10 * time.Millisecond).Nanoseconds(),
})
s.Nil(err2)
s.NoError(err2)

resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime,
LatestStartTime: startTime,
})
s.Nil(err3)
s.NoError(err3)
s.Equal(0, len(resp.Executions))

resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime - (15 * time.Second).Nanoseconds(),
LatestStartTime: time.Now().UnixNano(),
EarliestStartTime: startTime - (10 * time.Millisecond).Nanoseconds(), // This is actually close_time
LatestStartTime: startTime - (10 * time.Millisecond).Nanoseconds(),
})
s.Nil(err4)
s.NoError(err4)
s.Equal(1, len(resp.Executions))
}

func (s *VisibilityPersistenceSuite) TestBasicVisibilityShortWorkflow() {
testNamespaceUUID := uuid.New()

workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "visibility-workflow-test-short-workflow",
RunId: "3c095198-0c33-4136-939a-c29fbbb6a80b",
}

startTime := time.Now().UTC().UnixNano()
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{
VisibilityRequestBase: &p.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
},
})
s.NoError(err0)

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
VisibilityRequestBase: &p.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
},
CloseTimestamp: startTime + (10 * time.Millisecond).Nanoseconds(),
})
s.NoError(err2)

resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime,
LatestStartTime: startTime,
})
s.NoError(err3)
s.Equal(0, len(resp.Executions))

resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime + (10 * time.Millisecond).Nanoseconds(), // This is actually close_time
LatestStartTime: startTime + (10 * time.Millisecond).Nanoseconds(),
})
s.NoError(err4)
s.Equal(1, len(resp.Executions))
}

func (s *VisibilityPersistenceSuite) TestVisibilityRetention() {
if _, ok := s.VisibilityTestCluster.(*cassandra.TestCluster); !ok {
return
}

testNamespaceUUID := uuid.New()

workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "visibility-workflow-test-visibility-retention",
RunId: "3c095198-0c33-4136-939a-c29fbbb6a802",
}

startTime := time.Now().UTC().Add(-1 * time.Hour).UnixNano()
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{
VisibilityRequestBase: &p.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
},
})
s.NoError(err0)

retention := 1 * time.Second
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
VisibilityRequestBase: &p.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
},
CloseTimestamp: startTime + (1 * time.Minute).Nanoseconds(),
Retention: &retention,
})
s.NoError(err2)

resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime,
LatestStartTime: startTime,
})
s.NoError(err3)
s.Equal(0, len(resp.Executions))

resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime + (1 * time.Minute).Nanoseconds(), // This is actually close_time
LatestStartTime: startTime + (1 * time.Minute).Nanoseconds(),
})
s.NoError(err4)
s.Equal(1, len(resp.Executions))

// Sleep for retention to fire.
time.Sleep(retention)
resp2, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
PageSize: 1,
EarliestStartTime: startTime + (1 * time.Minute).Nanoseconds(), // This is actually close_time
LatestStartTime: startTime + (1 * time.Minute).Nanoseconds(),
})
s.NoError(err5)
s.Equal(0, len(resp2.Executions))
}

// TestVisibilityPagination test
func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
testNamespaceUUID := uuid.New()
Expand Down Expand Up @@ -746,7 +862,6 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
},
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
},
WorkflowTimeout: 0,
},
expected: nil,
},
Expand All @@ -764,7 +879,6 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
SearchAttributes: nil,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
},
WorkflowTimeout: 0,
},
// To avoid blocking the task queue processors on non-ElasticSearch visibility stores
// we simply treat any attempts to perform Upserts as "no-ops"
Expand Down
9 changes: 3 additions & 6 deletions common/persistence/persistenceInterface.go
Expand Up @@ -479,22 +479,19 @@ type (
// InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted
InternalRecordWorkflowExecutionStartedRequest struct {
*InternalVisibilityRequestBase
RunTimeout int64
}

// InternalRecordWorkflowExecutionClosedRequest is request to RecordWorkflowExecutionClosed
InternalRecordWorkflowExecutionClosedRequest struct {
*InternalVisibilityRequestBase
CloseTimestamp int64
HistoryLength int64
RetentionSeconds int64
CloseTimestamp int64
HistoryLength int64
Retention *time.Duration
}

// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
InternalUpsertWorkflowExecutionRequest struct {
*InternalVisibilityRequestBase
// TODO (alex): not used, remove
WorkflowTimeout int64
}

// InternalCreateNamespaceRequest is used to create the namespace
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/visibilityInterfaces.go
Expand Up @@ -28,6 +28,8 @@
package persistence

import (
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -58,21 +60,19 @@ type (
// RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution
RecordWorkflowExecutionStartedRequest struct {
*VisibilityRequestBase
RunTimeout int64 // not persisted, used for cassandra ttl
}

// RecordWorkflowExecutionClosedRequest is used to add a record of a closed execution
RecordWorkflowExecutionClosedRequest struct {
*VisibilityRequestBase
CloseTimestamp int64
HistoryLength int64
RetentionSeconds int64 // not persisted, used for cassandra ttl
CloseTimestamp int64
HistoryLength int64
Retention *time.Duration // not persisted, used for cassandra ttl
}

// UpsertWorkflowExecutionRequest is used to upsert workflow execution
UpsertWorkflowExecutionRequest struct {
*VisibilityRequestBase
WorkflowTimeout int64 // not persisted, used for cassandra ttl
}

// ListWorkflowExecutionsRequest is used to list executions in a namespace
Expand Down

0 comments on commit bd7df35

Please sign in to comment.