PPS: Be more careful about k8s errors during RC updates (#8244)
* promutil: also log outgoing http requests

* PPS master: don't fail the pipeline because of context canceled / deadline exceeded errors

* pps: side effects: add stringification of side effects

* pps: pipelineController: return description of failing side effect

* pps: pipeline controller: log more about scaling; prefix all messages with "PPS master"

* promutil: make http logs less verbose (PPS spams the heck out of k8s while idle; we have metrics to watch those)

* pps: pipeline controller: add some more information in debug logs during scale up

* promutil: fix lint
jrockway committed Oct 10, 2022
1 parent 56095fa commit fb394b1
Showing 3 changed files with 111 additions and 8 deletions.
42 changes: 41 additions & 1 deletion src/internal/promutil/promutil.go
@@ -4,10 +4,13 @@ package promutil
import (
"io"
"net/http"
"time"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

var (
@@ -28,6 +31,43 @@
}, []string{"client", "method"})
)

type loggingRT struct {
name string
underlying http.RoundTripper
}

func (rt *loggingRT) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
log := logrus.WithFields(logrus.Fields{
"name": rt.name,
"method": req.Method,
"uri": req.URL.String(),
})

// Log HTTP requests that are still in progress after 10 seconds.
timer := time.AfterFunc(10*time.Second, func() {
l := log
if dl, ok := req.Context().Deadline(); ok {
l = l.WithField("deadline", time.Until(dl))
}
l.WithField("duration", time.Since(start)).Info("ongoing long http request")
})
defer timer.Stop()

res, err := rt.underlying.RoundTrip(req)
if err != nil {
log.WithError(err).Info("outgoing http request completed with error")
return res, errors.EnsureStack(err)
}
if res != nil {
log.WithFields(logrus.Fields{
"duration": time.Since(start),
"status": res.Status,
}).Debugf("outgoing http request complete")
}
return res, errors.EnsureStack(err)
}

// InstrumentRoundTripper returns an http.RoundTripper that collects Prometheus metrics, delegating
// to the underlying RoundTripper to actually make requests.
func InstrumentRoundTripper(name string, rt http.RoundTripper) http.RoundTripper {
@@ -41,7 +81,7 @@ func InstrumentRoundTripper(name string, rt http.RoundTripper) http.RoundTripper
requestTimeMetric.MustCurryWith(ls),
promhttp.InstrumentRoundTripperCounter(
requestCountMetric.MustCurryWith(ls),
- rt)))
+ &loggingRT{name: name, underlying: rt})))
}

// Adder is something that can be added to.
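For context, a minimal caller-side sketch of how the instrumented transport might be wired up. This is not part of the commit; the constructor name, the "kube" label, and the use of http.DefaultTransport are illustrative only, while InstrumentRoundTripper and loggingRT come from the diff above:

import (
	"net/http"

	"github.com/pachyderm/pachyderm/v2/src/internal/promutil"
)

// newInstrumentedClient returns an *http.Client whose outgoing requests are
// counted and timed by the Prometheus collectors and logged by loggingRT.
func newInstrumentedClient() *http.Client {
	return &http.Client{
		// The first argument becomes the "client" label on the metrics and
		// the "name" field on loggingRT's log entries (illustrative value).
		Transport: promutil.InstrumentRoundTripper("kube", http.DefaultTransport),
	}
}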
13 changes: 11 additions & 2 deletions src/server/pps/server/master.go
@@ -54,10 +54,19 @@ type stepError struct {
}

func newRetriableError(err error, message string) error {
retry, failPipeline := true, true
if errors.Is(err, context.Canceled) {
retry = false
failPipeline = false
}
if errors.Is(err, context.DeadlineExceeded) {
retry = true
failPipeline = false
}
return stepError{
error: errors.Wrap(err, message),
- retry: true,
- failPipeline: true,
+ retry: retry,
+ failPipeline: failPipeline,
}
}

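The classification above relies on errors.Is unwrapping the error chain, so a context error that comes back from the Kubernetes client already wrapped still gets treated leniently. A small sketch of the intended behavior, assuming the internal errors package's Is follows wrap chains the same way the standard library's does:

// A deadline error surfaced by an RC update, wrapped with extra context.
err := errors.Wrap(context.DeadlineExceeded, "updating RC")

errors.Is(err, context.DeadlineExceeded) // true  -> retry the step, but don't fail the pipeline
errors.Is(err, context.Canceled)         // false -> not treated as a cancellation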
64 changes: 59 additions & 5 deletions src/server/pps/server/pipeline_controller.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

@@ -60,6 +61,26 @@
sideEffectName_CRASH_MONITOR sideEffectName = 5
)

// String implements fmt.Stringer.
func (s sideEffectName) String() string {
switch s {
case sideEffectName_KUBERNETES_RESOURCES:
return "KUBERNETES_RESOURCES"
case sideEffectName_FINISH_COMMITS:
return "FINISH_COMMITS"
case sideEffectName_RESTART:
return "RESTART"
case sideEffectName_SCALE_WORKERS:
return "SCALE_WORKERS"
case sideEffectName_PIPELINE_MONITOR:
return "PIPELINE_MONITOR"
case sideEffectName_CRASH_MONITOR:
return "CRASH_MONITOR"
default:
return "UNKNOWN"
}
}

type sideEffectToggle int32

const (
@@ -68,6 +89,20 @@
sideEffectToggle_DOWN sideEffectToggle = 2
)

// String implements fmt.Stringer.
func (s sideEffectToggle) String() string {
switch s {
case sideEffectToggle_NONE:
return "NONE"
case sideEffectToggle_UP:
return "UP"
case sideEffectToggle_DOWN:
return "DOWN"
default:
return "UNKNOWN"
}
}

// sideEffect intends to capture a state-changing operation that the pipeline controller may apply.
// NOTE: the PipelineInfo & ReplicationController arguments supplied to apply should be treated as read-only copies.
type sideEffect struct {
@@ -80,6 +115,18 @@ func (se sideEffect) equals(o sideEffect) bool {
return se.name == o.name && se.toggle == o.toggle
}

// String implements fmt.Stringer.
func (se sideEffect) String() string {
b := new(strings.Builder)
b.WriteString(se.name.String())
if t := se.toggle; t != sideEffectToggle_NONE {
b.WriteString(" (")
b.WriteString(t.String())
b.WriteString(")")
}
return b.String()
}
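Illustrative only, from inside the pps server package: with the Stringer implementations above, a side effect renders compactly, which is what makes the wrapped error returned by apply (further down in this diff) readable:

fmt.Println(ResourcesSideEffect(sideEffectToggle_UP))                                       // KUBERNETES_RESOURCES (UP)
fmt.Println(sideEffect{name: sideEffectName_CRASH_MONITOR, toggle: sideEffectToggle_NONE}) // CRASH_MONITOR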

func ResourcesSideEffect(toggle sideEffectToggle) sideEffect {
return sideEffect{
name: sideEffectName_KUBERNETES_RESOURCES,
@@ -424,7 +471,7 @@ func evaluate(pi *pps.PipelineInfo, rc *v1.ReplicationController) (pps.PipelineS
func (pc *pipelineController) apply(ctx context.Context, pi *pps.PipelineInfo, rc *v1.ReplicationController, target pps.PipelineState, sideEffects []sideEffect, reason string) error {
for _, s := range sideEffects {
if err := s.apply(ctx, pc, pi, rc); err != nil {
- return err
+ return errors.Wrapf(err, "apply side effect %s", s.String())
}
}
if target != pi.State {
@@ -579,12 +626,16 @@ func (pc *pipelineController) scaleUpPipeline(ctx context.Context, pi *pps.Pipel
return nil
})
// Set parallelism
- log.Debugf("Beginning scale-up check for %q, which has %d tasks",
- 	pi.Pipeline.Name, nTasks)
+ log.Debugf("PPS master: beginning scale-up check for %q, which has %d tasks and %d workers",
+ 	pi.Pipeline.Name, nTasks, curScale)
switch {
case err != nil || nTasks == 0:
- log.Errorf("tasks remaining for %q not known (possibly still being calculated): %v",
- 	pi.Pipeline.Name, err)
+ if err == nil {
+ 	log.Infof("PPS master: tasks remaining for %q not known (possibly still being calculated)",
+ 		pi.Pipeline.Name)
+ } else {
+ 	log.Errorf("PPS master: tasks remaining for %q not known (possibly still being calculated): %v", pi.Pipeline.Name, err)
+ }
return curScale // leave pipeline alone until nTasks is available
case nTasks <= curScale:
return curScale // can't scale down w/o dropping work
@@ -611,9 +662,11 @@ func (pc *pipelineController) scaleUpPipeline(ctx context.Context, pi *pps.Pipel
}()
}
if curScale == targetScale {
log.Debugf("PPS master: pipeline %q is at desired scale", pi.GetPipeline().GetName())
return false // no changes necessary
}
// Update the # of replicas
log.Debugf("PPS master: scale pipeline %q from %d to %d replicas", pi.GetPipeline().GetName(), curScale, targetScale)
rc.Spec.Replicas = &targetScale
return true
}))
Expand All @@ -636,6 +689,7 @@ func (pc *pipelineController) scaleDownPipeline(ctx context.Context, pi *pps.Pip
if rc.Spec.Replicas != nil && *rc.Spec.Replicas == 0 {
return false // prior attempt succeeded
}
log.Debugf("PPS master: scale down pipeline %q to 0 replicas", pi.GetPipeline().GetName())
rc.Spec.Replicas = &zero
return true
}))
