Skip to content

Commit

Permalink
fix: spurious cancellation of async webhooks, better tracing
Browse files Browse the repository at this point in the history
Previously, async webhooks (response.ignore=true) would be canceled
early once the incoming Kratos request was served and its associated
context was released. We now dissociate the cancellation of async hooks
from the normal request processing flow.
  • Loading branch information
alnr committed Dec 19, 2022
1 parent e11ba52 commit 852ab26
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
"github.com/tidwall/gjson"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
"go.opentelemetry.io/otel/trace"

"github.com/ory/kratos/ui/node"
Expand All @@ -29,7 +32,6 @@ import (
"github.com/ory/kratos/session"
"github.com/ory/kratos/text"
"github.com/ory/kratos/x"
"github.com/ory/x/otelx"
)

var (
Expand Down Expand Up @@ -253,22 +255,6 @@ func (e *WebHook) ExecuteSettingsPrePersistHook(_ http.ResponseWriter, req *http
}

func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
span := trace.SpanFromContext(ctx)
attrs := map[string]string{
"webhook.http.method": data.RequestMethod,
"webhook.http.url": data.RequestURL,
"webhook.http.headers": fmt.Sprintf("%#v", data.RequestHeaders),
}

if data.Identity != nil {
attrs["webhook.identity.id"] = data.Identity.ID.String()
} else {
attrs["webhook.identity.id"] = ""
}

span.SetAttributes(otelx.StringAttrs(attrs)...)
defer span.End()

builder, err := request.NewBuilder(e.conf, e.deps)
if err != nil {
return err
Expand All @@ -281,35 +267,66 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
return err
}

errChan := make(chan error, 1)
e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")

attrs := semconv.HTTPClientAttributesFromHTTPRequest(req.Request)
if data.Identity != nil {
attrs = append(attrs,
attribute.String("webhook.identity.id", data.Identity.ID.String()),
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}
var (
httpClient = e.deps.HTTPClient(ctx)
async = gjson.GetBytes(e.conf, "response.ignore").Bool()
parseResponse = gjson.GetBytes(e.conf, "can_interrupt").Bool()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
cancel context.CancelFunc = func() {}
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)}
errChan = make(chan error, 1)
)
if async {
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
spanOpts = append(spanOpts, trace.WithNewRoot())
}
ctx, span := tracer.Start(ctx, "Webhook", spanOpts...)
go func() {
defer close(errChan)
defer cancel()
defer span.End()

resp, err := e.deps.HTTPClient(ctx).Do(req.WithContext(ctx))
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- errors.WithStack(err)
return
}
defer resp.Body.Close()
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)

if resp.StatusCode >= http.StatusBadRequest {
if gjson.GetBytes(e.conf, "can_interrupt").Bool() {
span.SetStatus(codes.Error, "HTTP status code >= 400")
if parseResponse {
if err := parseWebhookResponse(resp); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
}
}
errChan <- fmt.Errorf("web hook failed with status code %v", resp.StatusCode)
span.SetStatus(codes.Error, fmt.Sprintf("web hook failed with status code %v", resp.StatusCode))
return
}

errChan <- nil
}()

if gjson.GetBytes(e.conf, "response.ignore").Bool() {
if async {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
go func() {
err := <-errChan
e.deps.Logger().WithError(err).Warning("A web hook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
}).WithError(err).Warning("A web hook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
}()
return nil
}
Expand Down

0 comments on commit 852ab26

Please sign in to comment.