Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.3.x backport] PPS: Be more careful about k8s errors during RC updates #8264

Merged
merged 1 commit into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/internal/promutil/promutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ package promutil
import (
"io"
"net/http"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

var (
Expand All @@ -28,6 +31,43 @@ var (
}, []string{"client", "method"})
)

// loggingRT is an http.RoundTripper that logs every outgoing request via
// logrus before delegating to the wrapped transport.
type loggingRT struct {
	name       string            // client name attached to each log entry
	underlying http.RoundTripper // transport that actually performs the request
}

// RoundTrip implements http.RoundTripper.  It forwards the request to the
// underlying transport and logs the outcome; a request still in flight after
// ten seconds additionally gets a progress log line so that slow calls are
// visible while they run.
func (rt *loggingRT) RoundTrip(req *http.Request) (*http.Response, error) {
	begin := time.Now()
	entry := logrus.WithFields(logrus.Fields{
		"name":   rt.name,
		"method": req.Method,
		"uri":    req.URL.String(),
	})

	// Fire a one-shot log for requests that take a long time; include the
	// request's remaining deadline when one is set on its context.
	slow := time.AfterFunc(10*time.Second, func() {
		l := entry
		if dl, ok := req.Context().Deadline(); ok {
			l = l.WithField("deadline", time.Until(dl))
		}
		l.WithField("duration", time.Since(begin)).Info("ongoing long http request")
	})
	defer slow.Stop()

	resp, err := rt.underlying.RoundTrip(req)
	if err != nil {
		entry.WithError(err).Info("outgoing http request completed with error")
		return resp, errors.EnsureStack(err)
	}
	if resp != nil {
		fields := logrus.Fields{
			"duration": time.Since(begin),
			"status":   resp.Status,
		}
		entry.WithFields(fields).Debugf("outgoing http request complete")
	}
	return resp, errors.EnsureStack(err)
}

// InstrumentRoundTripper returns an http.RoundTripper that collects Prometheus metrics; delegating
// to the underlying RoundTripper to actually make requests.
func InstrumentRoundTripper(name string, rt http.RoundTripper) http.RoundTripper {
Expand All @@ -41,7 +81,7 @@ func InstrumentRoundTripper(name string, rt http.RoundTripper) http.RoundTripper
requestTimeMetric.MustCurryWith(ls),
promhttp.InstrumentRoundTripperCounter(
requestCountMetric.MustCurryWith(ls),
rt)))
&loggingRT{name: name, underlying: rt})))
}

// Adder is something that can be added to.
Expand Down
13 changes: 11 additions & 2 deletions src/server/pps/server/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,19 @@ type stepError struct {
}

// newRetriableError wraps err in a stepError whose retry/failPipeline flags
// depend on the kind of error:
//   - context.Canceled: the operation was deliberately aborted, so neither
//     retry the step nor fail the pipeline;
//   - context.DeadlineExceeded: transient, so retry but do not fail the
//     pipeline;
//   - anything else: retry, and fail the pipeline if retries are exhausted.
//
// NOTE: the source diff rendered both the removed (`retry: true`) and added
// (`retry: retry`) struct fields; this is the reconstructed post-merge code,
// which keeps exactly one assignment per field so the literal compiles.
func newRetriableError(err error, message string) error {
	retry, failPipeline := true, true
	if errors.Is(err, context.Canceled) {
		retry = false
		failPipeline = false
	}
	if errors.Is(err, context.DeadlineExceeded) {
		retry = true
		failPipeline = false
	}
	return stepError{
		error:        errors.Wrap(err, message),
		retry:        retry,
		failPipeline: failPipeline,
	}
}

Expand Down
64 changes: 59 additions & 5 deletions src/server/pps/server/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -60,6 +61,26 @@ const (
sideEffectName_CRASH_MONITOR sideEffectName = 5
)

// String implements fmt.Stringer.
func (s sideEffectName) String() string {
switch s {
case sideEffectName_KUBERNETES_RESOURCES:
return "KUBERNETES_RESOURCES"
case sideEffectName_FINISH_COMMITS:
return "FINISH_COMMITS"
case sideEffectName_RESTART:
return "RESTART"
case sideEffectName_SCALE_WORKERS:
return "SCALE_WORKERS"
case sideEffectName_PIPELINE_MONITOR:
return "PIPELINE_MONITOR"
case sideEffectName_CRASH_MONITOR:
return "CRASH_MONITOR"
default:
return "UNKNOWN"
}
}

