Skip to content

Commit

Permalink
ability to Nack / terminate a message (#1625)
Browse files Browse the repository at this point in the history
  • Loading branch information
idanasulin2706 committed Jan 23, 2024
1 parent 0640142 commit a94387f
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 7 deletions.
8 changes: 4 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5901,7 +5901,7 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq, partitionNumber int) (boo
return true, message[0], nil
}

func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int) (int, bool, error) {
func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int, validationError string) (int, bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
updated := false
Expand Down Expand Up @@ -5979,7 +5979,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName, partitionNumber)
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", validationError, tenantName, partitionNumber)
if err != nil {
return 0, updated, err
}
Expand Down Expand Up @@ -6008,7 +6008,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
}
}
} else { // then update
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4, validation_error = $6 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
stmt, err := tx.Prepare(ctx, "update_poisoned_cgs", query)
if err != nil {
return 0, updated, err
Expand All @@ -6017,7 +6017,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName)
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName, validationError)
if err != nil {
return 0, updated, err
}
Expand Down
8 changes: 8 additions & 0 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ type SchemaVerseDlsMessageSdk struct {
PartitionNumber int `json:"partition_number"`
}

type NackedDlsMessageSdk struct {
StationName string `json:"station_name"`
Error string `json:"error"`
CgName string `json:"cg_name"`
Seq uint64 `json:"seq"`
Partition int `json:"partition"`
}

