Skip to content

Commit

Permalink
chore: quote table identifier (#2810)
Browse files Browse the repository at this point in the history
  • Loading branch information
chandumlg committed Dec 15, 2022
1 parent 60fc5f7 commit 58f2e4c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 45 deletions.
1 change: 1 addition & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3449,6 +3449,7 @@ func (jd *HandleT) deleteJobStatusDSInTx(txHandler transactionHandler, ds dataSe
if len(customValFilters) > 0 {
sqlFilters = append(sqlFilters, constructQueryOR(fmt.Sprintf(`%q.custom_val`, ds.JobTable), customValFilters))
}

if len(parameterFilters) > 0 {
sqlFilters = append(sqlFilters, constructParameterJSONQuery(ds.JobTable, parameterFilters))
}
Expand Down
88 changes: 43 additions & 45 deletions jobsdb/readonly_jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat

var sqlStatement string

sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobStatusTable)
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobStatusTable)
err = jd.prepareAndExecStmtInTxn(txn, sqlStatement)
if err != nil {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
Expand All @@ -229,7 +229,7 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat
return 0, err
}

sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobTable)
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobTable)
err = jd.prepareAndExecStmtInTxn(txn, sqlStatement)
if err != nil {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
Expand All @@ -245,15 +245,15 @@ func (jd *ReadonlyHandleT) getUnprocessedJobsDSCount(ctx context.Context, ds dat
} else {
selectColumn = "COUNT(*)"
}
sqlStatement = fmt.Sprintf(`SELECT %[3]s FROM %[1]s LEFT JOIN %[2]s ON %[1]s.job_id=%[2]s.job_id
WHERE %[2]s.job_id is NULL`, ds.JobTable, ds.JobStatusTable, selectColumn)
sqlStatement = fmt.Sprintf(`SELECT %[3]s FROM %[1]q AS jobs LEFT JOIN %[2]q AS job_status ON jobs.job_id=job_status.job_id
WHERE job_status.job_id is NULL`, ds.JobTable, ds.JobStatusTable, selectColumn)

if len(customValFilters) > 0 {
sqlStatement += " AND " + constructQueryOR(fmt.Sprintf("%s.custom_val", ds.JobTable), customValFilters)
sqlStatement += " AND " + constructQueryOR("jobs.custom_val", customValFilters)
}

if len(parameterFilters) > 0 {
sqlStatement += " AND " + constructParameterJSONQuery(ds.JobTable, parameterFilters)
sqlStatement += " AND " + constructParameterJSONQuery("jobs", parameterFilters)
}

if jd.tablePrefix == "gw" {
Expand Down Expand Up @@ -352,13 +352,13 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
}
if len(customValFilters) > 0 {
customValQuery = " AND " +
constructQueryOR(fmt.Sprintf("%s.custom_val", ds.JobTable), customValFilters)
constructQueryOR("jobs.custom_val", customValFilters)
} else {
customValQuery = ""
}

if len(parameterFilters) > 0 {
sourceQuery += " AND " + constructParameterJSONQuery(ds.JobTable, parameterFilters)
sourceQuery += " AND " + constructParameterJSONQuery("jobs", parameterFilters)
} else {
sourceQuery = ""
}
Expand All @@ -371,7 +371,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS

var sqlStatement string

sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobStatusTable)
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobStatusTable)
err = jd.prepareAndExecStmtInTxn(txn, sqlStatement)
if err != nil {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
Expand All @@ -381,7 +381,7 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS
return 0, err
}

