diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 00978d7846..55c8f1b7f6 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -3449,6 +3449,7 @@ func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSe if len(customValFilters) > 0 { sqlFilters = append(sqlFilters, constructQueryOR(fmt.Sprintf(`%q.custom_val`, ds.JobTable), customValFilters)) } + if len(parameterFilters) > 0 { sqlFilters = append(sqlFilters, constructParameterJSONQuery(ds.JobTable, parameterFilters)) } diff --git a/jobsdb/readonly_jobsdb.go b/jobsdb/readonly_jobsdb.go index ab0b4117ea..11ed69849a 100644 --- a/jobsdb/readonly_jobsdb.go +++ b/jobsdb/readonly_jobsdb.go @@ -219,7 +219,7 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat var sqlStatement string - sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobStatusTable) + sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobStatusTable) err = jd.prepareAndExecStmtInTxn(txn, sqlStatement) if err != nil { if rollbackErr := txn.Rollback(); rollbackErr != nil { @@ -229,7 +229,7 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat return 0, err } - sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobTable) + sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobTable) err = jd.prepareAndExecStmtInTxn(txn, sqlStatement) if err != nil { if rollbackErr := txn.Rollback(); rollbackErr != nil { @@ -245,15 +245,15 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat } else { selectColumn = "COUNT(*)" } - sqlStatement = fmt.Sprintf(`SELECT %[3]s FROM %[1]s LEFT JOIN %[2]s ON %[1]s.job_id=%[2]s.job_id - WHERE %[2]s.job_id is NULL`, ds.JobTable, ds.JobStatusTable, selectColumn) + sqlStatement = fmt.Sprintf(`SELECT %[3]s FROM %[1]q AS jobs LEFT JOIN %[2]q AS job_status ON jobs.job_id=job_status.job_id + WHERE job_status.job_id is NULL`, ds.JobTable, ds.JobStatusTable, selectColumn) if len(customValFilters) > 0 { - sqlStatement += " AND " + constructQueryOR(fmt.Sprintf("%s.custom_val", ds.JobTable), customValFilters) + sqlStatement += " AND " + constructQueryOR("jobs.custom_val", customValFilters) } if len(parameterFilters) > 0 { - sqlStatement += " AND " + constructParameterJSONQuery(ds.JobTable, parameterFilters) + sqlStatement += " AND " + constructParameterJSONQuery("jobs", parameterFilters) } if jd.tablePrefix == "gw" { @@ -352,13 +352,13 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS } if len(customValFilters) > 0 { customValQuery = " AND " + - constructQueryOR(fmt.Sprintf("%s.custom_val", ds.JobTable), customValFilters) + constructQueryOR("jobs.custom_val", customValFilters) } else { customValQuery = "" } if len(parameterFilters) > 0 { - sourceQuery += " AND " + constructParameterJSONQuery(ds.JobTable, parameterFilters) + sourceQuery += " AND " + constructParameterJSONQuery("jobs", parameterFilters) } else { sourceQuery = "" } @@ -371,7 +371,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS var sqlStatement string - sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobStatusTable) + sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobStatusTable) err = jd.prepareAndExecStmtInTxn(txn, sqlStatement) if err != nil { if rollbackErr := txn.Rollback(); rollbackErr != nil { @@ -381,7 +381,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS return 0, err } - sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobTable) + sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobTable) err = jd.prepareAndExecStmtInTxn(txn, sqlStatement) if err != nil { if rollbackErr := txn.Rollback(); rollbackErr != nil { @@ -393,13 +393,13 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS var selectColumn string if jd.tablePrefix == "gw" { - selectColumn = fmt.Sprintf("%[1]s.event_payload->'batch' as batch", ds.JobTable) + selectColumn = "jobs.event_payload->'batch' as batch" } else { - selectColumn = fmt.Sprintf("COUNT(%[1]s.job_id)", ds.JobTable) + selectColumn = "COUNT(jobs.job_id)" } sqlStatement = fmt.Sprintf(`SELECT %[6]s FROM - %[1]s - JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id + %[1]q AS jobs + JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id %[3]s %[4]s %[5]s AND job_latest_state.retry_time < $1`, @@ -504,12 +504,12 @@ func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error var dsString string for _, dsPair := range dsListArr { sqlStatement := fmt.Sprintf(`SELECT COUNT(*), - %[1]s.parameters->'source_id' as source, - %[1]s.custom_val ,%[1]s.parameters->'destination_id' as destination, + jobs.parameters->'source_id' as source, + jobs.custom_val, jobs.parameters->'destination_id' as destination, job_latest_state.job_state - FROM %[1]s - LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id - GROUP BY job_latest_state.job_state, %[1]s.parameters->'source_id', %[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName) + FROM %[1]q AS jobs + LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id + GROUP BY job_latest_state.job_state, jobs.parameters->'source_id', jobs.parameters->'destination_id', jobs.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName) row, err := jd.DbHandle.Query(sqlStatement) if err != nil { return "", err @@ -559,17 +559,17 @@ func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, erro dsListTotal := jd.getDSList() dsList = DSPair{JobTableName: dsListTotal[0].JobTable, JobStatusTableName: dsListTotal[0].JobStatusTable} } - sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.user_id, %[1]s.custom_val, + sqlStatement := fmt.Sprintf(`SELECT jobs.job_id, jobs.user_id, jobs.custom_val, job_latest_state.exec_time, job_latest_state.error_code, job_latest_state.error_response - FROM %[1]s - JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id + FROM %[1]q AS jobs + JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id WHERE job_latest_state.job_state = 'failed' `, dsList.JobTableName, dsList.JobStatusTableName) if argList[1] != "" { - sqlStatement = sqlStatement + fmt.Sprintf(`AND %[1]s.custom_val = '%[2]s'`, dsList.JobTableName, argList[1]) + sqlStatement = sqlStatement + fmt.Sprintf(`AND jobs.custom_val = '%[1]s'`, argList[1]) } - sqlStatement = sqlStatement + fmt.Sprintf(`ORDER BY %[1]s.job_id desc LIMIT 5;`, dsList.JobTableName) + sqlStatement = sqlStatement + `ORDER BY jobs.job_id desc LIMIT 5;` row, err := jd.DbHandle.Query(sqlStatement) if err != nil { return "", err @@ -601,7 +601,7 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) { var response []byte for _, dsPair := range dsListTotal { var min, max sql.NullInt32 - sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable) + sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable) row := jd.DbHandle.QueryRow(sqlStatement) err := row.Scan(&min, &max) if err != nil { @@ -618,15 +618,15 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) { continue } sqlStatement = fmt.Sprintf(`SELECT - %[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload, - %[1]s.created_at, %[1]s.expire_at, + jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, + jobs.created_at, jobs.expire_at, job_latest_state.job_state, job_latest_state.attempt, job_latest_state.exec_time, job_latest_state.retry_time, job_latest_state.error_code, job_latest_state.error_response FROM - %[1]s - LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id - WHERE %[1]s.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id) + %[1]q AS jobs + LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id + WHERE jobs.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id) event := JobT{} row = jd.DbHandle.QueryRow(sqlStatement) @@ -636,11 +636,11 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) { &event.LastJobStatus.ErrorResponse) if err != nil { sqlStatement = fmt.Sprintf(`SELECT - %[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload, - %[1]s.created_at, %[1]s.expire_at + jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, + jobs.created_at, jobs.expire_at FROM - %[1]s - WHERE %[1]s.job_id = %[2]s;`, dsPair.JobTable, job_id) + %[1]q AS jobs + WHERE jobs.job_id = %[2]s;`, dsPair.JobTable, job_id) row = jd.DbHandle.QueryRow(sqlStatement) err1 := row.Scan(&event.JobID, &event.UUID, &event.UserID, &event.Parameters, &event.CustomVal, &event.EventPayload, &event.CreatedAt, &event.ExpireAt) @@ -660,7 +660,7 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) { dsListTotal := jd.getDSList() for _, dsPair := range dsListTotal { var min, max sql.NullInt32 - sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable) + sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable) row := jd.DbHandle.QueryRow(sqlStatement) err := row.Scan(&min, &max) if err != nil { @@ -676,7 +676,7 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) { if jobId < int(min.Int32) || jobId > int(max.Int32) { continue } - sqlStatement = fmt.Sprintf(`SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[1]s WHERE job_id = %[2]s;`, dsPair.JobStatusTable, jobID) + sqlStatement = fmt.Sprintf(`SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[1]q WHERE job_id = %[2]s;`, dsPair.JobStatusTable, jobID) var statusCode sql.NullString var eventList []JobStatusT rows, err := jd.DbHandle.Query(sqlStatement) @@ -696,13 +696,11 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) { eventList = append(eventList, event) } - { - response, err := json.MarshalIndent(FailedStatusStats{FailedStatusStats: eventList}, "", " ") - if err != nil { - return "", err - } - return string(response), nil + response, err := json.MarshalIndent(FailedStatusStats{FailedStatusStats: eventList}, "", " ") + if err != nil { + return "", err } + return string(response), nil } // jobID not found @@ -726,7 +724,7 @@ func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error) { return "", nil } var min, max sql.NullInt32 - sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable) + sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable) row := jd.DbHandle.QueryRow(sqlStatement) err = row.Scan(&min, &max) if err != nil { @@ -738,7 +736,7 @@ func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error) { if jobId2 < int(min.Int32) || jobId1 > int(max.Int32) { continue } - sqlStatement = fmt.Sprintf(`SELECT job_id FROM %[1]s WHERE job_id >= %[2]s AND job_id <= %[3]s AND user_id = '%[4]s';`, dsPair.JobTable, args[2], args[3], userID) + sqlStatement = fmt.Sprintf(`SELECT job_id FROM %[1]q WHERE job_id >= %[2]s AND job_id <= %[3]s AND user_id = '%[4]s';`, dsPair.JobTable, args[2], args[3], userID) rows, err := jd.DbHandle.Query(sqlStatement) if err != nil { return "", err @@ -763,7 +761,7 @@ func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []st dsList := DSPair{JobTableName: jobPrefix + args[2], JobStatusTableName: statusPrefix + args[2]} sqlStatement := fmt.Sprintf(`select count(*), a.error_code, a.custom_val, a.d from (select count(*), rt.job_id, st.error_code as error_code, rt.custom_val as custom_val, - rt.parameters -> 'destination_id' as d from %[1]s rt inner join %[2]s st + rt.parameters -> 'destination_id' as d from %[1]q rt inner join %[2]q st on st.job_id=rt.job_id where st.job_state in ('failed', 'aborted') group by rt.job_id, st.error_code, rt.custom_val, rt.parameters -> 'destination_id') as a group by a.custom_val, a.error_code, a.d order by a.custom_val;`, dsList.JobTableName, dsList.JobStatusTableName)