
chore: master pull
achettyiitr committed Aug 18, 2023
2 parents 6a6edbb + ca52403 commit eef8f8a
Showing 15 changed files with 746 additions and 549 deletions.
1 change: 1 addition & 0 deletions .github/tools/matrixchecker/main.go
@@ -18,6 +18,7 @@ var IgnorePackages = []string{
 	"warehouse/integrations/middleware",
 	"warehouse/integrations/testhelper",
 	"warehouse/integrations/testdata",
+	"warehouse/integrations/config",
 }

 func main() {

1 change: 0 additions & 1 deletion runner/runner.go
@@ -331,7 +331,6 @@ func runAllInit() {
 	warehouseutils.Init()
 	pgnotifier.Init()
 	jobsdb.Init()
-	warehouse.Init()
 	warehouse.Init4()
 	validations.Init()
 	webhook.Init()

33 changes: 33 additions & 0 deletions warehouse/integrations/config/config.go
@@ -0,0 +1,33 @@
+package config
+
+import (
+	"github.com/rudderlabs/rudder-go-kit/config"
+	whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+)
+
+func MaxParallelLoadsMap(conf *config.Config) map[string]int {
+	return map[string]int{
+		whutils.BQ:            conf.GetInt("Warehouse.bigquery.maxParallelLoads", 20),
+		whutils.RS:            conf.GetInt("Warehouse.redshift.maxParallelLoads", 8),
+		whutils.POSTGRES:      conf.GetInt("Warehouse.postgres.maxParallelLoads", 8),
+		whutils.MSSQL:         conf.GetInt("Warehouse.mssql.maxParallelLoads", 8),
+		whutils.SNOWFLAKE:     conf.GetInt("Warehouse.snowflake.maxParallelLoads", 8),
+		whutils.CLICKHOUSE:    conf.GetInt("Warehouse.clickhouse.maxParallelLoads", 8),
+		whutils.DELTALAKE:     conf.GetInt("Warehouse.deltalake.maxParallelLoads", 8),
+		whutils.S3Datalake:    conf.GetInt("Warehouse.s3_datalake.maxParallelLoads", 8),
+		whutils.GCSDatalake:   conf.GetInt("Warehouse.gcs_datalake.maxParallelLoads", 8),
+		whutils.AzureDatalake: conf.GetInt("Warehouse.azure_datalake.maxParallelLoads", 8),
+	}
+}
+
+func ColumnCountLimitMap(conf *config.Config) map[string]int {
+	return map[string]int{
+		whutils.AzureSynapse: conf.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
+		whutils.BQ:           conf.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
+		whutils.CLICKHOUSE:   conf.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
+		whutils.MSSQL:        conf.GetInt("Warehouse.mssql.columnCountLimit", 1024),
+		whutils.POSTGRES:     conf.GetInt("Warehouse.postgres.columnCountLimit", 1600),
+		whutils.RS:           conf.GetInt("Warehouse.redshift.columnCountLimit", 1600),
+		whutils.S3Datalake:   conf.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
+	}
+}
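
Both helpers read per-destination tunables from the injected *config.Config, replacing values that were previously held in package-level maps (see the columnCountLimitMap removal in the tests below). A minimal usage sketch (a hypothetical standalone caller, not code from this commit):

	package main

	import (
		"fmt"

		"github.com/rudderlabs/rudder-go-kit/config"

		integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config"
		whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
	)

	func main() {
		conf := config.New()
		// Override a default programmatically; the same keys can also be
		// overridden through the usual config sources.
		conf.Set("Warehouse.redshift.maxParallelLoads", 16)

		parallel := integrationsconfig.MaxParallelLoadsMap(conf)
		fmt.Println(parallel[whutils.RS]) // 16 (the default is 8)

		limits := integrationsconfig.ColumnCountLimitMap(conf)
		fmt.Println(limits[whutils.RS]) // 1600 (default)
	}
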
12 changes: 8 additions & 4 deletions warehouse/router.go
@@ -12,6 +12,8 @@ import (

 	"github.com/rudderlabs/rudder-server/warehouse/encoding"

+	"github.com/rudderlabs/rudder-server/app"
+
 	"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"

 	"golang.org/x/exp/slices"
@@ -105,6 +107,7 @@ type router struct {

 func newRouter(
 	ctx context.Context,
+	app app.App,
 	destType string,
 	conf *config.Config,
 	logger logger.Logger,
@@ -146,7 +149,10 @@
 	r.inProgressMap = make(map[WorkerIdentifierT][]JobID)

 	r.uploadJobFactory = UploadJobFactory{
-		stats:                r.statsFactory,
+		app:                  app,
+		conf:                 r.conf,
+		logger:               r.logger,
+		statsFactory:         r.statsFactory,
 		dbHandle:             r.dbHandle,
 		pgNotifier:           r.notifier,
 		destinationValidator: validations.NewDestinationValidator(),
@@ -479,9 +485,7 @@ func (r *router) uploadsToProcess(ctx context.Context, availableWorkers int, ski
 			Warehouse:    warehouse,
 			Upload:       upload,
 			StagingFiles: stagingFilesList,
-		},
-			whManager,
-		)
+		}, whManager)

 		uploadJobs = append(uploadJobs, uploadJob)
 	}

53 changes: 33 additions & 20 deletions warehouse/router_test.go
@@ -40,7 +40,6 @@ import (

 func TestRouter(t *testing.T) {
 	pgnotifier.Init()
-	Init()
 	Init4()

 	pool, err := dockertest.NewPool("")
@@ -55,9 +54,6 @@ func TestRouter(t *testing.T) {
 		Reporting: report,
 	}).AnyTimes()

-	application = mockApp
-	pkgLogger = logger.NOP
-
 	sourceID := "test-source-id"
 	destinationID := "test-destination-id"
 	namespace := "test-namespace"
@@ -97,9 +93,8 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

-	notifier, err = pgnotifier.New(workspaceIdentifier, pgResource.DBDsn)
+	notifier, err := pgnotifier.New(workspaceIdentifier, pgResource.DBDsn)
 	require.NoError(t, err)

 	tenantManager := &multitenant.Manager{
@@ -121,7 +116,20 @@

 	em := encoding.NewManager(config.Default)

-	r, err := newRouter(ctx, destinationType, config.Default, logger.NOP, stats.Default, db, &notifier, tenantManager, cp, bcm, em)
+	r, err := newRouter(
+		ctx,
+		mockApp,
+		destinationType,
+		config.Default,
+		logger.NOP,
+		stats.Default,
+		db,
+		&notifier,
+		tenantManager,
+		cp,
+		bcm,
+		em,
+	)
 	require.NoError(t, err)

 	cancel()
@@ -141,7 +149,6 @@
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -177,6 +184,7 @@ func TestRouter(t *testing.T) {
 	r.uploadRepo = repoUpload
 	r.stagingRepo = repoStaging
 	r.statsFactory = memstats.New()
+	r.conf = config.Default
 	r.config.stagingFilesBatchSize = 100
 	r.config.warehouseSyncFreqIgnore = true
 	r.config.enableJitterForSyncs = true
@@ -294,7 +302,6 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -333,6 +340,7 @@ func TestRouter(t *testing.T) {
 	r.warehouseDBHandle = NewWarehouseDB(db)
 	r.uploadRepo = repoUpload
 	r.stagingRepo = repoStaging
+	r.conf = config.Default
 	r.config.stagingFilesBatchSize = 100
 	r.config.warehouseSyncFreqIgnore = true
 	r.config.enableJitterForSyncs = true
@@ -399,7 +407,6 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -447,8 +454,11 @@ func TestRouter(t *testing.T) {
 	}
 	r.warehouses = []model.Warehouse{warehouse}
 	r.uploadJobFactory = UploadJobFactory{
-		stats:    r.statsFactory,
-		dbHandle: r.dbHandle,
+		app:          mockApp,
+		conf:         config.Default,
+		logger:       logger.NOP,
+		statsFactory: r.statsFactory,
+		dbHandle:     r.dbHandle,
 	}
 	r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, stats.Tags{
 		"destType": r.destType,
@@ -529,7 +539,6 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -583,8 +592,11 @@
 	r.bcManager = newBackendConfigManager(r.conf, r.dbHandle, r.tenantManager, r.logger)
 	r.warehouses = []model.Warehouse{warehouse}
 	r.uploadJobFactory = UploadJobFactory{
-		stats:    r.statsFactory,
-		dbHandle: r.dbHandle,
+		app:          mockApp,
+		conf:         config.Default,
+		logger:       logger.NOP,
+		statsFactory: r.statsFactory,
+		dbHandle:     r.dbHandle,
 	}
 	r.workerChannelMap = map[string]chan *UploadJob{
 		r.workerIdentifier(warehouse): make(chan *UploadJob, 1),
@@ -680,7 +692,6 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -731,8 +742,11 @@
 	r.bcManager = newBackendConfigManager(r.conf, r.dbHandle, r.tenantManager, r.logger)
 	r.warehouses = []model.Warehouse{warehouse}
 	r.uploadJobFactory = UploadJobFactory{
-		stats:    r.statsFactory,
-		dbHandle: r.dbHandle,
+		app:          mockApp,
+		conf:         config.Default,
+		logger:       logger.NOP,
+		statsFactory: r.statsFactory,
+		dbHandle:     r.dbHandle,
 	}
 	r.workerChannelMap = map[string]chan *UploadJob{
 		r.workerIdentifier(warehouse): make(chan *UploadJob, 1),
@@ -786,7 +800,6 @@
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
@@ -804,6 +817,7 @@ func TestRouter(t *testing.T) {
 	r.warehouseDBHandle = NewWarehouseDB(db)
 	r.uploadRepo = repoUpload
 	r.stagingRepo = repoStaging
+	r.conf = config.Default
 	r.config.stagingFilesBatchSize = 100
 	r.config.warehouseSyncFreqIgnore = true
 	r.config.enableJitterForSyncs = true
@@ -942,7 +956,6 @@ func TestRouter(t *testing.T) {
 	require.NoError(t, err)

 	db := sqlmiddleware.New(pgResource.DB)
-	wrappedDBHandle = db

 	now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
 	repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {

3 changes: 3 additions & 0 deletions warehouse/slave_worker.go
@@ -17,6 +17,7 @@ import (
 	"github.com/rudderlabs/rudder-go-kit/stats"
 	"github.com/rudderlabs/rudder-server/services/pgnotifier"
 	"github.com/rudderlabs/rudder-server/warehouse/encoding"
+	integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config"
 	"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
 	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
 	"github.com/rudderlabs/rudder-server/warehouse/jobs"
@@ -237,6 +238,8 @@ func (sw *slaveWorker) processStagingFile(ctx context.Context, job payload) ([]u
 	bufScanner := bufio.NewScanner(jr.stagingFileReader)
 	bufScanner.Buffer(make([]byte, maxCapacity), maxCapacity)

+	columnCountLimitMap := integrationsconfig.ColumnCountLimitMap(jr.conf)
+
 	for {
 		ok := bufScanner.Scan()
 		if !ok {
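
The worker now builds the column-count limit map from the job's config (jr.conf) rather than a package-level variable. The enforcement site falls outside this hunk; the sketch below shows the kind of check the map enables, using a hypothetical helper name and signature (not from the commit):

	package main

	import (
		"fmt"

		"github.com/rudderlabs/rudder-go-kit/config"

		integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config"
		whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
	)

	// checkColumnLimit rejects a schema that has more columns than the
	// configured limit for the destination type.
	func checkColumnLimit(conf *config.Config, destType string, columnCount int) error {
		limit, ok := integrationsconfig.ColumnCountLimitMap(conf)[destType]
		if !ok {
			return nil // no limit configured for this destination type
		}
		if columnCount > limit {
			return fmt.Errorf("%s: column count %d exceeds limit %d", destType, columnCount, limit)
		}
		return nil
	}

	func main() {
		// Postgres defaults to 1600 columns, so 2000 is rejected.
		fmt.Println(checkColumnLimit(config.New(), whutils.POSTGRES, 2000))
	}
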
2 changes: 2 additions & 0 deletions warehouse/slave_worker_job.go
@@ -107,6 +107,7 @@ type jobRun struct {
 	now func() time.Time

 	stats                   stats.Stats
+	conf                    *config.Config
 	uploadTimeStat          stats.Measurement
 	totalUploadTimeStat     stats.Measurement
 	downloadStagingFileStat stats.Measurement
@@ -127,6 +128,7 @@ func newJobRun(job payload, conf *config.Config, log logger.Logger, stat stats.S
 		job:        job,
 		identifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
 		stats:      stat,
+		conf:       conf,
 		since:      time.Since,
 		logger:     log,
 		now:        timeutil.Now,

9 changes: 4 additions & 5 deletions warehouse/slave_worker_test.go
@@ -304,8 +304,11 @@ func TestSlaveWorker(t *testing.T) {
 		subscribeCh: subscribeCh,
 	}

+	c := config.New()
+	c.Set("Warehouse.s3_datalake.columnCountLimit", 10)
+
 	slaveWorker := newSlaveWorker(
-		config.Default,
+		c,
 		logger.NOP,
 		stats.Default,
 		notifier,
@@ -315,10 +318,6 @@
 		workerIdx,
 	)

-	columnCountLimitMap = map[string]int{
-		warehouseutils.S3Datalake: 10,
-	}
-
 	p := payload{
 		UploadID:      1,
 		StagingFileID: 1,
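
Design note: the test now injects the limit through a fresh config.New() instance instead of assigning to the package-level columnCountLimitMap, so the override stays local to the test. Presumably this isolation, along with making the limits tunable per deployment, is the motivation for the new warehouse/integrations/config package.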

