Skip to content

Commit

Permalink
chore: upgrade golangci version and lint fixes (#3443)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Jun 6, 2023
1 parent 8b179b6 commit 3d03653
Show file tree
Hide file tree
Showing 30 changed files with 132 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
version: v1.52
args: -v
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ install-tools:

.PHONY: lint
lint: fmt ## Run linters on all go files
docker run --rm -v $(shell pwd):/app:ro -w /app golangci/golangci-lint:v1.51.2 bash -e -c \
docker run --rm -v $(shell pwd):/app:ro -w /app golangci/golangci-lint:v1.52 bash -e -c \
'golangci-lint run -v --timeout 5m'

.PHONY: fmt
Expand Down
1 change: 0 additions & 1 deletion build/wait-for-go/wait-for.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func canConnect(host, port, protocol string) bool {
fmt.Println("UDP error:", err)
if err != nil {
_ = conn.Close()
conn = nil
return false
}
}
Expand Down
3 changes: 1 addition & 2 deletions jobsdb/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ Ping returns health check for pg database
func (jd *HandleT) Ping() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rows, err := jd.dbHandle.QueryContext(ctx, `SELECT 'Rudder DB Health Check'::text as message`)
_, err := jd.dbHandle.ExecContext(ctx, `SELECT 'Rudder DB Health Check'::text as message`)
if err != nil {
return err
}
_ = rows.Close()
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,9 @@ func (jd *HandleT) createTableDumps(queryFunc func(int64) string, pathFunc func(
}
offset++
}
if err := rows.Err(); err != nil {
return fmt.Errorf("error while iterating rows: %w", err)
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,9 @@ func TestCreateDS(t *testing.T) {
require.NoError(t, err)
tableNames = append(tableNames, tableName)
}
if err = tables.Err(); err != nil {
require.NoError(t, err)
}
require.Equal(t, len(tableNames), 2, `should find two tables`)
require.Equal(t, tableNames[0], prefix+"_jobs_-2")
require.Equal(t, tableNames[1], prefix+"_jobs_-1")
Expand Down Expand Up @@ -1401,6 +1404,7 @@ func getPayloadSize(t *testing.T, jobsDB JobsDB, job *JobT) (int64, error) {
require.NoError(t, err)
tables = append(tables, table)
}
require.NoError(t, rows.Err())
_ = rows.Close()

for _, table := range tables {
Expand Down
34 changes: 23 additions & 11 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,11 @@ func (jd *HandleT) refreshDSRangeList(l lock.LockToken) {
jd.assert(ds.Index != "", "ds.Index is empty")
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, ds.JobTable)
// Note: Using Query instead of QueryRow, because the sqlmock library doesn't have support for QueryRow
rows, err := jd.dbHandle.Query(sqlStatement)
row := jd.dbHandle.QueryRow(sqlStatement)

err := row.Scan(&minID, &maxID)
jd.assertError(err)
for rows.Next() {
err := rows.Scan(&minID, &maxID)
jd.assertError(err)
break
}
_ = rows.Close()

jd.logger.Debug(sqlStatement, minID, maxID)
// We store ranges EXCEPT for
// 1. the last element (which is being actively written to)
Expand Down Expand Up @@ -1261,7 +1258,7 @@ func (jd *HandleT) computeNewIdxForAppend(l lock.LockToken) string {
}

func (jd *HandleT) doComputeNewIdxForAppend(dList []dataSetT) string {
newDSIdx := ""
var newDSIdx string
if len(dList) == 0 {
newDSIdx = "1"
} else {
Expand Down Expand Up @@ -1845,6 +1842,9 @@ func (jd *HandleT) GetActiveWorkspaces(ctx context.Context, customVal string) ([
}
workspaceIds = append(workspaceIds, workspaceId)
}
if err = rows.Err(); err != nil {
return nil, err
}
return workspaceIds, nil
}

Expand Down Expand Up @@ -1886,6 +1886,9 @@ func (jd *HandleT) GetDistinctParameterValues(ctx context.Context, parameterName
}
values = append(values, value)
}
if err = rows.Err(); err != nil {
return nil, err
}
return values, nil
}

Expand Down Expand Up @@ -2105,6 +2108,9 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params G
payloadSize = runningPayloadSize
eventCount = runningEventCount
}
if err := rows.Err(); err != nil {
return JobsResult{}, false, err
}
if !limitsReached &&
(params.JobsLimit > 0 && len(jobList) == params.JobsLimit) || // we reached the jobs limit
(params.EventsLimit > 0 && eventCount >= params.EventsLimit) || // we reached the events limit
Expand Down Expand Up @@ -2213,9 +2219,6 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()
if err != nil {
return JobsResult{}, false, err
}
var runningEventCount int
var runningPayloadSize int64

Expand Down Expand Up @@ -2248,6 +2251,9 @@ func (jd *HandleT) getUnprocessedJobsDS(ctx context.Context, ds dataSetT, params
eventCount = runningEventCount

}
if err := rows.Err(); err != nil {
return JobsResult{}, false, err
}
if !limitsReached &&
(params.JobsLimit > 0 && len(jobList) == params.JobsLimit) || // we reached the jobs limit
(params.EventsLimit > 0 && eventCount >= params.EventsLimit) || // we reached the events limit
Expand Down Expand Up @@ -2575,6 +2581,9 @@ func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT) {
jd.assertError(err)
count++
}
if err = rows.Err(); err != nil {
jd.assertError(err)
}
return
}

