Skip to content

Commit

Permalink
fix(warehouse): infinites retries (#3050)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Mar 10, 2023
1 parent 63dc940 commit d3320fa
Show file tree
Hide file tree
Showing 17 changed files with 1,490 additions and 796 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.22 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.30.3 // indirect
github.com/aws/smithy-go v1.13.5
github.com/aws/smithy-go v1.13.5 // indirect
github.com/bugsnag/panicwrap v1.3.4 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ func (uploadReq *UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploads
return
}

var uploadJobT UploadJobT

upload, err := repo.NewUploads(uploadReq.API.dbHandle).Get(context.TODO(), uploadReq.UploadId)
if err == model.ErrUploadNotFound {
return &proto.TriggerWhUploadsResponse{
Expand All @@ -398,8 +396,12 @@ func (uploadReq *UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploads
return
}

uploadJobT.upload = upload
uploadJobT.dbHandle = uploadReq.API.dbHandle
uploadJobT := UploadJobT{
upload: upload,
dbHandle: uploadReq.API.dbHandle,
Now: timeutil.Now,
}

err = uploadJobT.triggerUploadNow()
if err != nil {
return
Expand Down
16 changes: 15 additions & 1 deletion warehouse/identities.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package warehouse

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"

"github.com/rudderlabs/rudder-server/warehouse/logfield"

"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
Expand Down Expand Up @@ -410,7 +413,18 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
Warehouse: warehouse,
}, whManager)

tableUploadsCreated := areTableUploadsCreated(job.upload.ID)
tableUploadsCreated, tableUploadsErr := job.tableUploadsRepo.ExistsForUploadID(context.TODO(), job.upload.ID)
if tableUploadsErr != nil {
pkgLogger.Warnw("table uploads exists",
logfield.UploadJobID, job.upload.ID,
logfield.SourceID, job.upload.SourceID,
logfield.DestinationID, job.upload.DestinationID,
logfield.DestinationType, job.upload.DestinationType,
logfield.WorkspaceID, job.upload.WorkspaceID,
logfield.Error, tableUploadsErr.Error(),
)
return
}
if !tableUploadsCreated {
err := job.initTableUploads()
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions warehouse/internal/model/table_upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package model

import (
"time"
)

type TableUpload struct {
ID int64
UploadID int64
TableName string
Status string
Error string
LastExecTime time.Time
TotalEvents int64
CreatedAt time.Time
UpdatedAt time.Time
Location string
}

const (
TableUploadWaiting = "waiting"
TableUploadExecuting = "executing"
TableUploadUpdatingSchema = "updating_schema"
TableUploadUpdatingSchemaFailed = "updating_schema_failed"
TableUploadUpdatedSchema = "updated_schema"
TableUploadExporting = "exporting_data"
TableUploadExportingFailed = "exporting_data_failed"
TableUploadExported = "exported_data"
)
9 changes: 9 additions & 0 deletions warehouse/internal/model/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ type UploadJob struct {
LoadFileGenStartTime time.Time
}

type PendingTableUpload struct {
UploadID int64
DestinationID string
Namespace string
TableName string
Status string
Error string
}

type Matcher interface {
MatchString(string) bool
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/internal/repo/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (repo *LoadFiles) DeleteByStagingFiles(ctx context.Context, stagingFileIDs
// Insert loadFiles into the database.
func (repo *LoadFiles) Insert(ctx context.Context, loadFiles []model.LoadFile) (err error) {
// Using transactions for bulk copying
txn, err := repo.db.Begin()
txn, err := repo.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return
}
Expand Down

0 comments on commit d3320fa

Please sign in to comment.