Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamroditimemphis committed Jan 10, 2024
1 parent 2e97b43 commit bfc2870
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 91 deletions.
81 changes: 33 additions & 48 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
station_id INT NOT NULL,
created_by VARCHAR NOT NULL,
status VARCHAR NOT NULL,
status VARCHAR NOT NULL DEFAULT 'running',
failure_reason VARCHAR NOT NULL DEFAULT '',
PRIMARY KEY (id)
);`
Expand All @@ -605,7 +605,7 @@ func createTables(MetadataDbClient MetadataStorage) error {
SELECT 1 FROM information_schema.tables WHERE table_name = 'async_tasks' AND table_schema = 'public'
) THEN
ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS created_by VARCHAR NOT NULL;
ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL;
ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL DEFAULT 'running';
ALTER TABLE async_tasks ADD COLUMN IF NOT EXISTS failure_reason VARCHAR NOT NULL DEFAULT '';
END IF;
IF EXISTS (
Expand Down Expand Up @@ -7268,7 +7268,7 @@ func UpsertAsyncTask(task, brokerInCharge string, createdAt time.Time, tenantNam
&asyncTask.StationId,
&asyncTask.CreatedBy,
&asyncTask.Status,
&asyncTask.InvalidReason,
&asyncTask.FailureReason,
)

if err != nil {
Expand Down Expand Up @@ -7376,7 +7376,7 @@ func GetAsyncTaskByNameAndBrokerName(task, brokerName string) (bool, []models.As
return true, asyncTask, nil
}

func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
func GetActiveAndUpdatedAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand All @@ -7390,28 +7390,35 @@ func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
a.name,
a.created_at,
a.created_by,
s.name,
s.name AS station_name,
a.meta_data,
a.status,
a.failure_reason
FROM
async_tasks AS a
LEFT JOIN stations AS s ON a.station_id = s.id
async_tasks AS a
LEFT JOIN
stations AS s ON a.station_id = s.id
WHERE
a.tenant_name = $1
AND (a.status = 'running' OR a.id IN (
SELECT
a.id
FROM
async_tasks
WHERE
a.tenant_name = $1
ORDER BY
a.updated_at DESC
LIMIT 10
));
`
stmt, err := conn.Conn().Prepare(ctx, "get_active_async_tasks", query)
SELECT
id
FROM
async_tasks
WHERE
tenant_name = $1
AND status <> 'running'
ORDER BY
created_at DESC
LIMIT 10
))
ORDER BY
CASE
WHEN a.status = 'running' THEN 0
ELSE 1
END,
created_at DESC;`
stmt, err := conn.Conn().Prepare(ctx, "get_active_and_updated_async_tasks", query)
if err != nil {
return []models.AsyncTaskRes{}, err
}
Expand All @@ -7435,7 +7442,7 @@ func GetActiveAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
&sName,
&task.Data,
&task.Status,
&task.InvalidReason,
&task.FailureReason,
)
if err != nil {
return []models.AsyncTaskRes{}, err
Expand Down Expand Up @@ -7482,30 +7489,7 @@ func UpdateAsyncTask(task, tenantName string, updatedAt time.Time, metaData inte
return nil
}

// UpdateStatusAsyncTask sets the status column of the async_tasks row that
// matches the given task name, tenant name and station id.
//
// Parameters:
//   - task:       value matched against async_tasks.name.
//   - tenantName: value matched against async_tasks.tenant_name; it is
//     lower-cased first unless it equals conf.GlobalAccount.
//   - status:     new value written to async_tasks.status.
//   - stationId:  value matched against async_tasks.station_id.
//
// It returns any error from acquiring the connection, preparing the statement
// or executing the update. The whole operation is bounded by
// DbOperationTimeout seconds.
func UpdateStatusAsyncTask(task, tenantName, status string, stationId int) error {
	ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
	defer cancelfunc()

	conn, err := MetadataDbClient.Client.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	// Placeholders are numbered contiguously ($1..$4) so that they line up
	// with the four arguments bound below; the previous text skipped $2.
	query := `UPDATE async_tasks SET status = $1 WHERE name = $2 AND tenant_name = $3 AND station_id = $4`
	stmt, err := conn.Conn().Prepare(ctx, "edit_status_async_task_by_task_and_tenant_name_and_station_id", query)
	if err != nil {
		return err
	}

	if tenantName != conf.GlobalAccount {
		tenantName = strings.ToLower(tenantName)
	}

	// Exec rather than Query: the UPDATE returns no rows, and Query would
	// hand back a Rows value that this function never closes.
	_, err = conn.Conn().Exec(ctx, stmt.Name, status, task, tenantName, stationId)
	return err
}

func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reason, functionName string) error {
func UpdateStatusAsyncTask(task, tenantName, status string, stationId int, failureReason, functionName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
Expand All @@ -7529,12 +7513,12 @@ func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reaso
}
tenantName = strings.ToLower(tenantName)
if stationId == -1 {
_, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId, functionName)
_, err = conn.Conn().Query(ctx, stmt.Name, status, failureReason, task, tenantName, stationId, functionName)
if err != nil {
return err
}
} else {
_, err = conn.Conn().Query(ctx, stmt.Name, status, reason, task, tenantName, stationId)
_, err = conn.Conn().Query(ctx, stmt.Name, status, failureReason, task, tenantName, stationId)
if err != nil {
return err
}
Expand All @@ -7543,7 +7527,7 @@ func UpdateReasonAsyncTask(task, tenantName, status string, stationId int, reaso
return nil
}

func RemoveAsyncTasks() error {
func RemoveOldAsyncTasks() error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
Expand All @@ -7555,10 +7539,11 @@ func RemoveAsyncTasks() error {
WHERE status != 'running' AND id NOT IN (
SELECT id
FROM async_tasks
WHERE status != 'running'
ORDER BY updated_at DESC
LIMIT 10
);`
stmt, err := conn.Conn().Prepare(ctx, "remove_async_task", query)
stmt, err := conn.Conn().Prepare(ctx, "remove_old_async_task", query)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions models/async_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type AsyncTask struct {
StationId int `json:"station_id"`
CreatedBy string `json:"created_by"`
Status string `json:"status"`
InvalidReason string `json:"invalid_reason"`
FailureReason string `json:"failure_reason"`
}

