Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix - DLS mechanism data loss + improve tiered storage mechanism #949

Merged
merged 4 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func InitalizeMetadataDbConnection(l logger) (MetadataStorage, error) {
if err != nil {
return MetadataStorage{}, err
}
config.MaxConns = 5
config.MaxConns = 10

if configuration.METADATA_DB_TLS_ENABLED {
CACert, err := os.ReadFile(configuration.METADATA_DB_TLS_CA)
Expand Down
211 changes: 128 additions & 83 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const CONFIGURATIONS_RELOAD_SIGNAL_SUBJ = "$memphis_config_reload_signal"
const NOTIFICATION_EVENTS_SUBJ = "$memphis_notifications"
const PM_RESEND_ACK_SUBJ = "$memphis_pm_acks"
const TIERED_STORAGE_CONSUMER = "$memphis_tiered_storage_consumer"
const DLS_UNACKED_CONSUMER = "$memphis_dls_unacked_consumer"
const SCHEMAVERSE_DLS_SUBJ = "$memphis_schemaverse_dls"

var LastReadThroughput models.Throughput
Expand Down Expand Up @@ -213,7 +214,6 @@ func (s *Server) CalculateSelfThroughput() error {
}

func (s *Server) StartBackgroundTasks() error {
s.ListenForPoisonMessages()
err := s.ListenForZombieConnCheckRequests()
if err != nil {
return errors.New("Failed subscribing for zombie conns check requests: " + err.Error())
Expand All @@ -239,20 +239,14 @@ func (s *Server) StartBackgroundTasks() error {
return errors.New("Failed subscribing for configurations update: " + err.Error())
}

// creating consumer + start listening
err = s.ListenForTieredStorageMessages()
if err != nil {
return errors.New("Failed to subscribe for tiered storage messages" + err.Error())
}

err = s.ListenForSchemaverseDlsEvents()
if err != nil {
return errors.New("Failed to subscribe for schemaverse dls" + err.Error())
return errors.New("Failed to subscribing for schemaverse dls" + err.Error())
}
go s.RemoveOldDlsMsgs()

// send JS API request to get more messages
go s.sendPeriodicJsApiFetchTieredStorageMsgs()
go s.ConsumeUnackedMsgs()
go s.ConsumeTieredStorageMsgs()
go s.RemoveOldDlsMsgs()
go s.uploadMsgsToTier2Storage()

err = s.InitializeThroughputSampling()
Expand All @@ -270,25 +264,21 @@ func (s *Server) uploadMsgsToTier2Storage() {
currentTimeFrame = s.opts.TieredStorageUploadIntervalSec
ticker.Reset(time.Duration(currentTimeFrame) * time.Second)
// update consumer when TIERED_STORAGE_TIME_FRAME_SEC configuration was changed
durableName := TIERED_STORAGE_CONSUMER
tieredStorageTimeFrame := time.Duration(currentTimeFrame) * time.Second
filterSubject := tieredStorageStream + ".>"
cc := ConsumerConfig{
DeliverPolicy: DeliverAll,
AckPolicy: AckExplicit,
Durable: durableName,
FilterSubject: filterSubject,
AckWait: time.Duration(2) * tieredStorageTimeFrame,
Durable: TIERED_STORAGE_CONSUMER,
FilterSubject: tieredStorageStream + ".>",
AckWait: time.Duration(2) * time.Duration(currentTimeFrame) * time.Second,
MaxAckPending: -1,
MaxDeliver: 1,
MaxDeliver: 10,
}
err := serv.memphisAddConsumer(tieredStorageStream, &cc)
if err != nil {
serv.Errorf("Failed add tiered storage consumer: " + err.Error())
return
}
TIERED_STORAGE_CONSUMER_CREATED = true

}
tieredStorageMapLock.Lock()
if len(tieredStorageMsgsMap.m) > 0 {
Expand All @@ -300,6 +290,7 @@ func (s *Server) uploadMsgsToTier2Storage() {
}
}

// ack all messages uploaded to tiered 2 storage
for i, msgs := range tieredStorageMsgsMap.m {
for _, msg := range msgs {
reply := msg.ReplySubject
Expand All @@ -311,78 +302,134 @@ func (s *Server) uploadMsgsToTier2Storage() {
}
}

func (s *Server) sendPeriodicJsApiFetchTieredStorageMsgs() {
ticker := time.NewTicker(2 * time.Second)
for range ticker.C {
if TIERED_STORAGE_CONSUMER_CREATED && TIERED_STORAGE_STREAM_CREATED {
durableName := TIERED_STORAGE_CONSUMER
subject := fmt.Sprintf(JSApiRequestNextT, tieredStorageStream, durableName)
reply := durableName + "_reply"
amount := 1000
req := []byte(strconv.FormatUint(uint64(amount), 10))
serv.sendInternalAccountMsgWithReply(serv.GlobalAccount(), subject, reply, nil, req, true)
func (s *Server) ConsumeUnackedMsgs() {
type unAckedMsg struct {
Msg []byte
ReplySubject string
}
amount := 1000
req := []byte(strconv.FormatUint(uint64(amount), 10))
for {
if DLS_UNACKED_CONSUMER_CREATED && DLS_UNACKED_STREAM_CREATED {
resp := make(chan unAckedMsg)
replySubj := DLS_UNACKED_CONSUMER + "_reply_" + s.memphis.nuid.Next()

// subscribe to unacked messages
sub, err := s.subscribeOnGlobalAcc(replySubj, replySubj+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(subject, reply string, msg []byte) {
// Ignore 409 Exceeded MaxWaiting cases
if reply != "" {
message := unAckedMsg{
Msg: msg,
ReplySubject: reply,
}
resp <- message
}
}(subject, reply, copyBytes(msg))
})
if err != nil {
s.Errorf("Failed to subscribe to unacked messages: " + err.Error())
continue
}

// send JS API request to get more messages
subject := fmt.Sprintf(JSApiRequestNextT, dlsUnackedStream, DLS_UNACKED_CONSUMER)
s.sendInternalAccountMsgWithReply(s.GlobalAccount(), subject, replySubj, nil, req, true)

timeout := time.NewTimer(5 * time.Second)
msgs := make([]unAckedMsg, 0)
stop := false
for {
if stop {
s.unsubscribeOnAcc(s.GlobalAccount(), sub)
break
}
select {
case unAckedMsg := <-resp:
msgs = append(msgs, unAckedMsg)
if len(msgs) == amount {
stop = true
}
case <-timeout.C:
stop = true
}
}
for _, msg := range msgs {
err := s.handleNewUnackedMsg(msg.Msg)
if err == nil {
// send ack
s.sendInternalAccountMsg(s.GlobalAccount(), msg.ReplySubject, []byte(_EMPTY_))
}
}
} else {
time.Sleep(2 * time.Second)
}
}
}

func (s *Server) ListenForTieredStorageMessages() error {
tieredStorageMsgsMap = NewConcurrentMap[[]StoredMsg]()
func (s *Server) ConsumeTieredStorageMsgs() {
type tsMsg struct {
Msg []byte
ReplySubject string
}

subject := TIERED_STORAGE_CONSUMER + "_reply"
err := serv.queueSubscribe(subject, subject+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(subject, reply string, msg []byte) {
//Ignore 409 Exceeded MaxWaiting cases
if reply != "" {
rawMsg := strings.Split(string(msg), CR_LF+CR_LF)
var tieredStorageMsg TieredStorageMsg
if len(rawMsg) == 2 {
err := json.Unmarshal([]byte(rawMsg[1]), &tieredStorageMsg)
if err != nil {
serv.Errorf("ListenForTieredStorageMessages: Failed unmarshalling tiered storage message: " + err.Error())
return
tieredStorageMsgsMap = NewConcurrentMap[[]StoredMsg]()
amount := 1000
req := []byte(strconv.FormatUint(uint64(amount), 10))
for {
if TIERED_STORAGE_CONSUMER_CREATED && TIERED_STORAGE_STREAM_CREATED {
resp := make(chan tsMsg)
replySubj := TIERED_STORAGE_CONSUMER + "_reply_" + s.memphis.nuid.Next()

// subscribe to unacked messages
sub, err := s.subscribeOnGlobalAcc(replySubj, replySubj+"_sid", func(_ *client, subject, reply string, msg []byte) {
go func(subject, reply string, msg []byte) {
// Ignore 409 Exceeded MaxWaiting cases
if reply != "" {
message := tsMsg{
Msg: msg,
ReplySubject: reply,
}
resp <- message
}
} else {
serv.Errorf("ListenForTieredStorageMessages: Invalid tiered storage message structure: message must contains msg-id header")
return
}
payload := tieredStorageMsg.Buf
replySubj := reply
rawTs := tokenAt(reply, 8)
seq, _, _ := ackReplyInfo(reply)
intTs, err := strconv.Atoi(rawTs)
if err != nil {
serv.Errorf("ListenForTieredStorageMessages: Failed convert rawTs from string to int")
return
}
}(subject, reply, copyBytes(msg))
})
if err != nil {
s.Errorf("Failed to subscribe to tiered storage messages: " + err.Error())
continue
}

dataFirstIdx := 0
dataFirstIdx = getHdrLastIdxFromRaw(payload) + 1
if dataFirstIdx > len(payload)-len(CR_LF) {
s.Errorf("memphis error parsing")
return
// send JS API request to get more messages
subject := fmt.Sprintf(JSApiRequestNextT, tieredStorageStream, TIERED_STORAGE_CONSUMER)
s.sendInternalAccountMsgWithReply(s.GlobalAccount(), subject, replySubj, nil, req, true)

timeout := time.NewTimer(5 * time.Second)
msgs := make([]tsMsg, 0)
stop := false
for {
if stop {
s.unsubscribeOnAcc(s.GlobalAccount(), sub)
break
}
dataLen := len(payload) - dataFirstIdx
header := payload[:dataFirstIdx]
data := payload[dataFirstIdx : dataFirstIdx+dataLen]
message := StoredMsg{
Subject: tieredStorageMsg.StationName,
Sequence: uint64(seq),
Data: data,
Header: header,
Time: time.Unix(0, int64(intTs)),
ReplySubject: replySubj,
select {
case tieredStorageMsg := <-resp:
msgs = append(msgs, tieredStorageMsg)
if len(msgs) == amount {
stop = true
}
case <-timeout.C:
stop = true
}

s.storeInTieredStorageMap(message)
}
}(subject, reply, copyBytes(msg))
})
if err != nil {
serv.Errorf("ListenForTieredStorageMessages: Failed queueSubscribe tiered storage: " + err.Error())
return err
for _, message := range msgs {
msg := message.Msg
reply := message.ReplySubject
s.handleNewTieredStorageMsg(msg, reply)
}
} else {
time.Sleep(2 * time.Second)
}
}

return nil
}

func (s *Server) ListenForSchemaverseDlsEvents() error {
Expand Down Expand Up @@ -432,15 +479,13 @@ func (s *Server) ListenForSchemaverseDlsEvents() error {
return nil
}

func (s *Server) RemoveOldDlsMsgs() error {
func (s *Server) RemoveOldDlsMsgs() {
ticker := time.NewTicker(2 * time.Minute)
for range ticker.C {
configurationTime := time.Now().Add(time.Hour * time.Duration(-s.opts.DlsRetentionHours))
err := db.DeleteOldDlsMessageByRetention(configurationTime)
if err != nil {
serv.Errorf("RemoveOldDlsMsgs: " + err.Error())
return err
}
}
return nil
}
5 changes: 5 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2943,6 +2943,11 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {

// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
// *** added by memphis
if strings.HasPrefix(o.stream, "$memphis") { // skipping memphis streams max deliveries events
return
}
// added by memphis ***
e := JSConsumerDeliveryExceededAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerDeliveryExceededAdvisoryType,
Expand Down
Loading