Expand Down Expand Up @@ -2621,6 +2630,9 @@ func (jd *HandleT) recoverFromCrash(owner OwnerType, goRoutineType string) {
jd.assert(!opDone, "opDone is true")
count++
}
if err = rows.Err(); err != nil {
jd.assertError(err)
}
jd.assert(count <= 1, fmt.Sprintf("count:%d > 1", count))

if count == 0 {
Expand Down
3 changes: 3 additions & 0 deletions jobsdb/jobsdb_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func getAllTableNames(dbHandle sqlDbOrTx) ([]string, error) {
}
tableNames = append(tableNames, tbName)
}
if err = rows.Err(); err != nil {
return tableNames, err
}
return tableNames, nil
}

Expand Down
3 changes: 3 additions & 0 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (jd *HandleT) getCleanUpCandidates(ctx context.Context, dsList []dataSetT)
}
estimates[tableName] = estimate
}
if err = rows.Err(); err != nil {
return nil, err
}

datasets := lo.Filter(dsList,
func(ds dataSetT, idx int) bool {
Expand Down
3 changes: 3 additions & 0 deletions router/eventorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ func (eventOrderMethods) countDrainedJobs(db *sql.DB) int {
tables = append(tables, table)
}
}
if err = rows.Err(); err != nil {
panic(err)
}
}
for _, table := range tables {
var dsCount int
Expand Down
2 changes: 1 addition & 1 deletion services/multitenant/tenantstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (t *Stats) GetRouterPickupJobs(destType string, noOfWorkers int, routerTime
break
}

pickUpCount := 0
var pickUpCount int
if t.routerTenantLatencyStat[destType][workspaceKey].Value() == 0 {
pickUpCount = misc.MinInt(pendingEvents-workspacePickUpCount[workspaceKey], runningJobCount)
} else {
Expand Down
4 changes: 4 additions & 0 deletions services/pgnotifier/pgnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ func (notifier *PGNotifier) RunMaintenanceWorker(ctx context.Context) error {
}
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
panic(err)
}

_ = rows.Close()
pkgLogger.Debugf("PgNotifier: Re-triggered job ids: %v", ids)

Expand Down
7 changes: 5 additions & 2 deletions services/validators/envValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func killDanglingDBConnections(db *sql.DB) error {
AND APPLICATION_NAME = CURRENT_SETTING('APPLICATION_NAME')
AND APPLICATION_NAME <> ''`)
if err != nil {
return fmt.Errorf("error occurred when querying pg_stat_activity table for terminating dangling connections: %w", err)
return fmt.Errorf("querying pg_stat_activity table for terminating dangling connections: %w", err)
}
defer func() { _ = rows.Close() }()

Expand All @@ -172,10 +172,13 @@ func killDanglingDBConnections(db *sql.DB) error {
var row danglingConnRow
err := rows.Scan(&row.pid, &row.queryStart, &row.waitEventType, &row.waitEvent, &row.state, &row.query, &row.terminated)
if err != nil {
return fmt.Errorf("error occurred when scanning pg_stat_activity table for terminating dangling connections: %w", err)
return fmt.Errorf("scanning pg_stat_activity table for terminating dangling connections: %w", err)
}
dangling = append(dangling, &row)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("iterating pg_stat_activity table for terminating dangling connections: %w", err)
}

if len(dangling) > 0 {
pkgLogger.Warnf("Terminated %d dangling connection(s)", len(dangling))
Expand Down
3 changes: 1 addition & 2 deletions utils/misc/dbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ func ReplaceDB(dbName, targetName string) {

// Killing sessions on the db
sqlStatement := fmt.Sprintf("SELECT pid, pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '%s' AND pid <> pg_backend_pid();", dbName)
rows, err := db.Query(sqlStatement)
_, err = db.Exec(sqlStatement)
if err != nil {
panic(err)
}
rows.Close()

renameDBStatement := fmt.Sprintf(`ALTER DATABASE %q RENAME TO %q`,
dbName, targetName)
Expand Down
3 changes: 3 additions & 0 deletions utils/misc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,9 @@ func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, -1, err
}
defer func() { httputil.CloseResponse(resp) }()

pkgLogger.Debugf("Post request: Successful %s", string(body))
Expand Down
9 changes: 9 additions & 0 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ func (tableUploadReq TableUploadReq) GetWhTableUploads(ctx context.Context) ([]*
}
tableUploads = append(tableUploads, &tableUpload)
}
if err = rows.Err(); err != nil {
tableUploadReq.API.log.Errorf(err.Error())
return []*proto.WHTable{}, err
}

return tableUploads, nil
}

Expand Down Expand Up @@ -649,6 +654,10 @@ func (uploadsReq *UploadsReq) getUploadsFromDB(ctx context.Context, isMultiWorks
upload.Tables = make([]*proto.WHTable, 0)
uploads = append(uploads, &upload)
}
if err = rows.Err(); err != nil {
return nil, 0, err
}

return uploads, totalUploadCount, err
}

Expand Down
10 changes: 9 additions & 1 deletion warehouse/archive/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func (a *Archiver) Do(ctx context.Context) error {
stagingFileIDs = append(stagingFileIDs, stagingFileID)
stagingFileLocations = append(stagingFileLocations, stagingFileLocation)
}
if err := stagingFileRows.Err(); err != nil {
txn.Rollback()
return fmt.Errorf("iterating staging file rows: %w", err)
}
stagingFileRows.Close()

var storedStagingFilesLocation string
Expand Down Expand Up @@ -333,7 +337,7 @@ func (a *Archiver) Do(ctx context.Context) error {
warehouseutils.WarehouseStagingFilesTable,
misc.IntArrayToString(stagingFileIDs, ","),
)
_, err = txn.QueryContext(ctx, stmt)
_, err = txn.ExecContext(ctx, stmt)
if err != nil {
a.Logger.Errorf(`Error running txn in archiveUploadFiles. Query: %s Error: %v`, stmt, err)
txn.Rollback()
Expand Down Expand Up @@ -369,6 +373,10 @@ func (a *Archiver) Do(ctx context.Context) error {
}
loadLocations = append(loadLocations, loc)
}
if err := loadLocationRows.Err(); err != nil {
txn.Rollback()
return fmt.Errorf("iterating load file location: %w", err)
}
loadLocationRows.Close()
var paths []string
for _, loc := range loadLocations {
Expand Down
4 changes: 4 additions & 0 deletions warehouse/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (cl *Client) sqlQuery(statement string) (result warehouseutils.QueryResult,
}
result.Values = append(result.Values, stringRow)
}
if err = rows.Err(); err != nil {
return result, err
}

return result, err
}

Expand Down
11 changes: 11 additions & 0 deletions warehouse/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (idr *Identity) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWri
if err != nil {
return
}
defer tableRows.Close()

for tableRows.Next() {
var mergePropType, mergePropVal string
Expand All @@ -159,6 +160,9 @@ func (idr *Identity) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWri
row := []string{mergePropType, mergePropVal, newID, currentTimeString}
rows = append(rows, row)
}
if err = tableRows.Err(); err != nil {
return
}

sqlStatement = fmt.Sprintf(`UPDATE %s SET rudder_id='%s', updated_at='%s' WHERE rudder_id IN (%v)`, idr.mappingsTable(), newID, currentTimeString, misc.SingleQuoteLiteralJoin(rudderIDs[1:]))
var res sql.Result
Expand Down Expand Up @@ -317,6 +321,10 @@ func (idr *Identity) addRules(txn *sql.Tx, loadFileNames []string, gzWriter *mis
}
ids = append(ids, id)
}
if err = rows.Err(); err != nil {
pkgLogger.Errorf(`IDR: Error reading rows from %s from %s: %v`, idr.mergeRulesTable(), mergeRulesStagingTable, err)
return
}
pkgLogger.Debugf(`IDR: Number of merge rules inserted for uploadID %v : %v`, idr.uploadID, len(ids))
return ids, nil
}
Expand Down Expand Up @@ -361,6 +369,9 @@ func (idr *Identity) writeTableToFile(tableName string, txn *sql.Tx, gzWriter *m
rowString, _ := eventLoader.WriteToString()
gzWriter.WriteGZ(rowString)
}
if err = rows.Err(); err != nil {
return
}

offset += batchSize
if offset >= totalRows {
Expand Down
3 changes: 3 additions & 0 deletions warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool {
}
stagingTableNames = append(stagingTableNames, tableName)
}
if err := rows.Err(); err != nil {
panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err))
}
as.Logger.Infof("WH: SYNAPSE: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
delSuccess := true
for _, stagingTableName := range stagingTableNames {
Expand Down
1 change: 1 addition & 0 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) {
require.Fail(t, fmt.Sprintf("table %s column %s is of Nullable type even when disableNullable is set to true", tableName, columnName))
}
}
require.NoError(t, rows.Err())
}

t.Log("Loading data into table")
Expand Down
2 changes: 2 additions & 0 deletions warehouse/integrations/deltalake-native/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,8 @@ func (d *Deltalake) partitionQuery(ctx context.Context, tableName string) (strin
}
defer func() { _ = rows.Close() }()

_ = rows.Err() // ignore error

partitionColumns, err := rows.Columns()
if err != nil {
return "", fmt.Errorf("scanning partition columns: %w", err)
Expand Down

0 comments on commit 3d03653

Please sign in to comment.