type AsyncTaskRes struct {
Expand All @@ -24,7 +24,7 @@ type AsyncTaskRes struct {
StationName string `json:"station_name"`
Data interface{} `json:"data"`
Status string `json:"status"`
InvalidReason string `json:"invalid_reason"`
FailureReason string `json:"failure_reason"`
}

type MetaData struct {
Expand Down
8 changes: 4 additions & 4 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.ConsumeFunctionTasks()
go s.ScaleFunctionWorkers()
go s.ConnectorsDeadPodsRescheduler()
go s.RemoveOldAndCompletedAsyncTasks()
go s.removeOldAsyncTasks()

return nil
}
Expand Down Expand Up @@ -855,12 +855,12 @@ func (s *Server) ReleaseStuckLocks() {
}
}

// removeOldAsyncTasks runs the periodic clean-up of async tasks.
// It creates a ticker that fires every 15 minutes and, on each tick, calls
// the db-level removal function. A failure is logged and the loop simply
// waits for the next tick. The loop has no exit condition, so the method
// never returns.
//
// NOTE(review): this span is a rendered diff. Each adjacent pair of
// near-identical lines is the pre-commit line followed by its post-commit
// replacement (the commit renames RemoveOldAndCompletedAsyncTasks to
// removeOldAsyncTasks and db.RemoveAsyncTasks to db.RemoveOldAsyncTasks).
func (s *Server) RemoveOldAndCompletedAsyncTasks() {
func (s *Server) removeOldAsyncTasks() {
ticker := time.NewTicker(15 * time.Minute)
for range ticker.C {
err := db.RemoveAsyncTasks()
err := db.RemoveOldAsyncTasks()
if err != nil {
// Log only; the next tick retries the removal.
serv.Errorf("RemoveOldAndCompletedAsyncTasks at RemoveAsyncTasks : %v", err.Error())
serv.Errorf("RemoveOldAsyncTasks at db.RemoveOldAsyncTasks : %v", err.Error())
}
}
}
14 changes: 7 additions & 7 deletions server/memphis_handlers_async_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type AsyncTasksHandler struct{}
func (s *Server) CompleteRelevantStuckAsyncTasks() {
exist, asyncTasks, err := db.GetAsyncTaskByNameAndBrokerName("resend_all_dls_msgs", s.opts.ServerName)
if err != nil {
serv.Errorf("CompleteRelevantStuckAsyncTasks: failed to get async tasks resend_all_dls_msgs: %v", err.Error())
s.Errorf("CompleteRelevantStuckAsyncTasks: failed to get async tasks resend_all_dls_msgs: %v", err.Error())
return
}
if !exist {
Expand All @@ -38,22 +38,22 @@ func (s *Server) CompleteRelevantStuckAsyncTasks() {
for _, asyncTask := range asyncTasks {
exist, station, err := db.GetStationById(asyncTask.StationId, asyncTask.TenantName)
if err != nil {
serv.Errorf("[tenant: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, err.Error())
s.Errorf("[tenant: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, err.Error())
return
}
if !exist {
errMsg := fmt.Sprintf("Station %v does not exist", station.Name)
serv.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, station.CreatedByUsername, errMsg)
s.Warnf("[tenant: %v][user: %v]CompleteRelevantStuckAsyncTasks at GetStationById: %v", asyncTask.TenantName, station.CreatedByUsername, errMsg)
continue
}

exist, user, err := memphis_cache.GetUser(station.CreatedByUsername, asyncTask.TenantName, false)
if err != nil {
serv.Errorf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks could not retrive user model from cache or db error: %v", asyncTask.TenantName, user.Username, err)
s.Errorf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks could not retrive user model from cache or db error: %v", asyncTask.TenantName, user.Username, err)
continue
}
if !exist {
serv.Warnf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks user does not exist", asyncTask.TenantName, user.Username)
s.Warnf("[tenant:%v][user: %v] CompleteRelevantStuckAsyncTasks user does not exist", asyncTask.TenantName, user.Username)
continue
}

Expand Down Expand Up @@ -103,9 +103,9 @@ func (ash AsyncTasksHandler) GetAsyncTasks(c *gin.Context) {
}

func (ash AsyncTasksHandler) GetAllAsyncTasks(tenantName string) ([]models.AsyncTaskRes, error) {
asyncTasks, err := db.GetActiveAsyncTasks(tenantName)
asyncTasks, err := db.GetActiveAndUpdatedAsyncTasks(tenantName)
if err != nil {
serv.Errorf("GetAllAsyncTasks at GetActiveAsyncTasks: %v", err.Error())
serv.Errorf("GetAllAsyncTasks at GetActiveAndUpdatedAsyncTasks: %v", err.Error())
return []models.AsyncTaskRes{}, err
}
for i, task := range asyncTasks {
Expand Down
52 changes: 26 additions & 26 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2626,14 +2626,14 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName

err := db.UpdateResendDisabledInStations(true, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}
task, err := db.UpsertAsyncTask("resend_all_dls_msgs", s.opts.ServerName, createdAt, tenantName, stationId, user.Username)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpsertAsyncTask at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}

Expand All @@ -2644,15 +2644,15 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
minId = int(data)
_, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}
} else {
minId, maxId, err = db.GetMinMaxIdsOfDlsMsgsByUpdatedAt(tenantName, createdAt, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetMinMaxIdsOfDlsMsgsByUpdatedAt at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}
// -1 in order to prevent skipping the first element
Expand All @@ -2662,8 +2662,8 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
for {
_, dlsMsgs, err := db.GetDlsMsgsBatch(tenantName, minId, maxId, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at GetDlsMsgsBatch at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}

Expand All @@ -2673,31 +2673,31 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
data = models.MetaData{
Offset: offset,
}
_, err = serv.ResendUnackedMsg(dlsMsg, user, stationName)
_, err = s.ResendUnackedMsg(dlsMsg, user, stationName)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at ResendUnackedMsg at station %v : %v", tenantName, username, stationName, err.Error())
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at ResendUnackedMsg at station %v : %v", tenantName, username, stationName, err.Error())
continue
}

}
err = db.UpdateAsyncTask(task.Name, tenantName, time.Now(), data, stationId)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
continue
}
minId = offset
if len(dlsMsgs) == 0 || offset == maxId {
err = db.UpdateStatusAsyncTask(task.Name, tenantName, "completed", stationId)
err = db.UpdateStatusAsyncTask(task.Name, tenantName, "completed", stationId, "", "")
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateStatusAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateStatusAsyncTask at station %v : %v ", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}

err = db.UpdateResendDisabledInStations(false, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName)
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at UpdateResendDisabledInStations at station %v : %v", tenantName, username, stationName, err.Error())
s.handleResendAllFailure("resend_all_dls_msgs", user, stationId, tenantName, stationName, err.Error())
return
}

Expand All @@ -2706,9 +2706,9 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
MessagePayload: fmt.Sprintf("Resend all unacked messages operation at station %s, triggered by user %s has been completed successfully", stationName, username),
}

err = serv.sendSystemMessageOnWS(user, systemMessage)
err = s.sendSystemMessageOnWS(user, systemMessage)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at sendSystemMessageOnWS at station %v : %v", tenantName, username, stationName, err.Error())
s.Errorf("[tenant: %v][user: %v]ResendAllDlsMsgs at sendSystemMessageOnWS at station %v : %v", tenantName, username, stationName, err.Error())
return
}
break
Expand All @@ -2717,26 +2717,26 @@ func (s *Server) ResendAllDlsMsgs(stationName string, stationId int, tenantName
}()
}

// handleResendAllFailure performs the failure path of a "resend all unacked
// messages" operation. In order, it:
//  1. sends the user an "error" system message over the websocket,
//  2. calls db.UpdateResendDisabledInStations(false, ...) for the station,
//  3. sets the async task's status to "failed" together with a failure reason.
//
// Each step logs and returns on error, so a failure in an earlier step means
// the later steps are not executed.
//
// NOTE(review): this span is a rendered diff. Each adjacent pair of
// near-identical lines is the pre-commit line followed by its post-commit
// replacement. The post-commit signature adds errMsg, which is what gets
// stored as the task's failure reason (pre-commit stored msgErr instead).
func (s *Server) handleResendAllFailure(taskName string, user models.User, stationId int, tenantName, stationName string) {
msgErr := fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error:", stationName, user.Username)
func (s *Server) handleResendAllFailure(taskName string, user models.User, stationId int, tenantName, stationName string, errMsg string) {
msgErr := fmt.Sprintf("Resend all unacked messages operation in station %s, triggered by user %s has failed due to an internal error", stationName, user.Username)
// User-facing notification; it carries the generic msgErr text, not errMsg.
systemMessage := SystemMessage{
MessageType: "error",
MessagePayload: msgErr,
}
err := serv.sendSystemMessageOnWS(user, systemMessage)
err := s.sendSystemMessageOnWS(user, systemMessage)
if err != nil {
// NOTE(review): returning here skips both the UpdateResendDisabledInStations(false)
// call and the task status update below — confirm that a websocket failure
// should leave the station and the task untouched.
// NOTE(review): this log call still goes through serv while the neighbouring
// post-commit lines use s.
serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at sendSystemMessageOnWS at station %v : %v", tenantName, user.Username, stationName, err.Error())
return
}
// Passes false for the resend-disabled flag of this station.
err = db.UpdateResendDisabledInStations(false, []int{stationId})
if err != nil {
serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error())
s.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateResendDisabledInStations at station %v : %v", tenantName, user.Username, stationName, err.Error())
return
}

// Record the failure on the task row; the trailing "" is the functionName argument.
err = db.UpdateReasonAsyncTask(taskName, tenantName, "failed", stationId, msgErr, "")
err = db.UpdateStatusAsyncTask(taskName, tenantName, "failed", stationId, errMsg, "")
if err != nil {
// NOTE(review): the post-commit log text still names UpdateReasonAsyncTask
// although the call above is UpdateStatusAsyncTask.
serv.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateReasonAsyncTask at station %v : %v", tenantName, user.Username, stationName, err.Error())
s.Errorf("[tenant: %v][user: %v]handleResendAllFailure at UpdateReasonAsyncTask at station %v : %v", tenantName, user.Username, stationName, err.Error())
return
}
}
Loading

0 comments on commit bfc2870

Please sign in to comment.