Skip to content

Commit

Permalink
Workflow shadower (#1058)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Feb 25, 2021
1 parent 5282e89 commit 7829ee0
Show file tree
Hide file tree
Showing 8 changed files with 785 additions and 35 deletions.
20 changes: 0 additions & 20 deletions internal/internal_utils.go
Expand Up @@ -28,7 +28,6 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -267,25 +266,6 @@ func constructError(reason string, details []byte, dataConverter DataConverter)
}
}

// AwaitWaitGroup calls Wait on the given wait
// Returns true if the Wait() call succeeded before the timeout
// Returns false if the Wait() did not return before the timeout
func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
doneC := make(chan struct{})

go func() {
wg.Wait()
close(doneC)
}()

select {
case <-doneC:
return true
case <-time.After(timeout):
return false
}
}

func getKillSignal() <-chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_worker_base.go
Expand Up @@ -36,6 +36,7 @@ import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common/backoff"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/cadence/internal/common/util"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -347,7 +348,7 @@ func (bw *baseWorker) Stop() {
close(bw.shutdownCh)
bw.limiterContextCancel()

if success := awaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
bw.logger.Info("Worker graceful shutdown timed out.", zap.Duration("Shutdown timeout", bw.options.shutdownTimeout))
})
Expand Down
20 changes: 20 additions & 0 deletions internal/query_builder.go
@@ -1,3 +1,23 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
Expand Down
20 changes: 20 additions & 0 deletions internal/query_builder_test.go
@@ -1,3 +1,23 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
Expand Down
41 changes: 34 additions & 7 deletions internal/workflow_replayer.go
Expand Up @@ -36,10 +36,18 @@ import (
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/backoff"
"go.uber.org/cadence/internal/common/serializer"
"go.uber.org/zap"
)

var (
errReplayEmptyHistory = errors.New("empty events")
errReplayHistoryTooShort = errors.New("at least 3 events expected in the history")
errReplayInvalidFirstEvent = errors.New("first event is not WorkflowExecutionStarted")
errReplayCorruptedStartedEvent = errors.New("corrupted WorkflowExecutionStarted")
)

// WorkflowReplayer is used to replay workflow code from an event history
type WorkflowReplayer struct {
registry *registry
Expand Down Expand Up @@ -104,7 +112,7 @@ func (r *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.
return r.replayWorkflowHistory(logger, service, ReplayDomainName, history)
}

// ReplayWorkflowExecution replays workflow execution loading it from Temporal service.
// ReplayWorkflowExecution replays workflow execution loading it from Cadence service.
func (r *WorkflowReplayer) ReplayWorkflowExecution(
ctx context.Context,
service workflowserviceclient.Interface,
Expand All @@ -120,8 +128,27 @@ func (r *WorkflowReplayer) ReplayWorkflowExecution(
Domain: common.StringPtr(domain),
Execution: sharedExecution,
}
hResponse, err := service.GetWorkflowExecutionHistory(ctx, request)
if err != nil {

var hResponse *shared.GetWorkflowExecutionHistoryResponse
if err := backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx)

var err error
hResponse, err = service.GetWorkflowExecutionHistory(tchCtx, request, opt...)
cancel()

return err
},
createDynamicServiceRetryPolicy(ctx),
func(err error) bool {
if _, ok := err.(*shared.InternalServiceError); ok {
// treat InternalServiceError as non-retryable, as the workflow history may be corrupted
return false
}
return isServiceTransientError(err)
},
); err != nil {
return err
}

Expand All @@ -146,20 +173,20 @@ func (r *WorkflowReplayer) replayWorkflowHistory(
taskList := "ReplayTaskList"
events := history.Events
if events == nil {
return errors.New("empty events")
return errReplayEmptyHistory
}
if len(events) < 3 {
return errors.New("at least 3 events expected in the history")
return errReplayHistoryTooShort
}
first := events[0]
if first.GetEventType() != shared.EventTypeWorkflowExecutionStarted {
return errors.New("first event is not WorkflowExecutionStarted")
return errReplayInvalidFirstEvent
}
last := events[len(events)-1]

attr := first.WorkflowExecutionStartedEventAttributes
if attr == nil {
return errors.New("corrupted WorkflowExecutionStarted")
return errReplayCorruptedStartedEvent
}
workflowType := attr.WorkflowType
execution := &shared.WorkflowExecution{
Expand Down

0 comments on commit 7829ee0

Please sign in to comment.