sqlStatement = fmt.Sprintf(`LOCK TABLE %s IN ACCESS SHARE MODE;`, ds.JobTable)
sqlStatement = fmt.Sprintf(`LOCK TABLE %q IN ACCESS SHARE MODE;`, ds.JobTable)
err = jd.prepareAndExecStmtInTxn(txn, sqlStatement)
if err != nil {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
Expand All @@ -393,13 +393,13 @@ func (jd *ReadonlyHandleT) getProcessedJobsDSCount(ctx context.Context, ds dataS

var selectColumn string
if jd.tablePrefix == "gw" {
selectColumn = fmt.Sprintf("%[1]s.event_payload->'batch' as batch", ds.JobTable)
selectColumn = "jobs.event_payload->'batch' as batch"
} else {
selectColumn = fmt.Sprintf("COUNT(%[1]s.job_id)", ds.JobTable)
selectColumn = "COUNT(jobs.job_id)"
}
sqlStatement = fmt.Sprintf(`SELECT %[6]s FROM
%[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
%[1]q AS jobs
JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
%[3]s
%[4]s %[5]s
AND job_latest_state.retry_time < $1`,
Expand Down Expand Up @@ -504,12 +504,12 @@ func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error
var dsString string
for _, dsPair := range dsListArr {
sqlStatement := fmt.Sprintf(`SELECT COUNT(*),
%[1]s.parameters->'source_id' as source,
%[1]s.custom_val ,%[1]s.parameters->'destination_id' as destination,
jobs.parameters->'source_id' as source,
jobs.custom_val, jobs.parameters->'destination_id' as destination,
job_latest_state.job_state
FROM %[1]s
LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
GROUP BY job_latest_state.job_state, %[1]s.parameters->'source_id', %[1]s.parameters->'destination_id', %[1]s.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
FROM %[1]q AS jobs
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
GROUP BY job_latest_state.job_state, jobs.parameters->'source_id', jobs.parameters->'destination_id', jobs.custom_val;`, dsPair.JobTableName, dsPair.JobStatusTableName)
row, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
Expand Down Expand Up @@ -559,17 +559,17 @@ func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, erro
dsListTotal := jd.getDSList()
dsList = DSPair{JobTableName: dsListTotal[0].JobTable, JobStatusTableName: dsListTotal[0].JobStatusTable}
}
sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.user_id, %[1]s.custom_val,
sqlStatement := fmt.Sprintf(`SELECT jobs.job_id, jobs.user_id, jobs.custom_val,
job_latest_state.exec_time,
job_latest_state.error_code, job_latest_state.error_response
FROM %[1]s
JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
FROM %[1]q AS jobs
JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
WHERE job_latest_state.job_state = 'failed'
`, dsList.JobTableName, dsList.JobStatusTableName)
if argList[1] != "" {
sqlStatement = sqlStatement + fmt.Sprintf(`AND %[1]s.custom_val = '%[2]s'`, dsList.JobTableName, argList[1])
sqlStatement = sqlStatement + fmt.Sprintf(`AND jobs.custom_val = '%[1]s'`, argList[1])
}
sqlStatement = sqlStatement + fmt.Sprintf(`ORDER BY %[1]s.job_id desc LIMIT 5;`, dsList.JobTableName)
sqlStatement = sqlStatement + `ORDER BY jobs.job_id desc LIMIT 5;`
row, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
Expand Down Expand Up @@ -601,7 +601,7 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
var response []byte
for _, dsPair := range dsListTotal {
var min, max sql.NullInt32
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable)
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable)
row := jd.DbHandle.QueryRow(sqlStatement)
err := row.Scan(&min, &max)
if err != nil {
Expand All @@ -618,15 +618,15 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
continue
}
sqlStatement = fmt.Sprintf(`SELECT
%[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload,
%[1]s.created_at, %[1]s.expire_at,
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload,
jobs.created_at, jobs.expire_at,
job_latest_state.job_state, job_latest_state.attempt,
job_latest_state.exec_time, job_latest_state.retry_time,
job_latest_state.error_code, job_latest_state.error_response
FROM
%[1]s
LEFT JOIN "v_last_%[2]s" job_latest_state ON %[1]s.job_id=job_latest_state.job_id
WHERE %[1]s.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id)
%[1]q AS jobs
LEFT JOIN "v_last_%[2]s" job_latest_state ON jobs.job_id=job_latest_state.job_id
WHERE jobs.job_id = %[3]s;`, dsPair.JobTable, dsPair.JobStatusTable, job_id)

event := JobT{}
row = jd.DbHandle.QueryRow(sqlStatement)
Expand All @@ -636,11 +636,11 @@ func (jd *ReadonlyHandleT) GetJobByID(job_id, _ string) (string, error) {
&event.LastJobStatus.ErrorResponse)
if err != nil {
sqlStatement = fmt.Sprintf(`SELECT
%[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload,
%[1]s.created_at, %[1]s.expire_at
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload,
jobs.created_at, jobs.expire_at
FROM
%[1]s
WHERE %[1]s.job_id = %[2]s;`, dsPair.JobTable, job_id)
%[1]q AS jobs
WHERE jobs.job_id = %[2]s;`, dsPair.JobTable, job_id)
row = jd.DbHandle.QueryRow(sqlStatement)
err1 := row.Scan(&event.JobID, &event.UUID, &event.UserID, &event.Parameters, &event.CustomVal, &event.EventPayload,
&event.CreatedAt, &event.ExpireAt)
Expand All @@ -660,7 +660,7 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) {
dsListTotal := jd.getDSList()
for _, dsPair := range dsListTotal {
var min, max sql.NullInt32
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable)
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable)
row := jd.DbHandle.QueryRow(sqlStatement)
err := row.Scan(&min, &max)
if err != nil {
Expand All @@ -676,7 +676,7 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) {
if jobId < int(min.Int32) || jobId > int(max.Int32) {
continue
}
sqlStatement = fmt.Sprintf(`SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[1]s WHERE job_id = %[2]s;`, dsPair.JobStatusTable, jobID)
sqlStatement = fmt.Sprintf(`SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[1]q WHERE job_id = %[2]s;`, dsPair.JobStatusTable, jobID)
var statusCode sql.NullString
var eventList []JobStatusT
rows, err := jd.DbHandle.Query(sqlStatement)
Expand All @@ -696,13 +696,11 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) {
eventList = append(eventList, event)
}

{
response, err := json.MarshalIndent(FailedStatusStats{FailedStatusStats: eventList}, "", " ")
if err != nil {
return "", err
}
return string(response), nil
response, err := json.MarshalIndent(FailedStatusStats{FailedStatusStats: eventList}, "", " ")
if err != nil {
return "", err
}
return string(response), nil
}

// jobID not found
Expand All @@ -726,7 +724,7 @@ func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error) {
return "", nil
}
var min, max sql.NullInt32
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %s`, dsPair.JobTable)
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, dsPair.JobTable)
row := jd.DbHandle.QueryRow(sqlStatement)
err = row.Scan(&min, &max)
if err != nil {
Expand All @@ -738,7 +736,7 @@ func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error) {
if jobId2 < int(min.Int32) || jobId1 > int(max.Int32) {
continue
}
sqlStatement = fmt.Sprintf(`SELECT job_id FROM %[1]s WHERE job_id >= %[2]s AND job_id <= %[3]s AND user_id = '%[4]s';`, dsPair.JobTable, args[2], args[3], userID)
sqlStatement = fmt.Sprintf(`SELECT job_id FROM %[1]q WHERE job_id >= %[2]s AND job_id <= %[3]s AND user_id = '%[4]s';`, dsPair.JobTable, args[2], args[3], userID)
rows, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
Expand All @@ -763,7 +761,7 @@ func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []st
dsList := DSPair{JobTableName: jobPrefix + args[2], JobStatusTableName: statusPrefix + args[2]}
sqlStatement := fmt.Sprintf(`select count(*), a.error_code, a.custom_val, a.d from
(select count(*), rt.job_id, st.error_code as error_code, rt.custom_val as custom_val,
rt.parameters -> 'destination_id' as d from %[1]s rt inner join %[2]s st
rt.parameters -> 'destination_id' as d from %[1]q rt inner join %[2]q st
on st.job_id=rt.job_id where st.job_state in ('failed', 'aborted')
group by rt.job_id, st.error_code, rt.custom_val, rt.parameters -> 'destination_id')
as a group by a.custom_val, a.error_code, a.d order by a.custom_val;`, dsList.JobTableName, dsList.JobStatusTableName)
Expand Down

0 comments on commit 58f2e4c

Please sign in to comment.