Skip to content

Commit

Permalink
chore: uploads cleanup (#3743)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 18, 2023
1 parent ff80a1d commit ca52403
Show file tree
Hide file tree
Showing 15 changed files with 734 additions and 549 deletions.
1 change: 1 addition & 0 deletions .github/tools/matrixchecker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var IgnorePackages = []string{
"warehouse/integrations/middleware",
"warehouse/integrations/testhelper",
"warehouse/integrations/testdata",
"warehouse/integrations/config",
}

func main() {
Expand Down
1 change: 0 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ func runAllInit() {
encoding.Init()
pgnotifier.Init()
jobsdb.Init()
warehouse.Init()
warehouse.Init4()
validations.Init()
webhook.Init()
Expand Down
33 changes: 33 additions & 0 deletions warehouse/integrations/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package config

import (
"github.com/rudderlabs/rudder-go-kit/config"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// MaxParallelLoadsMap builds the per-destination-type cap on concurrent table
// loads, keyed by warehouse destination type. Each entry is read from the
// config key "Warehouse.<destination>.maxParallelLoads"; BigQuery defaults to
// 20, every other destination defaults to 8.
func MaxParallelLoadsMap(conf *config.Config) map[string]int {
	// loads reads the maxParallelLoads setting for one destination,
	// falling back to the supplied default when the key is unset.
	loads := func(destination string, fallback int) int {
		return conf.GetInt("Warehouse."+destination+".maxParallelLoads", fallback)
	}
	return map[string]int{
		whutils.BQ:            loads("bigquery", 20),
		whutils.RS:            loads("redshift", 8),
		whutils.POSTGRES:      loads("postgres", 8),
		whutils.MSSQL:         loads("mssql", 8),
		whutils.SNOWFLAKE:     loads("snowflake", 8),
		whutils.CLICKHOUSE:    loads("clickhouse", 8),
		whutils.DELTALAKE:     loads("deltalake", 8),
		whutils.S3Datalake:    loads("s3_datalake", 8),
		whutils.GCSDatalake:   loads("gcs_datalake", 8),
		whutils.AzureDatalake: loads("azure_datalake", 8),
	}
}

// ColumnCountLimitMap builds the per-destination-type maximum column count,
// keyed by warehouse destination type. Each entry is read from the config key
// "Warehouse.<destination>.columnCountLimit" with a destination-specific
// default; destinations absent from the map have no configured limit here.
func ColumnCountLimitMap(conf *config.Config) map[string]int {
	// limit reads the columnCountLimit setting for one destination,
	// falling back to the supplied default when the key is unset.
	limit := func(destination string, fallback int) int {
		return conf.GetInt("Warehouse."+destination+".columnCountLimit", fallback)
	}
	return map[string]int{
		whutils.AzureSynapse: limit("azure_synapse", 1024),
		whutils.BQ:           limit("bigquery", 10000),
		whutils.CLICKHOUSE:   limit("clickhouse", 1000),
		whutils.MSSQL:        limit("mssql", 1024),
		whutils.POSTGRES:     limit("postgres", 1600),
		whutils.RS:           limit("redshift", 1600),
		whutils.S3Datalake:   limit("s3_datalake", 10000),
	}
}
12 changes: 8 additions & 4 deletions warehouse/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"

"github.com/rudderlabs/rudder-server/app"

"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"

"golang.org/x/exp/slices"
Expand Down Expand Up @@ -103,6 +105,7 @@ type router struct {

func newRouter(
ctx context.Context,
app app.App,
destType string,
conf *config.Config,
logger logger.Logger,
Expand Down Expand Up @@ -143,7 +146,10 @@ func newRouter(
r.inProgressMap = make(map[WorkerIdentifierT][]JobID)

r.uploadJobFactory = UploadJobFactory{
stats: r.statsFactory,
app: app,
conf: r.conf,
logger: r.logger,
statsFactory: r.statsFactory,
dbHandle: r.dbHandle,
pgNotifier: r.notifier,
destinationValidator: validations.NewDestinationValidator(),
Expand Down Expand Up @@ -475,9 +481,7 @@ func (r *router) uploadsToProcess(ctx context.Context, availableWorkers int, ski
Warehouse: warehouse,
Upload: upload,
StagingFiles: stagingFilesList,
},
whManager,
)
}, whManager)

uploadJobs = append(uploadJobs, uploadJob)
}
Expand Down
39 changes: 20 additions & 19 deletions warehouse/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

func TestRouter(t *testing.T) {
pgnotifier.Init()
Init()
Init4()

pool, err := dockertest.NewPool("")
Expand All @@ -53,9 +52,6 @@ func TestRouter(t *testing.T) {
Reporting: report,
}).AnyTimes()

application = mockApp
pkgLogger = logger.NOP

sourceID := "test-source-id"
destinationID := "test-destination-id"
namespace := "test-namespace"
Expand Down Expand Up @@ -95,9 +91,8 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

notifier, err = pgnotifier.New(workspaceIdentifier, pgResource.DBDsn)
notifier, err := pgnotifier.New(workspaceIdentifier, pgResource.DBDsn)
require.NoError(t, err)

tenantManager := &multitenant.Manager{
Expand All @@ -119,6 +114,7 @@ func TestRouter(t *testing.T) {

r, err := newRouter(
ctx,
mockApp,
destinationType,
config.Default,
logger.NOP,
Expand Down Expand Up @@ -148,7 +144,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down Expand Up @@ -184,6 +179,7 @@ func TestRouter(t *testing.T) {
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
Expand Down Expand Up @@ -301,7 +297,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down Expand Up @@ -340,6 +335,7 @@ func TestRouter(t *testing.T) {
r.warehouseDBHandle = NewWarehouseDB(db)
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
Expand Down Expand Up @@ -406,7 +402,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down Expand Up @@ -454,8 +449,11 @@ func TestRouter(t *testing.T) {
}
r.warehouses = []model.Warehouse{warehouse}
r.uploadJobFactory = UploadJobFactory{
stats: r.statsFactory,
dbHandle: r.dbHandle,
app: mockApp,
conf: config.Default,
logger: logger.NOP,
statsFactory: r.statsFactory,
dbHandle: r.dbHandle,
}
r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, stats.Tags{
"destType": r.destType,
Expand Down Expand Up @@ -536,7 +534,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down Expand Up @@ -590,8 +587,11 @@ func TestRouter(t *testing.T) {
r.bcManager = newBackendConfigManager(r.conf, r.dbHandle, r.tenantManager, r.logger)
r.warehouses = []model.Warehouse{warehouse}
r.uploadJobFactory = UploadJobFactory{
stats: r.statsFactory,
dbHandle: r.dbHandle,
app: mockApp,
conf: config.Default,
logger: logger.NOP,
statsFactory: r.statsFactory,
dbHandle: r.dbHandle,
}
r.workerChannelMap = map[string]chan *UploadJob{
r.workerIdentifier(warehouse): make(chan *UploadJob, 1),
Expand Down Expand Up @@ -687,7 +687,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down Expand Up @@ -738,8 +737,11 @@ func TestRouter(t *testing.T) {
r.bcManager = newBackendConfigManager(r.conf, r.dbHandle, r.tenantManager, r.logger)
r.warehouses = []model.Warehouse{warehouse}
r.uploadJobFactory = UploadJobFactory{
stats: r.statsFactory,
dbHandle: r.dbHandle,
app: mockApp,
conf: config.Default,
logger: logger.NOP,
statsFactory: r.statsFactory,
dbHandle: r.dbHandle,
}
r.workerChannelMap = map[string]chan *UploadJob{
r.workerIdentifier(warehouse): make(chan *UploadJob, 1),
Expand Down Expand Up @@ -793,7 +795,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand All @@ -811,6 +812,7 @@ func TestRouter(t *testing.T) {
r.warehouseDBHandle = NewWarehouseDB(db)
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
Expand Down Expand Up @@ -949,7 +951,6 @@ func TestRouter(t *testing.T) {
require.NoError(t, err)

db := sqlmiddleware.New(pgResource.DB)
wrappedDBHandle = db

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
Expand Down
3 changes: 3 additions & 0 deletions warehouse/slave_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/services/pgnotifier"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
Expand Down Expand Up @@ -234,6 +235,8 @@ func (sw *slaveWorker) processStagingFile(ctx context.Context, job payload) ([]u
bufScanner := bufio.NewScanner(jr.stagingFileReader)
bufScanner.Buffer(make([]byte, maxCapacity), maxCapacity)

columnCountLimitMap := integrationsconfig.ColumnCountLimitMap(jr.conf)

for {
ok := bufScanner.Scan()
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions warehouse/slave_worker_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type jobRun struct {
now func() time.Time

stats stats.Stats
conf *config.Config
uploadTimeStat stats.Measurement
totalUploadTimeStat stats.Measurement
downloadStagingFileStat stats.Measurement
Expand All @@ -126,6 +127,7 @@ func newJobRun(job payload, conf *config.Config, log logger.Logger, stat stats.S
job: job,
identifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stat,
conf: conf,
since: time.Since,
logger: log,
now: timeutil.Now,
Expand Down
9 changes: 4 additions & 5 deletions warehouse/slave_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,11 @@ func TestSlaveWorker(t *testing.T) {
subscribeCh: subscribeCh,
}

c := config.New()
c.Set("Warehouse.s3_datalake.columnCountLimit", 10)

slaveWorker := newSlaveWorker(
config.Default,
c,
logger.NOP,
stats.Default,
notifier,
Expand All @@ -309,10 +312,6 @@ func TestSlaveWorker(t *testing.T) {
workerIdx,
)

columnCountLimitMap = map[string]int{
warehouseutils.S3Datalake: 10,
}

p := payload{
UploadID: 1,
StagingFileID: 1,
Expand Down

0 comments on commit ca52403

Please sign in to comment.