Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store errors of courier message #2914

Merged
merged 37 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c07664e
chore: use `keysetpagination` instead of x
jonas-jonas Nov 8, 2022
f6ae4df
feat: record errors of courier messages
jonas-jonas Nov 9, 2022
6b5fc82
chore: sdk
jonas-jonas Nov 9, 2022
aba261d
chore: add persister test
jonas-jonas Nov 9, 2022
2327aeb
fix: naming of dispatches table
jonas-jonas Nov 10, 2022
d933117
Merge branch 'feat/mailCourierImprovements' into feat/errorsInCourier…
jonas-jonas Nov 10, 2022
9e61610
chore: sdk
jonas-jonas Nov 10, 2022
b160864
chore: fix compile error
jonas-jonas Nov 10, 2022
462bec0
fix: paginator
jonas-jonas Nov 10, 2022
b785408
Merge remote-tracking branch 'origin/master' into feat/errorsInCourie…
jonas-jonas Nov 21, 2022
9591d39
Merge remote-tracking branch 'origin/master' into feat/errorsInCourie…
jonas-jonas Nov 22, 2022
fd3b1fe
fix: migrations
jonas-jonas Nov 22, 2022
a33f088
chore: add keysetpagination and getMessage endpoint
jonas-jonas Nov 24, 2022
4f44021
chore: sdk
jonas-jonas Nov 24, 2022
093d8f7
Merge remote-tracking branch 'origin/master' into feat/errorsInCourie…
jonas-jonas Nov 24, 2022
d879473
chore: pass opts instead of paginator
jonas-jonas Nov 24, 2022
15d3adf
chore: add default token to pagination
jonas-jonas Nov 24, 2022
3d4abf0
chore(temp): add test data endpoint
jonas-jonas Nov 24, 2022
da1b662
chore: update ory/x
jonas-jonas Nov 28, 2022
d21da9e
chore: add tests
jonas-jonas Nov 28, 2022
961b903
chore: use time.RFC3339 for dates
jonas-jonas Nov 29, 2022
848c2ef
chore: use RFC3339Nano
jonas-jonas Nov 29, 2022
d18f999
chore: use `json` errors in db
jonas-jonas Nov 29, 2022
e44d31d
chore: use `WithError`
jonas-jonas Nov 29, 2022
3090542
chore: add ordering to `courier_dispatches`
jonas-jonas Nov 29, 2022
8b4224b
chore: failing tests
jonas-jonas Nov 29, 2022
e392a49
chore: tests
jonas-jonas Nov 30, 2022
61d71c6
chore: update ory/x
jonas-jonas Nov 30, 2022
4828712
Merge branch 'master' into feat/errorsInCourierMessages
jonas-jonas Nov 30, 2022
6c03dc2
chore: cleanup
jonas-jonas Dec 1, 2022
5aabccf
chore: update x to master
jonas-jonas Dec 1, 2022
7535b77
chore: update new ory/x errors
jonas-jonas Dec 1, 2022
42ed352
chore: add some more tests
jonas-jonas Dec 1, 2022
d253ae7
Merge remote-tracking branch 'origin/master' into feat/errorsInCourie…
jonas-jonas Dec 2, 2022
e073e80
fix: flaky configuration test
jonas-jonas Dec 5, 2022
bb23997
chore: cleanup
jonas-jonas Dec 5, 2022
e1ce94f
chore: review
jonas-jonas Dec 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"{\"error\":{\"code\":404,\"status\":\"Not Found\",\"message\":\"Unable to locate the resource\"}}\n"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"{\"error\":{\"code\":404,\"status\":\"Not Found\",\"message\":\"Unable to locate the resource\"}}\n"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"{\"error\":{\"code\":400,\"status\":\"Bad Request\",\"debug\":\"could not parse parameter {id} as UUID, got \",\"message\":\"uuid: incorrect UUID length 10 in string \\\"not-a-uuid\\\"\"}}\n"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"{\"error\":{\"code\":400,\"status\":\"Bad Request\",\"debug\":\"could not parse parameter {id} as UUID, got \",\"message\":\"uuid: incorrect UUID length 10 in string \\\"not-a-uuid\\\"\"}}\n"
48 changes: 29 additions & 19 deletions courier/courier_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,6 @@ import (
)

func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
maxRetries := c.deps.CourierConfig().CourierMessageRetries(ctx)

if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to reset the retried message's status to "abandoned".`)
return err
}

// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)
return nil
}