type FunctionsDlsMessage struct {
StationID int `json:"station_id"`
TenantName string `json:"tenant_name"`
Expand Down
93 changes: 93 additions & 0 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const NOTIFICATION_EVENTS_SUBJ = "$memphis_notifications"
const PM_RESEND_ACK_SUBJ = "$memphis_pm_acks"
const TIERED_STORAGE_CONSUMER = "$memphis_tiered_storage_consumer"
const DLS_UNACKED_CONSUMER = "$memphis_dls_unacked_consumer"
const NACKED_DLS_SUBJ = "$memphis_nacked_dls"
const NACKED_DLS_INNER_SUBJ = "$memphis_nacked_inner_dls"
const NACKED_DLS_CONSUMER = "$memphis_nacked_dls_consumer"
const SCHEMAVERSE_DLS_SUBJ = "$memphis_schemaverse_dls"
const SCHEMAVERSE_DLS_INNER_SUBJ = "$memphis_schemaverse_inner_dls"
const SCHEMAVERSE_DLS_CONSUMER = "$memphis_schemaverse_dls_consumer"
Expand Down Expand Up @@ -211,6 +214,19 @@ func (s *Server) ListenForSchemaverseDlsEvents() error {
return nil
}

func (s *Server) ListenForNackedDlsEvents() error {
err := s.queueSubscribe(s.MemphisGlobalAccountString(), NACKED_DLS_SUBJ, NACKED_DLS_SUBJ+"_group", func(_ *client, subject, reply string, msg []byte) {
go func(msg []byte) {
s.sendInternalAccountMsg(s.MemphisGlobalAccount(), NACKED_DLS_INNER_SUBJ, msg)
}(copyBytes(msg))
})
if err != nil {
return err
}

return nil
}

func (s *Server) ListenForPoisonMsgAcks() error {
err := s.queueSubscribe(s.MemphisGlobalAccountString(), PM_RESEND_ACK_SUBJ, PM_RESEND_ACK_SUBJ+"_group", func(_ *client, subject, reply string, msg []byte) {
go func(msg []byte) {
Expand Down Expand Up @@ -311,6 +327,11 @@ func (s *Server) StartBackgroundTasks() error {
return errors.New("Failed to subscribing for schemaverse dls" + err.Error())
}

err = s.ListenForNackedDlsEvents()
if err != nil {
return errors.New("Failed to subscribing for nacked dls" + err.Error())
}

err = s.ListenForPoisonMsgAcks()
if err != nil {
return errors.New("Failed subscribing for poison message acks: " + err.Error())
Expand All @@ -337,6 +358,7 @@ func (s *Server) StartBackgroundTasks() error {
}

go s.ConsumeSchemaverseDlsMessages()
go s.ConsumeNackedDlsMessages()
go s.ConsumeUnackedMsgs()
go s.ConsumeFunctionsDlsMessages()
go s.ConsumeTieredStorageMsgs()
Expand Down Expand Up @@ -608,6 +630,77 @@ func (s *Server) ConsumeSchemaverseDlsMessages() {
}
}

func (s *Server) ConsumeNackedDlsMessages() {
type nackedDlsMsg struct {
Msg []byte
ReplySubject string
}
amount := 1000
req := []byte(strconv.FormatUint(uint64(amount), 10))
for {
if DLS_NACKED_CONSUMER_CREATED && DLS_NACKED_STREAM_CREATED {
resp := make(chan nackedDlsMsg)
replySubj := NACKED_DLS_CONSUMER + "_reply_" + s.memphis.nuid.Next()

// subscribe to schemavers dls messages
sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), replySubj, replySubj+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(subject, reply string, msg []byte) {
// Ignore 409 Exceeded MaxWaiting cases
if reply != _EMPTY_ {
message := nackedDlsMsg{
Msg: msg,
ReplySubject: reply,
}
resp <- message
}
}(subject, reply, copyBytes(msg))
})
if err != nil {
s.Errorf("Failed to subscribe to nacked dls messages: %v", err.Error())
continue
}

// send JS API request to get more messages
subject := fmt.Sprintf(JSApiRequestNextT, dlsNackedStream, NACKED_DLS_CONSUMER)
s.sendInternalAccountMsgWithReply(s.MemphisGlobalAccount(), subject, replySubj, nil, req, true)

s.Debugf("ConsumeNackedDlsMessages: sending fetch request")

timeout := time.NewTimer(5 * time.Second)
msgs := make([]nackedDlsMsg, 0)
stop := false
for {
if stop {
s.unsubscribeOnAcc(s.MemphisGlobalAccount(), sub)
break
}
select {
case nackedDlsMsg := <-resp:
msgs = append(msgs, nackedDlsMsg)
if len(msgs) == amount {
stop = true
s.Debugf("ConsumeNackedDlsMessages: finished appending %v messages", len(msgs))
}
case <-timeout.C:
stop = true
s.Debugf("ConsumeNackedDlsMessages: finished because of timer: %v messages", len(msgs))
}
}
for _, message := range msgs {
msg := message.Msg
err := s.handleNackedDlsMsg(msg)
if err == nil {
// send ack
s.sendInternalAccountMsgWithEcho(s.MemphisGlobalAccount(), message.ReplySubject, []byte(_EMPTY_))
}
}
} else {
s.Warnf("ConsumeNackedDlsMessages: waiting for consumer and stream to be created")
time.Sleep(2 * time.Second)
}
}
}

func (s *Server) RemoveOldDlsMsgs() {
ticker := time.NewTicker(2 * time.Minute)
for range ticker.C {
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
THROUGHPUT_LEGACY_STREAM_EXIST = true
case dlsSchemaverseStream:
DLS_SCHEMAVERSE_STREAM_CREATED = true
case dlsNackedStream:
DLS_NACKED_STREAM_CREATED = true
case integrationsAuditLogsStream:
INTEGRATIONS_AUDIT_LOGS_STREAM_CREATED = true
case notificationsStreamName:
Expand Down Expand Up @@ -1386,6 +1388,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
DLS_UNACKED_CONSUMER_CREATED = true
case dlsSchemaverseStream:
DLS_SCHEMAVERSE_CONSUMER_CREATED = true
case dlsNackedStream:
DLS_NACKED_CONSUMER_CREATED = true
case notificationsStreamName:
NOTIFICATIONS_BUFFER_CONSUMER_CREATED = true
case systemTasksStreamName:
Expand Down
106 changes: 103 additions & 3 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error {
Headers: headersJson,
}

dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName, partitionNumber)
dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName, partitionNumber, "")
if err != nil {
serv.Errorf("[tenant: %v]handleNewUnackedMsg at StorePoisonMsg: Error while getting notified about a poison message: %v", station.TenantName, err.Error())
return err
Expand Down Expand Up @@ -168,13 +168,14 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) error {
return err
}

exist, station, err := db.GetStationByName(message.StationName, tenantName)
stationName := StationNameFromStreamName(message.StationName)
exist, station, err := db.GetStationByName(stationName.Ext(), tenantName)
if err != nil {
serv.Errorf("[tenant: %v]handleSchemaverseDlsMsg: %v", tenantName, err.Error())
return err
}
if !exist {
serv.Warnf("[tenant: %v]handleSchemaverseDlsMsg: station %v couldn't been found", tenantName, message.StationName)
serv.Warnf("[tenant: %v]handleSchemaverseDlsMsg: station %v couldn't been found", tenantName, stationName.Ext())
return nil
}

Expand All @@ -198,6 +199,105 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) error {
return nil
}

func (s *Server) handleNackedDlsMsg(msg []byte) error {
tenantName, stringMessage, err := s.getTenantNameAndMessage(msg)
if err != nil {
s.Errorf("handleNackedDlsMsg at getTenantNameAndMessage: %v", err.Error())
return err
}
var message models.NackedDlsMessageSdk
err = json.Unmarshal([]byte(stringMessage), &message)
if err != nil {
serv.Errorf("[tenant: %v]handleNackedDlsMsg: %v", tenantName, err.Error())
return err
}

if message.Partition == 0 {
serv.Errorf("[tenant: %v]handleNackedDlsMsg - missing partition number: %v", tenantName, err.Error())
return err
}

stationName := StationNameFromStreamName(message.StationName)
streamName := stationName.Intern() + "$" + strconv.Itoa(message.Partition)
exist, station, err := db.GetStationByName(stationName.Ext(), tenantName)
if err != nil {
serv.Errorf("[tenant: %v]handleNackedDlsMsg: %v", tenantName, err.Error())
return err
}
if !exist {
serv.Warnf("[tenant: %v]handleNackedDlsMsg: station %v couldn't been found", tenantName, stationName.Ext())
return nil
}
if !station.DlsConfigurationPoison {
return nil
}

poisonMessageContent, err := s.memphisGetMessage(tenantName, streamName, uint64(message.Seq))
if err != nil {
if IsNatsErr(err, JSNoMessageFoundErr) {
return nil
}
serv.Errorf("[tenant: %v]handleNackedDlsMsg at memphisGetMessage: station: %v, Error while getting notified about a poison message: %v", tenantName, stationName.Ext(), err.Error())
return err
}


timeSentTimeStamp := poisonMessageContent.Time
data := poisonMessageContent.Data
lenPayload := len(poisonMessageContent.Data) + len(poisonMessageContent.Header)
headers := poisonMessageContent.Header
var headersJson map[string]string
if headers != nil {
headersJson, err = DecodeHeader(headers)
if err != nil {
serv.Errorf("handleNackedDlsMsg: %v", err.Error())
return err
}
}

producedByHeader := _EMPTY_
poisonedCgs := []string{}
producedByHeader = headersJson["$memphis_producedBy"]
if producedByHeader == _EMPTY_ {
producedByHeader = "unknown"
}
poisonedCgs = append(poisonedCgs, message.CgName)

messageDetails := models.MessagePayload{
TimeSent: timeSentTimeStamp,
Size: lenPayload,
Data: hex.EncodeToString(data),
Headers: headersJson,
}

dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(message.Seq), message.CgName, producedByHeader, poisonedCgs, messageDetails, tenantName, message.Partition, message.Error)
if err != nil {
serv.Errorf("[tenant: %v]handleNackedDlsMsg at StorePoisonMsg: Error while getting notified about a poison message: %v", station.TenantName, err.Error())
return err
}
if !updated {
err = s.sendToDlsStation(station, data, headersJson, "unacked", _EMPTY_)
if err != nil {
serv.Errorf("[tenant: %v]handleNackedDlsMsg at sendToDlsStation: station: %v, Error while getting notified about a poison message: %v", station.TenantName, station.DlsStation, err.Error())
return err
}
}

if dlsMsgId == 0 { // nothing to do
return nil
}

idForUrl := strconv.Itoa(dlsMsgId)
var msgUrl = s.opts.UiHost + "/stations/" + stationName.Ext() + "/" + idForUrl
err = s.SendNotification(station.TenantName, PoisonMessageTitle, "Poison message has been identified, for more details head to: "+msgUrl, PoisonMAlert)
if err != nil {
serv.Warnf("[tenant: %v]handleNackedDlsMsg at SendNotification: Error while sending a poison message notification: %v", station.TenantName, err.Error())
return nil
}

return nil
}

func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) {
poisonMessages := make([]models.LightDlsMessageResponse, 0)
schemaMessages := make([]models.LightDlsMessageResponse, 0)
Expand Down
1 change: 1 addition & 0 deletions server/memphis_handlers_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func GetAllMemphisAndNatsInternalSubjects() []string {
var subjects []string

// Memphis subjects
subjects = append(subjects, NACKED_DLS_SUBJ)
subjects = append(subjects, SCHEMAVERSE_DLS_SUBJ)
subjects = append(subjects, sdkClientsUpdatesSubject)
subjects = append(subjects, PM_RESEND_ACK_SUBJ)
Expand Down
Loading

0 comments on commit a94387f

Please sign in to comment.