Skip to content

Commit

Permalink
feat: store errors of courier message (#2914)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The `/admin/courier/messages` endpoint now uses `keysetpagination` instead.
  • Loading branch information
jonas-jonas committed Dec 6, 2022
1 parent efb8ae8 commit fc7aa86
Show file tree
Hide file tree
Showing 45 changed files with 2,285 additions and 554 deletions.
@@ -0,0 +1 @@
"{\"error\":{\"code\":404,\"status\":\"Not Found\",\"message\":\"Unable to locate the resource\"}}\n"
@@ -0,0 +1 @@
"{\"error\":{\"code\":404,\"status\":\"Not Found\",\"message\":\"Unable to locate the resource\"}}\n"
@@ -0,0 +1 @@
"{\"error\":{\"code\":400,\"status\":\"Bad Request\",\"debug\":\"could not parse parameter {id} as UUID, got \",\"message\":\"uuid: incorrect UUID length 10 in string \\\"not-a-uuid\\\"\"}}\n"
@@ -0,0 +1 @@
"{\"error\":{\"code\":400,\"status\":\"Bad Request\",\"debug\":\"could not parse parameter {id} as UUID, got \",\"message\":\"uuid: incorrect UUID length 10 in string \\\"not-a-uuid\\\"\"}}\n"
48 changes: 29 additions & 19 deletions courier/courier_dispatcher.go
Expand Up @@ -10,24 +10,6 @@ import (
)

func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
maxRetries := c.deps.CourierConfig().CourierMessageRetries(ctx)

if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to reset the retried message's status to "abandoned".`)
return err
}

// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)
return nil
}

if err := c.deps.CourierPersister().IncrementMessageSendCount(ctx, msg.ID); err != nil {
c.deps.Logger().
WithError(err).
Expand Down Expand Up @@ -68,6 +50,8 @@ func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
}