if err := c.deps.CourierPersister().IncrementMessageSendCount(ctx, msg.ID); err != nil {
c.deps.Logger().
WithError(err).
Expand Down Expand Up @@ -68,6 +50,8 @@ func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
}

func (c *courier) DispatchQueue(ctx context.Context) error {
maxRetries := c.deps.CourierConfig().CourierMessageRetries(ctx)

messages, err := c.deps.CourierPersister().NextMessages(ctx, 10)
if err != nil {
if errors.Is(err, ErrQueueEmpty) {
Expand All @@ -77,7 +61,27 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
}

for k, msg := range messages {
if err := c.DispatchMessage(ctx, msg); err != nil {
if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to set the retried message's status to "abandoned".`)
return err
}
// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)

} else if err := c.DispatchMessage(ctx, msg); err != nil {

if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusFailed, err); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to record failure log entry.`)
}

for _, replace := range messages[k:] {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
Expand All @@ -92,6 +96,12 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
}

return err
} else if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusSuccess, nil); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to record success log entry.`)
// continue with execution, as the message was successfully dispatched
}
}

Expand Down
30 changes: 8 additions & 22 deletions courier/courier_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gofrs/uuid"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/ory/kratos/courier"
"github.com/ory/kratos/courier/template"
Expand Down Expand Up @@ -54,24 +55,9 @@ func TestDispatchMessageWithInvalidSMTP(t *testing.T) {
messages, err := reg.CourierPersister().NextMessages(ctx, 10)
require.Len(t, messages, 1)
})

t.Run("case=max retries reached", func(t *testing.T) {
aeneasr marked this conversation as resolved.
Show resolved Hide resolved
id := queueNewMessage(t, ctx, c, reg)
message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
require.NoError(t, err)
require.Equal(t, id, message.ID)
message.SendCount = 6

err = c.DispatchMessage(ctx, *message)
require.NoError(t, err)

messages, err := reg.CourierPersister().NextMessages(ctx, 1)
require.Empty(t, messages)
})

}

