Skip to content

Commit

Permalink
Replay with PSM
Browse files Browse the repository at this point in the history
Let msg be replayed immediately

Make replay PSM actually work

More replay debugging with PSM

More replay debugging
  • Loading branch information
matthewkmayer committed Mar 20, 2024
1 parent b5a8295 commit 9fb291c
Showing 1 changed file with 20 additions and 78 deletions.
98 changes: 20 additions & 78 deletions service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,54 +95,6 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s
ReadOnly: false,
Retryable: true,
}, func(ctx context.Context, tx sqrlx.Transaction) error {
// switch to PSM
var deadletter, message_id string
var raw sql.NullString
q := sq.Select("message_id, deadletter, raw_msg").From("deadmessage").Where("message_id = ?", req.MessageId)
err := tx.QueryRow(ctx, q).Scan(&message_id, &deadletter, &raw)
if err != nil {
log.WithError(ctx, err).Error("Couldn't query dead letter from db")
return err
}

deadProto := dante_pb.DeadMessageState{}
err = ds.protojson.Unmarshal([]byte(deadletter), &deadProto)
if err != nil {
log.WithError(ctx, err).Error("Couldn't unmarshal dead letter")
return err
}
deadProto.Status = dante_pb.MessageStatus_MESSAGE_STATUS_REPLAYED

if deadProto.CurrentSpec.Payload == nil {
log.Infof(ctx, "spec payload is nil, using raw msg")
if !raw.Valid {
log.Info(ctx, "tried to use raw msg but it was null")
return fmt.Errorf("failed to fall back to raw message as it was null")
}
danteAny := dante_pb.Any{
Json: raw.String,
}
log.Info(ctx, "raw msg used")
deadProto.CurrentSpec.Payload = &danteAny
log.Infof(ctx, "danteany is %v", danteAny.Json)
}

msg_json, err := ds.protojson.Marshal(&deadProto)
if err != nil {
log.Infof(ctx, "couldn't turn dead letter into json: %v", err.Error())
return err
}

u := sq.Update("deadmessage").SetMap(map[string]interface{}{
"deadletter": msg_json,
}).Where("message_id = ?", req.MessageId)

_, err = tx.Update(ctx, u)
if err != nil {
log.WithError(ctx, err).Error("Couldn't update dead letter")
return err
}

event := dante_pb.DeadMessageEvent{
Metadata: &dante_pb.Metadata{
EventId: uuid.NewString(),
Expand All @@ -156,45 +108,24 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s
},
},
}

event_json, err := ds.protojson.Marshal(&event)
if err != nil {
log.Infof(ctx, "couldn't turn dead letter event into json: %v", err.Error())
return err
}

_, err = tx.Insert(ctx, sq.Insert("deadmessage_event").SetMap(map[string]interface{}{
"message_id": req.MessageId,
"msg_event": event_json,
"id": event.Metadata.EventId,
}))
s, err := ds.sm.TransitionInTx(ctx, tx, &event)
if err != nil {
log.Infof(ctx, "couldn't save message event: %v", err.Error())
log.Infof(ctx, "state machine transition error: %v", err.Error())
return err
}
res.Message = s

if deadProto.CurrentSpec == nil {
log.Infof(ctx, "spec is nil")
return fmt.Errorf("deadproto currentspec is nil and shouldn't be")
}
if deadProto.CurrentSpec.Payload == nil {
log.Infof(ctx, "spec payload is nil")
return fmt.Errorf("deadproto currentspec payload is nil and shouldn't be")
}
if deadProto.CurrentSpec.Payload.Proto == nil {
log.Infof(ctx, "spec payload proto is nil")
return fmt.Errorf("deadproto currentspec payload proto is nil and shouldn't be")
}
log.Infof(ctx, "s currentspec is %+v, grpc-message is '%v'", s.CurrentSpec, s.CurrentSpec.Payload.Proto.MessageName())

// Direct SQS publish: pull this out into a function
hdrs := map[string]types.MessageAttributeValue{
"grpc-service": {
DataType: aws.String("String"),
StringValue: aws.String(deadProto.CurrentSpec.GrpcName),
StringValue: aws.String(s.CurrentSpec.GrpcName),
},
"grpc-message": {
DataType: aws.String("String"),
StringValue: aws.String(fmt.Sprintf("%v", deadProto.CurrentSpec.Payload.Proto.MessageName())),
StringValue: aws.String(fmt.Sprintf("%v", s.CurrentSpec.Payload.Proto.MessageName())),
},
"Content-Type": {
DataType: aws.String("String"),
Expand All @@ -203,11 +134,11 @@ func (ds *DeadletterService) ReplayDeadMessage(ctx context.Context, req *dante_s
}

i := sqs.SendMessageInput{
MessageBody: &deadProto.CurrentSpec.Payload.Json,
QueueUrl: &deadProto.CurrentSpec.QueueName,
MessageBody: &s.CurrentSpec.Payload.Json,
QueueUrl: &s.CurrentSpec.QueueName,
MessageAttributes: hdrs,
}
log.Infof(ctx, "SQS message to be sent: %+v with body of %v", i, *i.MessageBody)
log.Infof(ctx, "SQS message to be sent: %+v with body of %v and headers %+v", i, *i.MessageBody, hdrs)
_, err = ds.sqsClient.SendMessage(ctx, &i)
if err != nil {
log.Errorf(ctx, "couldn't send SQS message for replay: %v", err.Error())
Expand Down Expand Up @@ -319,6 +250,17 @@ func newPsm() (*dante_pb.DeadmessagePSM, error) {
return nil
}))

// created to replayed
sm.From(dante_pb.MessageStatus_MESSAGE_STATUS_CREATED).Do(
dante_pb.DeadmessagePSMFunc(func(ctx context.Context,
tb dante_pb.DeadmessagePSMTransitionBaton,
state *dante_pb.DeadMessageState,
event *dante_pb.DeadMessageEventType_Replayed) error {
state.Status = dante_pb.MessageStatus_MESSAGE_STATUS_REPLAYED

return nil
}))

// updated to updated
sm.From(dante_pb.MessageStatus_MESSAGE_STATUS_UPDATED).Do(
dante_pb.DeadmessagePSMFunc(func(ctx context.Context,
Expand Down

0 comments on commit 9fb291c

Please sign in to comment.