type sideEffectToggle int32

const (
Expand All @@ -68,6 +89,20 @@ const (
sideEffectToggle_DOWN sideEffectToggle = 2
)

// String implements fmt.Stringer.
func (s sideEffectToggle) String() string {
switch s {
case sideEffectToggle_NONE:
return "NONE"
case sideEffectToggle_UP:
return "UP"
case sideEffectToggle_DOWN:
return "DOWN"
default:
return "UNKNOWN"
}
}

// sideEffect intends to capture a state changing operation that the pipeline controller may apply
// NOTE: the PipelineInfo & ReplicationController arguments supplied to apply should be treated as read only copies
type sideEffect struct {
Expand All @@ -80,6 +115,18 @@ func (se sideEffect) equals(o sideEffect) bool {
return se.name == o.name && se.toggle == o.toggle
}

// String implements fmt.Stringer.
func (se sideEffect) String() string {
b := new(strings.Builder)
b.WriteString(se.name.String())
if t := se.toggle; t != sideEffectToggle_NONE {
b.WriteString(" (")
b.WriteString(t.String())
b.WriteString(")")
}
return b.String()
}

func ResourcesSideEffect(toggle sideEffectToggle) sideEffect {
return sideEffect{
name: sideEffectName_KUBERNETES_RESOURCES,
Expand Down Expand Up @@ -424,7 +471,7 @@ func evaluate(pi *pps.PipelineInfo, rc *v1.ReplicationController) (pps.PipelineS
func (pc *pipelineController) apply(ctx context.Context, pi *pps.PipelineInfo, rc *v1.ReplicationController, target pps.PipelineState, sideEffects []sideEffect, reason string) error {
for _, s := range sideEffects {
if err := s.apply(ctx, pc, pi, rc); err != nil {
return err
return errors.Wrapf(err, "apply side effect %s", s.String())
}
}
if target != pi.State {
Expand Down Expand Up @@ -579,12 +626,16 @@ func (pc *pipelineController) scaleUpPipeline(ctx context.Context, pi *pps.Pipel
return nil
})
// Set parallelism
log.Debugf("Beginning scale-up check for %q, which has %d tasks",
pi.Pipeline.Name, nTasks)
log.Debugf("PPS master: beginning scale-up check for %q, which has %d tasks and %d workers",
pi.Pipeline.Name, nTasks, curScale)
switch {
case err != nil || nTasks == 0:
log.Errorf("tasks remaining for %q not known (possibly still being calculated): %v",
pi.Pipeline.Name, err)
if err == nil {
log.Infof("PPS master: tasks remaining for %q not known (possibly still being calculated)",
pi.Pipeline.Name)
} else {
log.Errorf("PPS master: tasks remaining for %q not known (possibly still being calculated): %v", pi.Pipeline.Name, err)
}
return curScale // leave pipeline alone until until nTasks is available
case nTasks <= curScale:
return curScale // can't scale down w/o dropping work
Expand All @@ -611,9 +662,11 @@ func (pc *pipelineController) scaleUpPipeline(ctx context.Context, pi *pps.Pipel
}()
}
if curScale == targetScale {
log.Debugf("PPS master: pipeline %q is at desired scale", pi.GetPipeline().GetName())
return false // no changes necessary
}
// Update the # of replicas
log.Debugf("PPS master: scale pipeline %q from %d to %d replicas", pi.GetPipeline().GetName(), curScale, targetScale)
rc.Spec.Replicas = &targetScale
return true
}))
Expand All @@ -636,6 +689,7 @@ func (pc *pipelineController) scaleDownPipeline(ctx context.Context, pi *pps.Pip
if rc.Spec.Replicas != nil && *rc.Spec.Replicas == 0 {
return false // prior attempt succeeded
}
log.Debugf("PPS master: scale down pipline %q to 0 replicas", pi.GetPipeline().GetName())
rc.Spec.Replicas = &zero
return true
}))
Expand Down