func TestDispatchMessage2(t *testing.T) {
func TestDispatchQueue(t *testing.T) {
ctx := context.Background()

conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
Expand All @@ -83,12 +69,7 @@ func TestDispatchMessage2(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

id, err := c.QueueEmail(ctx, templates.NewTestStub(reg, &templates.TestStubModel{
To: "test-recipient-1@example.org",
Subject: "test-subject-1",
Body: "test-body-1",
}))
require.NoError(t, err)
id := queueNewMessage(t, ctx, c, reg)
require.NotEqual(t, uuid.Nil, id)

// Fails to deliver the first time
Expand All @@ -106,8 +87,13 @@ func TestDispatchMessage2(t *testing.T) {
var message courier.Message
err = reg.Persister().GetConnection(ctx).
Where("status = ?", courier.MessageStatusAbandoned).
Eager("Dispatches").
First(&message)

require.NoError(t, err)
require.Equal(t, id, message.ID)

require.Len(t, message.Dispatches, 2)
require.Contains(t, gjson.GetBytes(message.Dispatches[0].Error, "reason").String(), "failed to send email via smtp")
require.Contains(t, gjson.GetBytes(message.Dispatches[1].Error, "reason").String(), "failed to send email via smtp")
}
94 changes: 76 additions & 18 deletions courier/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
package courier

import (
"fmt"
"net/http"

"github.com/gofrs/uuid"

"github.com/ory/herodot"
"github.com/ory/x/pagination/keysetpagination"
"github.com/ory/x/pagination/migrationpagination"

"github.com/julienschmidt/httprouter"

"github.com/ory/kratos/driver/config"
"github.com/ory/kratos/x"
"github.com/ory/x/urlx"
)

const AdminRouteCourier = "/courier"
const AdminRouteMessages = AdminRouteCourier + "/messages"
const AdminRouteListMessages = AdminRouteCourier + "/messages"
const AdminRouteGetMessage = AdminRouteCourier + "/messages/:msgID"

type (
handlerDependencies interface {
Expand All @@ -39,12 +44,14 @@ func NewHandler(r handlerDependencies) *Handler {
}

func (h *Handler) RegisterPublicRoutes(public *x.RouterPublic) {
h.r.CSRFHandler().IgnoreGlobs(x.AdminPrefix+AdminRouteMessages, AdminRouteMessages)
public.GET(x.AdminPrefix+AdminRouteMessages, x.RedirectToAdminRoute(h.r))
h.r.CSRFHandler().IgnoreGlobs(x.AdminPrefix+AdminRouteListMessages, AdminRouteListMessages)
public.GET(x.AdminPrefix+AdminRouteListMessages, x.RedirectToAdminRoute(h.r))
public.GET(x.AdminPrefix+AdminRouteGetMessage, x.RedirectToAdminRoute(h.r))
}

func (h *Handler) RegisterAdminRoutes(admin *x.RouterAdmin) {
admin.GET(AdminRouteMessages, h.listCourierMessages)
admin.GET(AdminRouteListMessages, h.listCourierMessages)
admin.GET(AdminRouteGetMessage, h.getCourierMessage)
}

// Paginated Courier Message List Response
Expand All @@ -62,10 +69,9 @@ type listCourierMessagesResponse struct {

// Paginated List Courier Message Parameters
//
// nolint:deadcode,unused
// swagger:parameters listCourierMessages
type ListCourierMessagesParameters struct {
migrationpagination.RequestParameters
keysetpagination.RequestParameters
aeneasr marked this conversation as resolved.
Show resolved Hide resolved

// Status filters out messages based on status.
// If no value is provided, it doesn't take effect on filter.
Expand Down Expand Up @@ -101,13 +107,13 @@ type ListCourierMessagesParameters struct {
// 400: errorGeneric
// default: errorGeneric
func (h *Handler) listCourierMessages(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
filter, err := parseMessagesFilter(r)
filter, paginator, err := parseMessagesFilter(r)
if err != nil {
h.r.Writer().WriteErrorCode(w, r, http.StatusBadRequest, err)
return
}

l, tc, err := h.r.CourierPersister().ListMessages(r.Context(), filter)
l, tc, nextPage, err := h.r.CourierPersister().ListMessages(r.Context(), filter, paginator)
if err != nil {
h.r.Writer().WriteError(w, r, err)
return
Expand All @@ -119,30 +125,82 @@ func (h *Handler) listCourierMessages(w http.ResponseWriter, r *http.Request, _
}
}

x.PaginationHeader(w, urlx.AppendPaths(h.r.Config().SelfAdminURL(r.Context()), AdminRouteMessages), int64(tc), filter.Page, filter.PerPage)
w.Header().Set("X-Total-Count", fmt.Sprint(tc))
keysetpagination.Header(w, r.URL, nextPage)
h.r.Writer().Write(w, r, l)
}

func parseMessagesFilter(r *http.Request) (ListCourierMessagesParameters, error) {
func parseMessagesFilter(r *http.Request) (ListCourierMessagesParameters, []keysetpagination.Option, error) {
var status *MessageStatus

if r.URL.Query().Has("status") {
ms, err := ToMessageStatus(r.URL.Query().Get("status"))

if err != nil {
return ListCourierMessagesParameters{}, err
return ListCourierMessagesParameters{}, nil, err
}

status = &ms
}

page, itemsPerPage := x.ParsePagination(r)
opts, err := keysetpagination.Parse(r.URL.Query(), keysetpagination.NewMapPageToken)
if err != nil {
return ListCourierMessagesParameters{}, nil, err
}

return ListCourierMessagesParameters{
RequestParameters: migrationpagination.RequestParameters{
Page: page,
PerPage: itemsPerPage,
},
Status: status,
Recipient: r.URL.Query().Get("recipient"),
}, nil
}, opts, nil
}

// Get Courier Message Parameters
//
// swagger:parameters getCourierMessage
// nolint:deadcode,unused
type getCourierMessage struct {
// MessageID is the ID of the message.
//
// required: true
// in: path
MessageID string `json:"id"`
}

// swagger:route GET /admin/courier/messages/{id} courier getCourierMessage
//
// # Get a Message
//
// Gets a specific messages by the given ID.
//
// Produces:
// - application/json
//
// Security:
// oryAccessToken:
//
// Schemes: http, https
//
// Responses:
// 200: message
// 400: errorGeneric
// default: errorGeneric
func (h *Handler) getCourierMessage(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
msgID, err := uuid.FromString(ps.ByName("msgID"))

if err != nil {
h.r.Writer().WriteError(w, r, herodot.ErrBadRequest.WithError(err.Error()).WithDebugf("could not parse parameter {id} as UUID, got %s", ps.ByName("id")))
return
}

message, err := h.r.CourierPersister().FetchMessage(r.Context(), msgID)
if err != nil {
h.r.Writer().WriteError(w, r, err)
return
}

if !h.r.Config().IsInsecureDevMode(r.Context()) {
message.Body = "<redacted-unless-dev-mode>"
aeneasr marked this conversation as resolved.
Show resolved Hide resolved
}

h.r.Writer().Write(w, r, message)
}