diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml
index c526eb54b7..167dedb857 100644
--- a/.github/workflows/verify.yml
+++ b/.github/workflows/verify.yml
@@ -60,5 +60,5 @@ jobs:
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
         with:
-          version: v1.51.2
+          version: v1.52
           args: -v
diff --git a/Makefile b/Makefile
index 5957199eab..c7ce57c90f 100644
--- a/Makefile
+++ b/Makefile
@@ -83,7 +83,7 @@ install-tools:
 
 .PHONY: lint
 lint: fmt ## Run linters on all go files
-	docker run --rm -v $(shell pwd):/app:ro -w /app golangci/golangci-lint:v1.51.2 bash -e -c \
+	docker run --rm -v $(shell pwd):/app:ro -w /app golangci/golangci-lint:v1.52 bash -e -c \
 		'golangci-lint run -v --timeout 5m'
 
 .PHONY: fmt
diff --git a/build/wait-for-go/wait-for.go b/build/wait-for-go/wait-for.go
index 11bccfe743..c8d0fc8fe5 100644
--- a/build/wait-for-go/wait-for.go
+++ b/build/wait-for-go/wait-for.go
@@ -28,7 +28,6 @@ func canConnect(host, port, protocol string) bool {
 		fmt.Println("UDP error:", err)
 		if err != nil {
 			_ = conn.Close()
-			conn = nil
 			return false
 		}
 	}
diff --git a/jobsdb/admin.go b/jobsdb/admin.go
index 5fe585ca8f..1b03de6af0 100644
--- a/jobsdb/admin.go
+++ b/jobsdb/admin.go
@@ -12,11 +12,10 @@ Ping returns health check for pg database
 */
 func (jd *HandleT) Ping() error {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	rows, err := jd.dbHandle.QueryContext(ctx, `SELECT 'Rudder DB Health Check'::text as message`)
+	_, err := jd.dbHandle.ExecContext(ctx, `SELECT 'Rudder DB Health Check'::text as message`)
 	if err != nil {
 		return err
 	}
-	_ = rows.Close()
 	return nil
 }
diff --git a/jobsdb/backup.go b/jobsdb/backup.go
index c58c36858b..e9373aa354 100644
--- a/jobsdb/backup.go
+++ b/jobsdb/backup.go
@@ -544,6 +544,9 @@ func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(
 		}
 		offset++
 	}
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("error while iterating rows: %w", err)
+	}
 	return nil
 }
 
diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go
index 40a3aff889..74252aed11 100644
--- a/jobsdb/integration_test.go
+++ b/jobsdb/integration_test.go
@@ -1090,6 +1090,9 @@ func TestCreateDS(t *testing.T) {
 				require.NoError(t, err)
 				tableNames = append(tableNames, tableName)
 			}
+			if err = tables.Err(); err != nil {
+				require.NoError(t, err)
+			}
 			require.Equal(t, len(tableNames), 2, `should find two tables`)
 			require.Equal(t, tableNames[0], prefix+"_jobs_-2")
 			require.Equal(t, tableNames[1], prefix+"_jobs_-1")
@@ -1401,6 +1404,7 @@ func getPayloadSize(t *testing.T, jobsDB JobsDB, job *JobT) (int64, error) {
 		require.NoError(t, err)
 		tables = append(tables, table)
 	}
+	require.NoError(t, rows.Err())
 	_ = rows.Close()
 
 	for _, table := range tables {
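
Review note: the lint bump at the top (golangci-lint v1.51.2 -> v1.52), together with the //nolint:rowserrcheck suppressions further down, suggests the common thread of this diff: the rowserrcheck linter, which requires that sql.Rows iteration errors be checked. rows.Next() returns false both at end-of-set and on failure, so a loop that simply falls through can silently drop rows. A minimal sketch of the pattern the hunks below keep applying (all names illustrative; assumes the standard context and database/sql imports):

func tableNames(ctx context.Context, db *sql.DB, query string) ([]string, error) {
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	// Next() reports false on both EOF and error; only Err() tells them apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return names, nil
}
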
diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 4e189da686..0fa48fd6bc 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -1066,14 +1066,11 @@ func (jd *HandleT) refreshDSRangeList(l lock.LockToken) {
 		jd.assert(ds.Index != "", "ds.Index is empty")
 		sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, ds.JobTable)
 		// Note: Using Query instead of QueryRow, because the sqlmock library doesn't have support for QueryRow
-		rows, err := jd.dbHandle.Query(sqlStatement)
+		row := jd.dbHandle.QueryRow(sqlStatement)
+
+		err := row.Scan(&minID, &maxID)
 		jd.assertError(err)
-		for rows.Next() {
-			err := rows.Scan(&minID, &maxID)
-			jd.assertError(err)
-			break
-		}
-		_ = rows.Close()
+
 		jd.logger.Debug(sqlStatement, minID, maxID)
 		// We store ranges EXCEPT for
 		// 1. the last element (which is being actively written to)
@@ -1261,7 +1258,7 @@ func (jd *HandleT) computeNewIdxForAppend(l lock.LockToken) string {
 }
 
 func (jd *HandleT) doComputeNewIdxForAppend(dList []dataSetT) string {
-	newDSIdx := ""
+	var newDSIdx string
 	if len(dList) == 0 {
 		newDSIdx = "1"
 	} else {
@@ -1845,6 +1842,9 @@ func (jd *HandleT) GetActiveWorkspaces(ctx context.Context, customVal string) ([
 		}
 		workspaceIds = append(workspaceIds, workspaceId)
 	}
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
 	return workspaceIds, nil
 }
 
@@ -1886,6 +1886,9 @@ func (jd *HandleT) GetDistinctParameterValues(ctx context.Context, parameterName
 		}
 		values = append(values, value)
 	}
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
 	return values, nil
 }
 
@@ -2105,6 +2108,9 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
 		payloadSize = runningPayloadSize
 		eventCount = runningEventCount
 	}
+	if err := rows.Err(); err != nil {
+		return JobsResult{}, false, err
+	}
 	if !limitsReached &&
 		(params.JobsLimit > 0 && len(jobList) == params.JobsLimit) || // we reached the jobs limit
 		(params.EventsLimit > 0 && eventCount >= params.EventsLimit) || // we reached the events limit
@@ -2213,9 +2219,6 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
 		return JobsResult{}, false, err
 	}
 	defer func() { _ = rows.Close() }()
-	if err != nil {
-		return JobsResult{}, false, err
-	}
 
 	var runningEventCount int
 	var runningPayloadSize int64
@@ -2248,6 +2251,9 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
 		eventCount = runningEventCount
 	}
 
+	if err := rows.Err(); err != nil {
+		return JobsResult{}, false, err
+	}
 	if !limitsReached &&
 		(params.JobsLimit > 0 && len(jobList) == params.JobsLimit) || // we reached the jobs limit
 		(params.EventsLimit > 0 && eventCount >= params.EventsLimit) || // we reached the events limit
@@ -2575,6 +2581,9 @@ func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT) {
 		jd.assertError(err)
 		count++
 	}
+	if err = rows.Err(); err != nil {
+		jd.assertError(err)
+	}
 	return
 }
 
@@ -2621,6 +2630,9 @@ func (jd *HandleT) recoverFromCrash(owner OwnerType, goRoutineType string) {
 		jd.assert(!opDone, "opDone is true")
 		count++
 	}
+	if err = rows.Err(); err != nil {
+		jd.assertError(err)
+	}
 	jd.assert(count <= 1, fmt.Sprintf("count:%d > 1", count))
 
 	if count == 0 {
diff --git a/jobsdb/jobsdb_utils.go b/jobsdb/jobsdb_utils.go
index c31a5cfb24..70fd574889 100644
--- a/jobsdb/jobsdb_utils.go
+++ b/jobsdb/jobsdb_utils.go
@@ -103,6 +103,9 @@ func getAllTableNames(dbHandle sqlDbOrTx) ([]string, error) {
 		}
 		tableNames = append(tableNames, tbName)
 	}
+	if err = rows.Err(); err != nil {
+		return tableNames, err
+	}
 	return tableNames, nil
 }
 
diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index fce5a4d805..7d324e6dd2 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -186,6 +186,9 @@ func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT)
 		}
 		estimates[tableName] = estimate
 	}
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
 
 	datasets := lo.Filter(dsList,
 		func(ds dataSetT, idx int) bool {
diff --git a/router/eventorder_test.go b/router/eventorder_test.go
index 70c172f4b2..fc7e071494 100644
--- a/router/eventorder_test.go
+++ b/router/eventorder_test.go
@@ -440,6 +440,9 @@ func (eventOrderMethods) countDrainedJobs(db *sql.DB) int {
 				tables = append(tables, table)
 			}
 		}
+		if err = rows.Err(); err != nil {
+			panic(err)
+		}
 	}
 	for _, table := range tables {
 		var dsCount int
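
Review note: the refreshDSRangeList hunk above takes the other escape hatch: for single-row statements, QueryRow avoids the Close/Err bookkeeping entirely, because *sql.Row defers any query error to Scan. A sketch (table name illustrative; MIN/MAX return NULL on an empty table, hence the nullable scan targets):

var minID, maxID sql.NullInt64
// Any error, including sql.ErrNoRows, surfaces here at Scan time.
err := db.QueryRow(`SELECT MIN(job_id), MAX(job_id) FROM some_jobs_table`).Scan(&minID, &maxID)
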
diff --git a/services/multitenant/tenantstats.go b/services/multitenant/tenantstats.go
index 7cd1f7b997..93474bb495 100644
--- a/services/multitenant/tenantstats.go
+++ b/services/multitenant/tenantstats.go
@@ -273,7 +273,7 @@ func (t *Stats) GetRouterPickupJobs(destType string, noOfWorkers int, routerTime
 			break
 		}
 
-		pickUpCount := 0
+		var pickUpCount int
 		if t.routerTenantLatencyStat[destType][workspaceKey].Value() == 0 {
 			pickUpCount = misc.MinInt(pendingEvents-workspacePickUpCount[workspaceKey], runningJobCount)
 		} else {
diff --git a/services/pgnotifier/pgnotifier.go b/services/pgnotifier/pgnotifier.go
index 69b6df7756..d7fbabb250 100644
--- a/services/pgnotifier/pgnotifier.go
+++ b/services/pgnotifier/pgnotifier.go
@@ -717,6 +717,10 @@ func (notifier *PGNotifier) RunMaintenanceWorker(ctx context.Context) error {
 				}
 				ids = append(ids, id)
 			}
+			if err := rows.Err(); err != nil {
+				panic(err)
+			}
+			_ = rows.Close()
 
 			pkgLogger.Debugf("PgNotifier: Re-triggered job ids: %v", ids)
 
diff --git a/services/validators/envValidator.go b/services/validators/envValidator.go
index 53f3c5c60a..2a43abfeef 100644
--- a/services/validators/envValidator.go
+++ b/services/validators/envValidator.go
@@ -153,7 +153,7 @@ func killDanglingDBConnections(db *sql.DB) error {
 		AND APPLICATION_NAME = CURRENT_SETTING('APPLICATION_NAME')
 		AND APPLICATION_NAME <> ''`)
 	if err != nil {
-		return fmt.Errorf("error occurred when querying pg_stat_activity table for terminating dangling connections: %w", err)
+		return fmt.Errorf("querying pg_stat_activity table for terminating dangling connections: %w", err)
 	}
 	defer func() { _ = rows.Close() }()
 
@@ -172,10 +172,13 @@ func killDanglingDBConnections(db *sql.DB) error {
 		var row danglingConnRow
 		err := rows.Scan(&row.pid, &row.queryStart, &row.waitEventType, &row.waitEvent, &row.state, &row.query, &row.terminated)
 		if err != nil {
-			return fmt.Errorf("error occurred when scanning pg_stat_activity table for terminating dangling connections: %w", err)
+			return fmt.Errorf("scanning pg_stat_activity table for terminating dangling connections: %w", err)
 		}
 		dangling = append(dangling, &row)
 	}
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("iterating pg_stat_activity table for terminating dangling connections: %w", err)
+	}
 
 	if len(dangling) > 0 {
 		pkgLogger.Warnf("Terminated %d dangling connection(s)", len(dangling))
diff --git a/utils/misc/dbutils.go b/utils/misc/dbutils.go
index 2a1bdfad1d..3e1cd9e12d 100644
--- a/utils/misc/dbutils.go
+++ b/utils/misc/dbutils.go
@@ -46,11 +46,10 @@ func ReplaceDB(dbName, targetName string) {
 
 	// Killing sessions on the db
 	sqlStatement := fmt.Sprintf("SELECT pid, pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '%s' AND pid <> pg_backend_pid();", dbName)
-	rows, err := db.Query(sqlStatement)
+	_, err = db.Exec(sqlStatement)
 	if err != nil {
 		panic(err)
 	}
-	rows.Close()
 
 	renameDBStatement := fmt.Sprintf(`ALTER DATABASE %q RENAME TO %q`, dbName, targetName)
 
diff --git a/utils/misc/misc.go b/utils/misc/misc.go
index 830bd9064c..c996e864b4 100644
--- a/utils/misc/misc.go
+++ b/utils/misc/misc.go
@@ -945,6 +945,10 @@ func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response
 	}
 
+	defer func() { httputil.CloseResponse(resp) }()
+
 	body, err := io.ReadAll(resp.Body)
-	defer func() { httputil.CloseResponse(resp) }()
+	if err != nil {
+		return nil, -1, err
+	}
 
 	pkgLogger.Debugf("Post request: Successful %s", string(body))
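
Review note: Ping and ReplaceDB above (and the archiver below) switch Query/QueryContext to Exec/ExecContext for statements whose results are never read. Query holds a pooled connection until the returned rows are closed, so the Exec form both releases the connection immediately and gives rowserrcheck nothing to flag. A minimal sketch, assuming a plain *sql.DB handle:

// Results are discarded, so Exec is the right verb: no rows to close or check.
if _, err := db.ExecContext(ctx, statement); err != nil {
	return err
}
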
diff --git a/warehouse/api.go b/warehouse/api.go
index 24ed09e071..fbdd56f9ef 100644
--- a/warehouse/api.go
+++ b/warehouse/api.go
@@ -464,6 +464,11 @@ func (tableUploadReq TableUploadReq) GetWhTableUploads(ctx context.Context) ([]*
 		}
 		tableUploads = append(tableUploads, &tableUpload)
 	}
+	if err = rows.Err(); err != nil {
+		tableUploadReq.API.log.Errorf(err.Error())
+		return []*proto.WHTable{}, err
+	}
+
 	return tableUploads, nil
 }
 
@@ -649,6 +654,10 @@ func (uploadsReq *UploadsReq) getUploadsFromDB(ctx context.Context, isMultiWorks
 		upload.Tables = make([]*proto.WHTable, 0)
 		uploads = append(uploads, &upload)
 	}
+	if err = rows.Err(); err != nil {
+		return nil, 0, err
+	}
+
 	return uploads, totalUploadCount, err
 }
 
diff --git a/warehouse/archive/archiver.go b/warehouse/archive/archiver.go
index 94439f5418..2451ed8992 100644
--- a/warehouse/archive/archiver.go
+++ b/warehouse/archive/archiver.go
@@ -291,6 +291,10 @@ func (a *Archiver) Do(ctx context.Context) error {
 			stagingFileIDs = append(stagingFileIDs, stagingFileID)
 			stagingFileLocations = append(stagingFileLocations, stagingFileLocation)
 		}
+		if err := stagingFileRows.Err(); err != nil {
+			txn.Rollback()
+			return fmt.Errorf("iterating staging file rows: %w", err)
+		}
 		stagingFileRows.Close()
 
 		var storedStagingFilesLocation string
@@ -333,7 +337,7 @@ func (a *Archiver) Do(ctx context.Context) error {
 			warehouseutils.WarehouseStagingFilesTable,
 			misc.IntArrayToString(stagingFileIDs, ","),
 		)
-		_, err = txn.QueryContext(ctx, stmt)
+		_, err = txn.ExecContext(ctx, stmt)
 		if err != nil {
 			a.Logger.Errorf(`Error running txn in archiveUploadFiles. Query: %s Error: %v`, stmt, err)
 			txn.Rollback()
@@ -369,6 +373,10 @@ func (a *Archiver) Do(ctx context.Context) error {
 			}
 			loadLocations = append(loadLocations, loc)
 		}
+		if err := loadLocationRows.Err(); err != nil {
+			txn.Rollback()
+			return fmt.Errorf("iterating load file location: %w", err)
+		}
 		loadLocationRows.Close()
 		var paths []string
 		for _, loc := range loadLocations {
diff --git a/warehouse/client/client.go b/warehouse/client/client.go
index 2fb9c45afd..3c958eaea1 100644
--- a/warehouse/client/client.go
+++ b/warehouse/client/client.go
@@ -67,6 +67,10 @@ func (cl *Client) sqlQuery(statement string) (result warehouseutils.QueryResult,
 		}
 		result.Values = append(result.Values, stringRow)
 	}
+	if err = rows.Err(); err != nil {
+		return result, err
+	}
+
 	return result, err
 }
 
diff --git a/warehouse/identity/identity.go b/warehouse/identity/identity.go
index 678386e4d1..89913effd4 100644
--- a/warehouse/identity/identity.go
+++ b/warehouse/identity/identity.go
@@ -149,6 +149,7 @@ func (idr *Identity) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWri
 	if err != nil {
 		return
 	}
+	defer tableRows.Close()
 
 	for tableRows.Next() {
 		var mergePropType, mergePropVal string
@@ -159,6 +160,9 @@ func (idr *Identity) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWri
 		row := []string{mergePropType, mergePropVal, newID, currentTimeString}
 		rows = append(rows, row)
 	}
+	if err = tableRows.Err(); err != nil {
+		return
+	}
 
 	sqlStatement = fmt.Sprintf(`UPDATE %s SET rudder_id='%s', updated_at='%s' WHERE rudder_id IN (%v)`, idr.mappingsTable(), newID, currentTimeString, misc.SingleQuoteLiteralJoin(rudderIDs[1:]))
 	var res sql.Result
@@ -317,6 +321,10 @@ func (idr *Identity) addRules(txn *sql.Tx, loadFileNames []string, gzWriter *mis
 		}
 		ids = append(ids, id)
 	}
+	if err = rows.Err(); err != nil {
+		pkgLogger.Errorf(`IDR: Error reading rows from %s from %s: %v`, idr.mergeRulesTable(), mergeRulesStagingTable, err)
+		return
+	}
 	pkgLogger.Debugf(`IDR: Number of merge rules inserted for uploadID %v : %v`, idr.uploadID, len(ids))
 	return ids, nil
 }
@@ -361,6 +369,9 @@ func (idr *Identity) writeTableToFile(tableName string, txn *sql.Tx, gzWriter *m
 		rowString, _ := eventLoader.WriteToString()
 		gzWriter.WriteGZ(rowString)
 	}
+	if err = rows.Err(); err != nil {
+		return
+	}
 
 	offset += batchSize
 	if offset >= totalRows {
diff --git a/warehouse/integrations/azure-synapse/azure-synapse.go b/warehouse/integrations/azure-synapse/azure-synapse.go
index bddb4a22df..54b1f58428 100644
--- a/warehouse/integrations/azure-synapse/azure-synapse.go
+++ b/warehouse/integrations/azure-synapse/azure-synapse.go
@@ -697,6 +697,9 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool {
 		}
 		stagingTableNames = append(stagingTableNames, tableName)
 	}
+	if err := rows.Err(); err != nil {
+		panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err))
+	}
 	as.Logger.Infof("WH: SYNAPSE: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
 	delSuccess := true
 	for _, stagingTableName := range stagingTableNames {
diff --git a/warehouse/integrations/clickhouse/clickhouse_test.go b/warehouse/integrations/clickhouse/clickhouse_test.go
index 232ce5a2d9..51e1e40c6d 100644
--- a/warehouse/integrations/clickhouse/clickhouse_test.go
+++ b/warehouse/integrations/clickhouse/clickhouse_test.go
@@ -621,6 +621,7 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) {
 					require.Fail(t, fmt.Sprintf("table %s column %s is of Nullable type even when disableNullable is set to true", tableName, columnName))
 				}
 			}
+			require.NoError(t, rows.Err())
 		}
 
 		t.Log("Loading data into table")
diff --git a/warehouse/integrations/deltalake-native/deltalake.go b/warehouse/integrations/deltalake-native/deltalake.go
index 39767d6373..56b7a60bdb 100644
--- a/warehouse/integrations/deltalake-native/deltalake.go
+++ b/warehouse/integrations/deltalake-native/deltalake.go
@@ -905,6 +905,8 @@ func (d *Deltalake) partitionQuery(ctx context.Context, tableName string) (strin
 	}
 	defer func() { _ = rows.Close() }()
 
+	_ = rows.Err() // ignore error
+
 	partitionColumns, err := rows.Columns()
 	if err != nil {
 		return "", fmt.Errorf("scanning partition columns: %w", err)
diff --git a/warehouse/integrations/middleware/sqlquerywrapper/sql_test.go b/warehouse/integrations/middleware/sqlquerywrapper/sql_test.go
index 92262b923d..489f445667 100644
--- a/warehouse/integrations/middleware/sqlquerywrapper/sql_test.go
+++ b/warehouse/integrations/middleware/sqlquerywrapper/sql_test.go
@@ -94,16 +94,16 @@ func TestQueryWrapper(t *testing.T) {
 		_, err = qw.ExecContext(ctx, query)
 		require.NoError(t, err)
 
-		_, err = qw.Query(query)
+		_, err = qw.Query(query) //nolint:rowserrcheck
 		require.NoError(t, err)
 
-		_, err = qw.QueryContext(ctx, query)
+		_, err = qw.QueryContext(ctx, query) //nolint:rowserrcheck
 		require.NoError(t, err)
 
-		_ = qw.QueryRow(query)
+		_ = qw.QueryRow(query) //nolint:rowserrcheck
 		require.NoError(t, err)
 
-		_ = qw.QueryRowContext(ctx, query)
+		_ = qw.QueryRowContext(ctx, query) //nolint:rowserrcheck
 		require.NoError(t, err)
 	})
 
@@ -280,10 +280,10 @@ func TestQueryWrapper(t *testing.T) {
 		tx, err := qw.Begin()
 		require.NoError(t, err)
 
-		_, err = tx.Query(query)
+		_, err = tx.Query(query) //nolint:rowserrcheck
 		require.NoError(t, err)
 
-		err = tx.Commit()
+		err = tx.Commit() //nolint:rowserrcheck
 		require.NoError(t, err)
 	})
 
@@ -291,7 +291,7 @@ func TestQueryWrapper(t *testing.T) {
 		tx, err := qw.Begin()
 		require.NoError(t, err)
 
-		_, err = tx.QueryContext(ctx, query)
+		_, err = tx.QueryContext(ctx, query) //nolint:rowserrcheck
 		require.NoError(t, err)
 
 		err = tx.Commit()
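
Review note: the sqlquerywrapper tests above deliberately never iterate the returned rows, so the linter is silenced at the call site instead of "fixed". The preferred machine-readable form has no space before nolint and allows a trailing explanation, e.g.:

rows, err := qw.Query(query) //nolint:rowserrcheck // rows deliberately left unread in this test
require.NoError(t, err)
defer func() { _ = rows.Close() }()
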
diff --git a/warehouse/integrations/mssql/mssql.go b/warehouse/integrations/mssql/mssql.go
index e4d17afc08..eafd54c4ca 100644
--- a/warehouse/integrations/mssql/mssql.go
+++ b/warehouse/integrations/mssql/mssql.go
@@ -737,6 +737,9 @@ func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool {
 		}
 		stagingTableNames = append(stagingTableNames, tableName)
 	}
+	if err := rows.Err(); err != nil {
+		panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err))
+	}
 	ms.Logger.Infof("WH: MSSQL: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
 	delSuccess := true
 	for _, stagingTableName := range stagingTableNames {
diff --git a/warehouse/integrations/postgres-legacy/postgres.go b/warehouse/integrations/postgres-legacy/postgres.go
index 25d20cebb0..0028975310 100644
--- a/warehouse/integrations/postgres-legacy/postgres.go
+++ b/warehouse/integrations/postgres-legacy/postgres.go
@@ -865,10 +865,13 @@ func (pg *Postgres) dropDanglingStagingTables(ctx context.Context) bool {
 		var tableName string
 		err := rows.Scan(&tableName)
 		if err != nil {
-			panic(fmt.Errorf("Failed to scan result from query: %s\nwith Error : %w", sqlStatement, err))
+			panic(fmt.Errorf("scan result from query: %s\nwith Error : %w", sqlStatement, err))
 		}
 		stagingTableNames = append(stagingTableNames, tableName)
 	}
+	if err := rows.Err(); err != nil {
+		panic(fmt.Errorf("iterate result from query: %s\nwith Error : %w", sqlStatement, err))
+	}
 	pg.logger.Infof("WH: PG: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
 	delSuccess := true
 	for _, stagingTableName := range stagingTableNames {
@@ -1070,6 +1073,10 @@ func (pg *Postgres) handleExecContext(ctx context.Context, e *QueryParams) (err
 		}
 		response = append(response, s)
 	}
+	if err = rows.Err(); err != nil {
+		err = fmt.Errorf("[WH][POSTGRES] Error occurred while processing destination revisionID query %+v with err: %w", e, err)
+		return
+	}
 	pg.logger.Infof(fmt.Sprintf(`[WH][POSTGRES] Execution Query plan for statement: %s is %s`, sqlStatement, strings.Join(response, ` `)))
 }
diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go
index 00d59dd87b..c83afe7aea 100644
--- a/warehouse/integrations/redshift/redshift.go
+++ b/warehouse/integrations/redshift/redshift.go
@@ -1104,10 +1104,13 @@ func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool {
 		var tableName string
 		err := rows.Scan(&tableName)
 		if err != nil {
-			panic(fmt.Errorf("Failed to scan result from query: %s\nwith Error : %w", sqlStatement, err))
+			panic(fmt.Errorf("scan result from query: %s\nwith Error : %w", sqlStatement, err))
 		}
 		stagingTableNames = append(stagingTableNames, tableName)
 	}
+	if err := rows.Err(); err != nil {
+		panic(fmt.Errorf("iterate result from query: %s\nwith Error : %w", sqlStatement, err))
+	}
 	rs.Logger.Infof("WH: RS: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
 	delSuccess := true
 	for _, stagingTableName := range stagingTableNames {
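
Review note: azure-synapse, mssql, postgres-legacy and redshift all repeat the same scan-one-column-of-table-names loop, each now panicking on iteration failure too. A hypothetical helper (not part of this diff, assumes database/sql) showing the shape the four copies share:

// scanStrings drains a single-column result set, surfacing Scan and
// iteration errors alike; the caller decides whether that is panic-worthy.
func scanStrings(rows *sql.Rows) ([]string, error) {
	defer func() { _ = rows.Close() }()
	var out []string
	for rows.Next() {
		var s string
		if err := rows.Scan(&s); err != nil {
			return nil, err
		}
		out = append(out, s)
	}
	return out, rows.Err()
}
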
diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go
index 6de4351cc6..242c21fb49 100644
--- a/warehouse/integrations/snowflake/snowflake.go
+++ b/warehouse/integrations/snowflake/snowflake.go
@@ -1151,6 +1151,9 @@ func (sf *Snowflake) DownloadIdentityRules(ctx context.Context, gzWriter *misc.G
 			csvWriter.Flush()
 			_ = gzWriter.WriteGZ(buff.String())
 		}
+		if err = rows.Err(); err != nil {
+			return
+		}
 
 		offset += batchSize
 		if offset >= totalRows {
diff --git a/warehouse/internal/repo/staging.go b/warehouse/internal/repo/staging.go
index a926e3a957..ef56ef208c 100644
--- a/warehouse/internal/repo/staging.go
+++ b/warehouse/internal/repo/staging.go
@@ -454,6 +454,9 @@ func (repo *StagingFiles) DestinationRevisionIDs(ctx context.Context, upload mod
 		}
 		revisionIDs = append(revisionIDs, revisionID)
 	}
+	if err = rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate destination revisionID: %w", err)
+	}
 	return revisionIDs, nil
 }
 
diff --git a/warehouse/internal/repo/upload.go b/warehouse/internal/repo/upload.go
index 7e47224ae6..48f48aceea 100644
--- a/warehouse/internal/repo/upload.go
+++ b/warehouse/internal/repo/upload.go
@@ -343,6 +343,9 @@ func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit
 		}
 		uploadJobs = append(uploadJobs, upload)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 
 	return uploadJobs, nil
 }
diff --git a/warehouse/upload.go b/warehouse/upload.go
index c04e262603..c0b688d71b 100644
--- a/warehouse/upload.go
+++ b/warehouse/upload.go
@@ -1834,6 +1834,9 @@ func (job *UploadJob) getLoadFilesTableMap() (loadFilesMap map[tableNameT]bool,
 		}
 		loadFilesMap[tableNameT(tableName)] = true
 	}
+	if err = rows.Err(); err != nil {
+		err = fmt.Errorf("iterate distinct table name query for jobId: %d, sourceId: %s, destinationId: %s, err: %w", job.upload.ID, job.warehouse.Source.ID, job.warehouse.Destination.ID, err)
+	}
 	return
 }
 
@@ -1921,6 +1924,9 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehous
 			Metadata:   metadata,
 		})
 	}
+	if err = rows.Err(); err != nil {
+		panic(fmt.Errorf("iterate query results: %s\nwith Error : %w", sqlStatement, err))
+	}
 	return
 }
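
Review note: several hunks (applyRule, writeTableToFile, getLoadFilesTableMap, DownloadIdentityRules) route rows.Err() through a named err result and a bare return. The assignment `err =` (rather than `err :=`, which would declare a new variable) is what makes the bare return propagate the failure. A sketch of that style, with illustrative names:

func loadIDs(rows *sql.Rows) (ids []int64, err error) {
	defer func() { _ = rows.Close() }()
	for rows.Next() {
		var id int64
		if err = rows.Scan(&id); err != nil {
			return // bare return hands the named err back to the caller
		}
		ids = append(ids, id)
	}
	err = rows.Err() // an early, silent exit from Next() is still a failure
	return
}
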