Add an ExecutableDLQ to write history tasks to the DLQ
MichaelSnowden committed Sep 25, 2023
1 parent d6bf42b commit 1ca1bc4
Showing 8 changed files with 602 additions and 0 deletions.
71 changes: 71 additions & 0 deletions common/clock/context.go
@@ -0,0 +1,71 @@
package clock

import (
	"context"
	"sync"
	"time"
)

type ctxWithDeadline struct {
	context.Context
	deadline time.Time
	timer    Timer
	once     sync.Once
	done     chan struct{}
	err      error
}

func (ctx *ctxWithDeadline) Deadline() (deadline time.Time, ok bool) {
	return ctx.deadline, true
}

func (ctx *ctxWithDeadline) Done() <-chan struct{} {
	return ctx.done
}

func (ctx *ctxWithDeadline) Err() error {
	select {
	case <-ctx.done:
		return ctx.err
	default:
		return nil
	}
}

func (ctx *ctxWithDeadline) deadlineExceeded() {
	ctx.once.Do(func() {
		ctx.err = context.DeadlineExceeded
		close(ctx.done)
	})
}

func (ctx *ctxWithDeadline) cancel() {
	ctx.once.Do(func() {
		ctx.timer.Stop()
		ctx.err = context.Canceled
		close(ctx.done)
	})
}

func ContextWithDeadline(
	ctx context.Context,
	deadline time.Time,
	timeSource TimeSource,
) (context.Context, context.CancelFunc) {
	ctxd := &ctxWithDeadline{
		Context:  ctx,
		deadline: deadline,
		done:     make(chan struct{}),
	}
	timer := timeSource.AfterFunc(deadline.Sub(timeSource.Now()), ctxd.deadlineExceeded)
	ctxd.timer = timer
	return ctxd, ctxd.cancel
}

func ContextWithTimeout(
	ctx context.Context,
	timeout time.Duration,
	timeSource TimeSource,
) (context.Context, context.CancelFunc) {
	return ContextWithDeadline(ctx, timeSource.Now().Add(timeout), timeSource)
}
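
For context, a minimal usage sketch (not part of this commit): the helper below is hypothetical and only illustrates that callers inject a TimeSource when building the context, which is what lets the EventTimeSource-based tests in the next file advance time manually instead of sleeping.

package example

import (
	"context"
	"time"

	"go.temporal.io/server/common/clock"
)

// callWithTimeout is a hypothetical helper showing the intended call pattern:
// the deadline is derived from the injected TimeSource rather than the wall
// clock, so tests can drive expiration deterministically.
func callWithTimeout(parent context.Context, ts clock.TimeSource, do func(context.Context) error) error {
	ctx, cancel := clock.ContextWithTimeout(parent, 5*time.Second, ts)
	defer cancel()
	return do(ctx)
}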
56 changes: 56 additions & 0 deletions common/clock/context_test.go
@@ -0,0 +1,56 @@
package clock_test

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.temporal.io/server/common/clock"
)

func TestContextWithTimeout_Canceled(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	timeSource.Update(time.Unix(0, 0))
	ctx := context.Background()
	ctx, cancel := clock.ContextWithTimeout(ctx, time.Second, timeSource)
	deadline, ok := ctx.Deadline()
	assert.True(t, ok)
	assert.Equal(t, time.Unix(1, 0), deadline)
	cancel()
	select {
	case <-ctx.Done():
		assert.ErrorIs(t, ctx.Err(), context.Canceled)
	default:
		t.Fatal("expected context to be canceled")
	}
}

func TestContextWithTimeout_Fire(t *testing.T) {
	t.Parallel()

	timeSource := clock.NewEventTimeSource()
	timeSource.Update(time.Unix(0, 0))
	ctx := context.Background()
	ctx, cancel := clock.ContextWithTimeout(ctx, time.Second, timeSource)
	deadline, ok := ctx.Deadline()
	assert.True(t, ok)
	assert.Equal(t, time.Unix(1, 0), deadline)
	timeSource.Advance(time.Second - time.Millisecond)
	select {
	case <-ctx.Done():
		t.Fatal("expected context to not be canceled")
	default:
		assert.NoError(t, ctx.Err())
	}
	timeSource.Advance(time.Millisecond)
	select {
	case <-ctx.Done():
		assert.ErrorIs(t, ctx.Err(), context.DeadlineExceeded)
	default:
		t.Fatal("expected context to be canceled")
	}
	cancel() // should be a no-op
}
10 changes: 10 additions & 0 deletions common/persistence/tests/cassandra_test.go
@@ -233,6 +233,12 @@ func TestCassandraQueuePersistence(t *testing.T) {
}

func TestCassandraQueueV2Persistence(t *testing.T) {
	// This test function is split up into two parts:
	// 1. Test the generic queue functionality, which is independent of the database choice (Cassandra here).
	//    This is done by calling the generic RunQueueV2TestSuite function.
	// 2. Test the Cassandra-specific implementation of the queue. For example, things like queue message ID conflicts
	//    can only happen in Cassandra due to its lack of transactions, so we need to test those here.

	t.Parallel()

	cluster := persistencetests.NewTestClusterForCassandra(&persistencetests.TestBaseOptions{}, log.NewNoopLogger())
@@ -243,6 +249,10 @@ func TestCassandraQueueV2Persistence(t *testing.T) {
		t.Parallel()
		RunQueueV2TestSuite(t, cassandra.NewQueueV2Store(cluster.GetSession()))
	})
	runCassandraSpecificQueueV2Tests(t, cluster)
}

func runCassandraSpecificQueueV2Tests(t *testing.T, cluster *cassandra.TestCluster) {
	t.Run("QueueMessageIDConflict", func(t *testing.T) {
		t.Parallel()
		testCassandraQueueV2ErrQueueMessageConflict(t, cluster)
@@ -40,6 +40,7 @@ import (
	"go.temporal.io/server/common/definition"
	"go.temporal.io/server/common/persistence"
	"go.temporal.io/server/service/history/api/getdlqtasks/getdlqtaskstest"
	"go.temporal.io/server/service/history/queues/queuestest"
	"go.temporal.io/server/service/history/tasks"
)

@@ -63,6 +64,10 @@ func RunHistoryTaskQueueManagerTestSuite(t *testing.T, queue persistence.QueueV2
		t.Parallel()
		historytest.TestClientGetDLQTasks(t, historyTaskQueueManager)
	})
	t.Run("ExecutableTest", func(t *testing.T) {
		t.Parallel()
		queuestest.TestExecutable(t, historyTaskQueueManager)
	})
}

func testHistoryTaskQueueManagerHappyPath(t *testing.T, manager persistence.HistoryTaskQueueManager) {
114 changes: 114 additions & 0 deletions service/history/queues/executable_dlq.go
@@ -0,0 +1,114 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.temporal.io/server/common/clock"
	"go.temporal.io/server/common/headers"
	"go.temporal.io/server/common/persistence"
	"go.temporal.io/server/common/persistence/serialization"
)

type (
	// ExecutableDLQ is an Executable decorator that will enqueue the task to a DLQ if the underlying Executable fails.
	// The first call to Execute for which the underlying Executable returns a terminal error will return an
	// ErrTerminalTaskFailure error and change the behavior of this task to just try to send itself to the DLQ.
	// Specifically, all subsequent calls to Execute will attempt to send the task to the DLQ. If this fails, the error
	// will be returned (with the expectation that clients will retry). If it succeeds, no error will be returned and
	// the task can be acked. When the executable is in the failed state, all calls to HandleErr will just return the
	// error passed in.
	// TODO: wrap all executables with this
	// TODO: add metrics and logging to this
	ExecutableDLQ struct {
		Executable
		dlq        DLQ
		timeSource clock.TimeSource
		// dlqCause is the original error which caused this task to be sent to the DLQ. It is only set once.
		dlqCause    error
		clusterName string
	}

	// DLQ is a dead letter queue that can be used to enqueue tasks that fail to be processed.
	DLQ interface {
		EnqueueTask(
			ctx context.Context,
			request *persistence.EnqueueTaskRequest,
		) (*persistence.EnqueueTaskResponse, error)
	}
)

const (
	sendToDLQTimeout = 5 * time.Second
)

var (
	_                      Executable = new(ExecutableDLQ)
	ErrTerminalTaskFailure            = errors.New("original task failed and this task is now to send the original to the DLQ")
	ErrSendTaskToDLQ                  = errors.New("failed to send task to DLQ")
)

// NewExecutableDLQ wraps an Executable to ensure that it is sent to the DLQ if it fails terminally.
func NewExecutableDLQ(executable Executable, dlq DLQ, timeSource clock.TimeSource, clusterName string) *ExecutableDLQ {
	return &ExecutableDLQ{
		Executable:  executable,
		dlq:         dlq,
		timeSource:  timeSource,
		clusterName: clusterName,
	}
}

// Execute is not thread-safe.
func (d *ExecutableDLQ) Execute() error {
	if d.dlqCause == nil {
		// This task has not experienced a terminal failure yet, so we should execute it.
		err := d.Executable.Execute()
		// TODO: expand on the errors that should be considered terminal
		if !errors.As(err, new(*serialization.DeserializationError)) &&
			!errors.As(err, new(*serialization.UnknownEncodingTypeError)) {
			return err
		}
		d.dlqCause = err
		return fmt.Errorf("%w: %v", ErrTerminalTaskFailure, err)
	}
	// This task experienced a terminal failure, so we should try to send it to the DLQ.
	ctx := headers.SetCallerInfo(context.Background(), headers.SystemPreemptableCallerInfo)
	ctx, cancel := clock.ContextWithTimeout(ctx, sendToDLQTimeout, d.timeSource)
	defer cancel()
	_, err := d.dlq.EnqueueTask(ctx, &persistence.EnqueueTaskRequest{
		QueueType:     persistence.QueueTypeHistoryDLQ,
		SourceCluster: d.clusterName,
		TargetCluster: d.clusterName,
		Task:          d.GetTask(),
	})
	if err != nil {
		return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err)
	}
	return nil
}
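
For illustration only (not part of this commit): a hypothetical driver loop showing how a caller might keep invoking Execute so that a terminally failed task eventually lands in the DLQ. The helper name driveToCompletion is made up, inner/dlq/clusterName are assumed to come from the surrounding queue processor, and the sketch assumes the package's real time source constructor (NewRealTimeSource).

package queues_test

import (
	"errors"

	"go.temporal.io/server/common/clock"
	"go.temporal.io/server/service/history/queues"
)

// driveToCompletion is a hypothetical helper: it retries Execute until the task
// either succeeds or has been durably written to the DLQ. A real caller would
// add backoff between attempts instead of spinning.
func driveToCompletion(inner queues.Executable, dlq queues.DLQ, clusterName string) error {
	executable := queues.NewExecutableDLQ(inner, dlq, clock.NewRealTimeSource(), clusterName)
	for {
		err := executable.Execute()
		switch {
		case err == nil:
			return nil // executed successfully, or enqueued to the DLQ
		case errors.Is(err, queues.ErrTerminalTaskFailure), errors.Is(err, queues.ErrSendTaskToDLQ):
			continue // terminal failure recorded; the next iteration (re)attempts the DLQ write
		default:
			return err // non-terminal error: left to the normal retry machinery
		}
	}
}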