func (c *courier) DispatchQueue(ctx context.Context) error {
maxRetries := c.deps.CourierConfig().CourierMessageRetries(ctx)

messages, err := c.deps.CourierPersister().NextMessages(ctx, 10)
if err != nil {
if errors.Is(err, ErrQueueEmpty) {
Expand All @@ -77,7 +61,27 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
}

for k, msg := range messages {
if err := c.DispatchMessage(ctx, msg); err != nil {
if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to set the retried message's status to "abandoned".`)
return err
}
// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)

} else if err := c.DispatchMessage(ctx, msg); err != nil {

if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusFailed, err); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to record failure log entry.`)
}

for _, replace := range messages[k:] {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
Expand All @@ -92,6 +96,12 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
}

return err
} else if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusSuccess, nil); err != nil {
c.deps.Logger().
WithError(err).
WithField("message_id", msg.ID).
Error(`Unable to record success log entry.`)
// continue with execution, as the message was successfully dispatched
}
}

Expand Down
30 changes: 8 additions & 22 deletions courier/courier_dispatcher_test.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gofrs/uuid"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/ory/kratos/courier"
"github.com/ory/kratos/courier/template"
Expand Down Expand Up @@ -54,24 +55,9 @@ func TestDispatchMessageWithInvalidSMTP(t *testing.T) {
messages, err := reg.CourierPersister().NextMessages(ctx, 10)
require.Len(t, messages, 1)
})

t.Run("case=max retries reached", func(t *testing.T) {
id := queueNewMessage(t, ctx, c, reg)
message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
require.NoError(t, err)
require.Equal(t, id, message.ID)
message.SendCount = 6

err = c.DispatchMessage(ctx, *message)
require.NoError(t, err)

messages, err := reg.CourierPersister().NextMessages(ctx, 1)
require.Empty(t, messages)
})

}

func TestDispatchMessage2(t *testing.T) {
func TestDispatchQueue(t *testing.T) {
ctx := context.Background()

conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
Expand All @@ -83,12 +69,7 @@ func TestDispatchMessage2(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

id, err := c.QueueEmail(ctx, templates.NewTestStub(reg, &templates.TestStubModel{
To: "test-recipient-1@example.org",
Subject: "test-subject-1",
Body: "test-body-1",
}))
require.NoError(t, err)
id := queueNewMessage(t, ctx, c, reg)
require.NotEqual(t, uuid.Nil, id)

// Fails to deliver the first time
Expand All @@ -106,8 +87,13 @@ func TestDispatchMessage2(t *testing.T) {
var message courier.Message
err = reg.Persister().GetConnection(ctx).
Where("status = ?", courier.MessageStatusAbandoned).
Eager("Dispatches").
First(&message)

require.NoError(t, err)
require.Equal(t, id, message.ID)

require.Len(t, message.Dispatches, 2)
require.Contains(t, gjson.GetBytes(message.Dispatches[0].Error, "reason").String(), "failed to send email via smtp")
require.Contains(t, gjson.GetBytes(message.Dispatches[1].Error, "reason").String(), "failed to send email via smtp")
}
94 changes: 76 additions & 18 deletions courier/handler.go
Expand Up @@ -4,19 +4,24 @@
package courier

import (
"fmt"
"net/http"

"github.com/gofrs/uuid"

"github.com/ory/herodot"
"github.com/ory/x/pagination/keysetpagination"
"github.com/ory/x/pagination/migrationpagination"

"github.com/julienschmidt/httprouter"

"github.com/ory/kratos/driver/config"
"github.com/ory/kratos/x"
"github.com/ory/x/urlx"
)

const AdminRouteCourier = "/courier"
const AdminRouteMessages = AdminRouteCourier + "/messages"
const AdminRouteListMessages = AdminRouteCourier + "/messages"
const AdminRouteGetMessage = AdminRouteCourier + "/messages/:msgID"

type (
handlerDependencies interface {
Expand All @@ -39,12 +44,14 @@ func NewHandler(r handlerDependencies) *Handler {
}

func (h *Handler) RegisterPublicRoutes(public *x.RouterPublic) {
h.r.CSRFHandler().IgnoreGlobs(x.AdminPrefix+AdminRouteMessages, AdminRouteMessages)
public.GET(x.AdminPrefix+AdminRouteMessages, x.RedirectToAdminRoute(h.r))
h.r.CSRFHandler().IgnoreGlobs(x.AdminPrefix+AdminRouteListMessages, AdminRouteListMessages)
public.GET(x.AdminPrefix+AdminRouteListMessages, x.RedirectToAdminRoute(h.r))
public.GET(x.AdminPrefix+AdminRouteGetMessage, x.RedirectToAdminRoute(h.r))
}

func (h *Handler) RegisterAdminRoutes(admin *x.RouterAdmin) {
admin.GET(AdminRouteMessages, h.listCourierMessages)
admin.GET(AdminRouteListMessages, h.listCourierMessages)
admin.GET(AdminRouteGetMessage, h.getCourierMessage)
}

// Paginated Courier Message List Response
Expand All @@ -62,10 +69,9 @@ type listCourierMessagesResponse struct {

// Paginated List Courier Message Parameters
//
// nolint:deadcode,unused
// swagger:parameters listCourierMessages
type ListCourierMessagesParameters struct {
migrationpagination.RequestParameters
keysetpagination.RequestParameters

// Status filters out messages based on status.
// If no value is provided, it doesn't take effect on filter.
Expand Down Expand Up @@ -101,13 +107,13 @@ type ListCourierMessagesParameters struct {
// 400: errorGeneric
// default: errorGeneric
func (h *Handler) listCourierMessages(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
filter, err := parseMessagesFilter(r)
filter, paginator, err := parseMessagesFilter(r)
if err != nil {
h.r.Writer().WriteErrorCode(w, r, http.StatusBadRequest, err)
return
}

l, tc, err := h.r.CourierPersister().ListMessages(r.Context(), filter)
l, tc, nextPage, err := h.r.CourierPersister().ListMessages(r.Context(), filter, paginator)
if err != nil {
h.r.Writer().WriteError(w, r, err)
return
Expand All @@ -119,30 +125,82 @@ func (h *Handler) listCourierMessages(w http.ResponseWriter, r *http.Request, _
}
}

x.PaginationHeader(w, urlx.AppendPaths(h.r.Config().SelfAdminURL(r.Context()), AdminRouteMessages), int64(tc), filter.Page, filter.PerPage)
w.Header().Set("X-Total-Count", fmt.Sprint(tc))
keysetpagination.Header(w, r.URL, nextPage)
h.r.Writer().Write(w, r, l)
}

func parseMessagesFilter(r *http.Request) (ListCourierMessagesParameters, error) {
func parseMessagesFilter(r *http.Request) (ListCourierMessagesParameters, []keysetpagination.Option, error) {
var status *MessageStatus

if r.URL.Query().Has("status") {
ms, err := ToMessageStatus(r.URL.Query().Get("status"))

if err != nil {
return ListCourierMessagesParameters{}, err
return ListCourierMessagesParameters{}, nil, err
}

status = &ms
}

page, itemsPerPage := x.ParsePagination(r)
opts, err := keysetpagination.Parse(r.URL.Query(), keysetpagination.NewMapPageToken)
if err != nil {
return ListCourierMessagesParameters{}, nil, err
}

return ListCourierMessagesParameters{
RequestParameters: migrationpagination.RequestParameters{
Page: page,
PerPage: itemsPerPage,
},
Status: status,
Recipient: r.URL.Query().Get("recipient"),
}, nil
}, opts, nil
}

// Get Courier Message Parameters
//
// swagger:parameters getCourierMessage
// nolint:deadcode,unused
type getCourierMessage struct {
// MessageID is the ID of the message.
//
// required: true
// in: path
MessageID string `json:"id"`
}

// swagger:route GET /admin/courier/messages/{id} courier getCourierMessage
//
// # Get a Message
//
// Gets a specific messages by the given ID.
//
// Produces:
// - application/json
//
// Security:
// oryAccessToken:
//
// Schemes: http, https
//
// Responses:
// 200: message
// 400: errorGeneric
// default: errorGeneric
func (h *Handler) getCourierMessage(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
msgID, err := uuid.FromString(ps.ByName("msgID"))

if err != nil {
h.r.Writer().WriteError(w, r, herodot.ErrBadRequest.WithError(err.Error()).WithDebugf("could not parse parameter {id} as UUID, got %s", ps.ByName("id")))
return
}

message, err := h.r.CourierPersister().FetchMessage(r.Context(), msgID)
if err != nil {
h.r.Writer().WriteError(w, r, err)
return
}

if !h.r.Config().IsInsecureDevMode(r.Context()) {
message.Body = "<redacted-unless-dev-mode>"
}

h.r.Writer().Write(w, r, message)
}

0 comments on commit fc7aa86

Please sign in to comment.