Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update-logs-resend-all-unacked-msgs #1181

Merged
merged 2 commits into from
Jul 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions server/memphis_handlers_async_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"
"memphis/db"
"memphis/memphis_cache"
"strconv"
"time"
)

Expand All @@ -36,9 +35,8 @@ func (s *Server) CompleteRelevantStuckAsyncTasks() {
return
}
if !exist {
stationIdStr := strconv.Itoa(asyncTask.StationId)
errMsg := fmt.Sprintf("Station %v does not exist", stationIdStr)
serv.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %s", asyncTask.TenantName, errMsg)
errMsg := fmt.Sprintf("Station %v does not exist", station.Name)
serv.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, station.CreatedByUsername, errMsg)
continue
}

Expand Down
46 changes: 22 additions & 24 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2178,18 +2178,17 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
var minId int
var maxId int
username := user.Username
stationIdStr := strconv.Itoa(stationId)

err := db.UpdateResendDisabledInStations(true, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}
task, err := db.UpsertAsyncTask("resend_all_dls_msgs", s.opts.ServerName, createdAt, tenantName, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at UpsertAsyncTask : %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}

Expand All @@ -2200,15 +2199,15 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
minId = int(data)
_, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}
} else {
minId, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}
// -1 in order to prevent skipping the first element
Expand All @@ -2218,8 +2217,8 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
for {
_, dlsMsgs, err := db.GetDlsMsgsBatch(tenantName, minId, maxId, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at GetDlsMsgsBatch: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}

Expand All @@ -2231,40 +2230,40 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
}
_, err = serv.ResendUnackedMsg(dlsMsg, user, stationName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at ResendUnackedMsg: %v", tenantName, username, stationIdStr, err.Error())
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at ResendUnackedMsg at station %v : %v", tenantName, username, stationName, err.Error())
continue
}

}
err = db.UpdateAsyncTask(task.Name, tenantName, time.Now(), data, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at UpdateAsyncTask: %v", tenantName, username, stationIdStr, err.Error())
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
continue
}
minId = offset
if len(dlsMsgs) == 0 || offset == maxId {
err = db.RemoveAsyncTask(task.Name, tenantName, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at RemoveAsyncTask: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at RemoveAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}

err = db.UpdateResendDisabledInStations(false, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations: %v", tenantName, username, stationIdStr, err.Error())
s.handleResendAllFailure(user, stationId, tenantName)
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure(user, stationId, tenantName, stationName)
return
}

systemMessage := SystemMessage{
MessageType: "info",
MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has been completed successfully", stationIdStr, username),
MessagePayload: fmt.Sprintf("Resend all unacked messages operation at station %s, triggered by user %s has been completed successfully", stationName, username),
}

err = serv.sendSystemMessageOnWS(user, systemMessage)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]ResendAllDlsMsgs at sendSystemMessageOnWS: %v", tenantName, username, stationIdStr, err.Error())
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at sendSystemMessageOnWS at station %v : %v", tenantName, username, stationName, err.Error())
return
}
break
Expand All @@ -2273,20 +2272,19 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
}()
}

func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName string) {
stationIdStr := strconv.Itoa(stationId)
func (s *Server) handleResendAllFailure(user models.User, stationId int, tenantName, stationName string) {
systemMessage := SystemMessage{
MessageType: "Error",
MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationIdStr, user.Username),
MessagePayload: fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username),
}
err := serv.sendSystemMessageOnWS(user, systemMessage)
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]handleResendAllFailure at sendSystemMessageOnWS: %v", tenantName, user.Username, stationIdStr, err.Error())
serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at sendSystemMessageOnWS at station %v : %v", tenantName, user.Username, stationName, err.Error())
return
}
err = db.UpdateResendDisabledInStations(false, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v][station: %v]handleResendAllFailure at UpdateResendDisabledInStations: %v", tenantName, user.Username, stationIdStr, err.Error())
serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error())
return
}
}