From 9fb291c359b56433e13563e581c28e209b48caff Mon Sep 17 00:00:00 2001 From: Matthew Mayer Date: Mon, 18 Mar 2024 11:00:15 -0700 Subject: [PATCH] Replay with PSM Let msg be replayed immediately Make replay PSM actually work More replay debugging with PSM More replay debugging --- service/main.go | 98 ++++++++++--------------------------------------- 1 file changed, 20 insertions(+), 78 deletions(-) diff --git a/service/main.go b/service/main.go index 2596536..8d95088 100644 --- a/service/main.go +++ b/service/main.go @@ -95,54 +95,6 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s ReadOnly: false, Retryable: true, }, func(ctx context.Context, tx sqrlx.Transaction) error { - // switch to PSM - var deadletter, message_id string - var raw sql.NullString - q := sq.Select("message_id, deadletter, raw_msg").From("deadmessage").Where("message_id = ?", req.MessageId) - err := tx.QueryRow(ctx, q).Scan(&message_id, &deadletter, &raw) - if err != nil { - log.WithError(ctx, err).Error("Couldn't query dead letter from db") - return err - } - - deadProto := dante_pb.DeadMessageState{} - err = ds.protojson.Unmarshal([]byte(deadletter), &deadProto) - if err != nil { - log.WithError(ctx, err).Error("Couldn't unmarshal dead letter") - return err - } - deadProto.Status = dante_pb.MessageStatus_MESSAGE_STATUS_REPLAYED - - if deadProto.CurrentSpec.Payload == nil { - log.Infof(ctx, "spec payload is nil, using raw msg") - if !raw.Valid { - log.Info(ctx, "tried to use raw msg but it was null") - return fmt.Errorf("failed to fall back to raw message as it was null") - } - danteAny := dante_pb.Any{ - Json: raw.String, - } - log.Info(ctx, "raw msg used") - deadProto.CurrentSpec.Payload = &danteAny - log.Infof(ctx, "danteany is %v", danteAny.Json) - } - - msg_json, err := ds.protojson.Marshal(&deadProto) - if err != nil { - log.Infof(ctx, "couldn't turn dead letter into json: %v", err.Error()) - return err - } - - u := sq.Update("deadmessage").SetMap(map[string]interface{}{ - "deadletter": msg_json, - }).Where("message_id = ?", req.MessageId) - - _, err = tx.Update(ctx, u) - if err != nil { - log.WithError(ctx, err).Error("Couldn't update dead letter") - return err - } - event := dante_pb.DeadMessageEvent{ Metadata: &dante_pb.Metadata{ EventId: uuid.NewString(), @@ -156,45 +108,24 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s }, }, } - - event_json, err := ds.protojson.Marshal(&event) - if err != nil { - log.Infof(ctx, "couldn't turn dead letter event into json: %v", err.Error()) - return err - } - - _, err = tx.Insert(ctx, sq.Insert("deadmessage_event").SetMap(map[string]interface{}{ - "message_id": req.MessageId, - "msg_event": event_json, - "id": event.Metadata.EventId, - })) + s, err := ds.sm.TransitionInTx(ctx, tx, &event) if err != nil { - log.Infof(ctx, "couldn't save message event: %v", err.Error()) + log.Infof(ctx, "state machine transition error: %v", err.Error()) return err } + res.Message = s - if deadProto.CurrentSpec == nil { - log.Infof(ctx, "spec is nil") - return fmt.Errorf("deadproto currentspec is nil and shouldn't be") - } - if deadProto.CurrentSpec.Payload == nil { - log.Infof(ctx, "spec payload is nil") - return fmt.Errorf("deadproto currentspec payload is nil and shouldn't be") - } - if deadProto.CurrentSpec.Payload.Proto == nil { - log.Infof(ctx, "spec payload proto is nil") - return fmt.Errorf("deadproto currentspec payload proto is nil and shouldn't be") - } + log.Infof(ctx, "s currentspec is %+v, grpc-message is '%v'", s.CurrentSpec, s.CurrentSpec.Payload.Proto.MessageName()) // Direct SQS publish: pull this out into a function hdrs := map[string]types.MessageAttributeValue{ "grpc-service": { DataType: aws.String("String"), - StringValue: aws.String(deadProto.CurrentSpec.GrpcName), + StringValue: aws.String(s.CurrentSpec.GrpcName), }, "grpc-message": { DataType: aws.String("String"), - StringValue: aws.String(fmt.Sprintf("%v", deadProto.CurrentSpec.Payload.Proto.MessageName())), + StringValue: aws.String(fmt.Sprintf("%v", s.CurrentSpec.Payload.Proto.MessageName())), }, "Content-Type": { DataType: aws.String("String"), @@ -203,11 +134,11 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s } i := sqs.SendMessageInput{ - MessageBody: &deadProto.CurrentSpec.Payload.Json, - QueueUrl: &deadProto.CurrentSpec.QueueName, + MessageBody: &s.CurrentSpec.Payload.Json, + QueueUrl: &s.CurrentSpec.QueueName, MessageAttributes: hdrs, } - log.Infof(ctx, "SQS message to be sent: %+v with body of %v", i, *i.MessageBody) + log.Infof(ctx, "SQS message to be sent: %+v with body of %v and headers %+v", i, *i.MessageBody, hdrs) _, err = ds.sqsClient.SendMessage(ctx, &i) if err != nil { log.Errorf(ctx, "couldn't send SQS message for replay: %v", err.Error()) @@ -319,6 +250,17 @@ func newPsm() (*dante_pb.DeadmessagePSM, error) { return nil })) + // created to replayed + sm.From(dante_pb.MessageStatus_MESSAGE_STATUS_CREATED).Do( + dante_pb.DeadmessagePSMFunc(func(ctx context.Context, + tb dante_pb.DeadmessagePSMTransitionBaton, + state *dante_pb.DeadMessageState, + event *dante_pb.DeadMessageEventType_Replayed) error { + state.Status = dante_pb.MessageStatus_MESSAGE_STATUS_REPLAYED + + return nil + })) + // updated to updated sm.From(dante_pb.MessageStatus_MESSAGE_STATUS_UPDATED).Do( dante_pb.DeadmessagePSMFunc(func(ctx context.Context,