Skip to content

Commit

Permalink
Persist and load the start time of wait stage to deal with restart ca…
Browse files Browse the repository at this point in the history
…se (#348)

**What this PR does / why we need it**:

**Which issue(s) this PR fixes**:

Fixes #125

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
-->
```release-note
NONE
```

This PR was merged by Kapetanios.
  • Loading branch information
nghialv committed Jul 9, 2020
1 parent ad64556 commit 015c764
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/app/piped/executor/wait/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ go_library(
deps = [
"//pkg/app/piped/executor:go_default_library",
"//pkg/model:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
64 changes: 59 additions & 5 deletions pkg/app/piped/executor/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
package wait

import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/app/piped/executor"
"github.com/pipe-cd/pipe/pkg/model"
)

var defaultDuration = time.Minute
const (
defaultDuration = time.Minute
logInterval = 10 * time.Second
startTimeKey = "startTime"
)

type Executor struct {
executor.Input
Expand All @@ -43,25 +51,46 @@ func Register(r registerer) {
}

// Execute starts waiting for the specified duration.
// TODO: Persist and load the start time to deal with restart case.
func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus {
var (
originalStatus = e.Stage.Status
duration = defaultDuration
timer = time.NewTimer(duration)
)
defer timer.Stop()

// Apply the stage configurations.
if opts := e.StageConfig.WaitStageOptions; opts != nil {
if opts.Duration > 0 {
duration = opts.Duration.Duration()
}
}
totalDuration := duration

// Retrieve the saved startTime from the previous run.
startTime := e.retrieveStartTime()
if !startTime.IsZero() {
duration -= time.Since(startTime)
if duration < 0 {
duration = 0
}
} else {
startTime = time.Now()
}
defer e.saveStartTime(sig.Context(), startTime)

timer := time.NewTimer(duration)
defer timer.Stop()

ticker := time.NewTicker(logInterval)
defer ticker.Stop()

e.LogPersister.AppendInfo(fmt.Sprintf("Waiting for %v...", duration))
select {
case <-timer.C:
break

case <-ticker.C:
e.LogPersister.AppendInfo(fmt.Sprintf("%v elapsed...", time.Since(startTime)))

case s := <-sig.Ch():
switch s {
case executor.StopSignalCancel:
Expand All @@ -73,6 +102,31 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus {
}
}

e.LogPersister.AppendInfo(fmt.Sprintf("Waited for %v", duration))
e.LogPersister.AppendInfo(fmt.Sprintf("Waited for %v", totalDuration))
return model.StageStatus_STAGE_SUCCESS
}

func (e *Executor) retrieveStartTime() (t time.Time) {
metadata, ok := e.MetadataStore.GetStageMetadata(e.Stage.Id)
if !ok {
return
}
s, ok := metadata[startTimeKey]
if !ok {
return
}
ut, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return
}
return time.Unix(ut, 0)
}

func (e *Executor) saveStartTime(ctx context.Context, t time.Time) {
metadata := map[string]string{
startTimeKey: strconv.FormatInt(t.Unix(), 10),
}
if err := e.MetadataStore.SetStageMetadata(ctx, e.Stage.Id, metadata); err != nil {
e.Logger.Error("failed to store metadata", zap.Error(err))
}
}

0 comments on commit 015c764

Please sign in to comment.