Skip to content

Commit

Permalink
fix: batchrouter async dest pending event counts aren't being decreas…
Browse files Browse the repository at this point in the history
…ed (#4346)
  • Loading branch information
Sidddddarth committed Jan 31, 2024
1 parent e1789a5 commit 5d78ab6
Show file tree
Hide file tree
Showing 9 changed files with 7 additions and 10 deletions.
2 changes: 0 additions & 2 deletions cmd/rudder-cli/util/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func testFileCreation() (string, error) {
}

err = os.WriteFile(testFilePath, data, 0o644)

if err != nil {
return "Could not write to temp file", err
}
Expand Down Expand Up @@ -69,7 +68,6 @@ func TestUpload() (string, error) {
Key: aws.String(splitFileName[len(splitFileName)-1]),
Body: fileToUpload,
})

if err != nil {
return "Failed to upload to S3. Check Credentials.", err
}
Expand Down
1 change: 0 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceTransformer
*/
var responses []transformerResponse
err = json.Unmarshal(respBody, &responses)

if err != nil {
statusCode := response.GetErrorStatusCode(response.SourceTransformerInvalidResponseFormat)
err := errors.New(response.GetStatus(response.SourceTransformerInvalidResponseFormat))
Expand Down
2 changes: 0 additions & 2 deletions jobsdb/jobsdb_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,6 @@ func (*Handle) copyJobsDSInTx(txHandler transactionHandler, ds dataSetT, jobList

stmt, err = txHandler.Prepare(pq.CopyIn(ds.JobTable, "job_id", "uuid", "user_id", "custom_val", "parameters",
"event_payload", "event_count", "created_at", "expire_at", "workspace_id"))

if err != nil {
return err
}
Expand All @@ -697,7 +696,6 @@ func (*Handle) copyJobsDSInTx(txHandler transactionHandler, ds dataSetT, jobList

_, err = stmt.Exec(job.JobID, job.UUID, job.UserID, job.CustomVal, string(job.Parameters),
string(job.EventPayload), eventCount, job.CreatedAt, job.ExpireAt, job.WorkspaceId)

if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/rudderlabs/rudder-server/router/rterror"
"github.com/rudderlabs/rudder-server/router/types"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/services/rmetrics"
"github.com/rudderlabs/rudder-server/utils/misc"
utilTypes "github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -696,6 +697,12 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
panic(err)
}
brt.updateProcessedEventsMetrics(statusList)
rmetrics.DecreasePendingEvents(
"batch_rt",
workspaceID,
brt.destType,
float64(len(completedJobsList)),
)

if attempted {
var sourceID string
Expand Down
1 change: 0 additions & 1 deletion suppression-backup-service/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func getSuppressions(w http.ResponseWriter, r *http.Request) {
if pt == tokenKey {
w.WriteHeader(http.StatusOK)
body, err = json.Marshal(suppressionsResponse{Token: tokenKey})

if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
1 change: 0 additions & 1 deletion utils/misc/dbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func ReplaceDB(dbName, targetName string, c *config.Config) {
dbName, targetName)
pkgLogger.Debug(renameDBStatement)
_, err = db.Exec(renameDBStatement)

// If execution of ALTER returns error, pacicking
if err != nil {
panic(err)
Expand Down
1 change: 0 additions & 1 deletion warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string
sqlStatement = fmt.Sprintf(`INSERT INTO "%[1]s"."%[2]s" (%[4]s) SELECT %[4]s FROM %[3]s`, as.Namespace, warehouseutils.UsersTable, as.Namespace+"."+stagingTableName, strings.Join(append([]string{"id"}, userColNames...), ","))
as.logger.Infof("AZ: Inserting records for table:%s using staging table: %s\n", warehouseutils.UsersTable, sqlStatement)
_, err = tx.ExecContext(ctx, sqlStatement)

if err != nil {
as.logger.Errorf("AZ: Error inserting into users table from staging table: %v\n", err)
_ = tx.Rollback()
Expand Down
1 change: 0 additions & 1 deletion warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,6 @@ func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error {
)

_, err = d.DB.ExecContext(ctx, query)

if err != nil {
return map[string]error{
warehouseutils.IdentifiesTable: nil,
Expand Down
1 change: 0 additions & 1 deletion warehouse/integrations/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,6 @@ func (ms *MSSQL) loadUserTables(ctx context.Context) (errorMap map[string]error)
sqlStatement = fmt.Sprintf(`INSERT INTO "%[1]s"."%[2]s" (%[4]s) SELECT %[4]s FROM %[3]s`, ms.Namespace, warehouseutils.UsersTable, ms.Namespace+"."+stagingTableName, strings.Join(append([]string{"id"}, userColNames...), ","))
ms.logger.Infof("MSSQL: Inserting records for table:%s using staging table: %s\n", warehouseutils.UsersTable, sqlStatement)
_, err = tx.ExecContext(ctx, sqlStatement)

if err != nil {
ms.logger.Errorf("MSSQL: Error inserting into users table from staging table: %v\n", err)
_ = tx.Rollback()
Expand Down

0 comments on commit 5d78ab6

Please sign in to comment.