Skip to content

Commit

Permalink
refactor: sources async job (#4008)
Browse files: browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 1, 2023
1 parent b80d273 commit a9199ae
Show file tree
Hide file tree
Showing 32 changed files with 2,640 additions and 1,315 deletions.
12 changes: 6 additions & 6 deletions warehouse/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -75,7 +75,7 @@ type Api struct {
bcConfig backendconfig.BackendConfig
tenantManager *multitenant.Manager
bcManager *bcm.BackendConfigManager
asyncManager *jobs.AsyncJobWh
sourceManager *source.Manager
stagingRepo *repo.StagingFiles
uploadRepo *repo.Uploads
schemaRepo *repo.WHSchema
Expand All @@ -100,7 +100,7 @@ func NewApi(
notifier *notifier.Notifier,
tenantManager *multitenant.Manager,
bcManager *bcm.BackendConfigManager,
asyncManager *jobs.AsyncJobWh,
sourceManager *source.Manager,
triggerStore *sync.Map,
) *Api {
a := &Api{
Expand All @@ -112,7 +112,7 @@ func NewApi(
statsFactory: statsFactory,
tenantManager: tenantManager,
bcManager: bcManager,
asyncManager: asyncManager,
sourceManager: sourceManager,
triggerStore: triggerStore,
stagingRepo: repo.NewStagingFiles(db),
uploadRepo: repo.NewUploads(db),
Expand Down Expand Up @@ -170,8 +170,8 @@ func (a *Api) addMasterEndpoints(ctx context.Context, r chi.Router) {
r.Post("/pending-events", a.logMiddleware(a.pendingEventsHandler))
r.Post("/trigger-upload", a.logMiddleware(a.triggerUploadHandler))

r.Post("/jobs", a.logMiddleware(a.asyncManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.asyncManager.StatusJobHandler)) // TODO: add degraded mode
r.Post("/jobs", a.logMiddleware(a.sourceManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.sourceManager.StatusJobHandler)) // TODO: add degraded mode

r.Get("/fetch-tables", a.logMiddleware(a.fetchTablesHandler)) // TODO: Remove this endpoint once sources change is released
})
Expand Down
13 changes: 7 additions & 6 deletions warehouse/api/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"

"github.com/golang/mock/gomock"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -188,12 +188,12 @@ func TestHTTPApi(t *testing.T) {
err = n.Setup(ctx, pgResource.DBDsn)
require.NoError(t, err)

sourcesManager := jobs.New(
ctx,
sourcesManager := source.New(
c,
logger.NOP,
db,
n,
)
jobs.WithConfig(sourcesManager, config.New())

g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
Expand All @@ -205,7 +205,7 @@ func TestHTTPApi(t *testing.T) {
return nil
})
g.Go(func() error {
return sourcesManager.Run()
return sourcesManager.Run(gCtx)
})

setupCh := make(chan struct{})
Expand Down Expand Up @@ -906,7 +906,8 @@ func TestHTTPApi(t *testing.T) {
"source_id": "test_source_id",
"destination_id": "test_destination_id",
"job_run_id": "test_source_job_run_id",
"task_run_id": "test_source_task_run_id"
"task_run_id": "test_source_task_run_id",
"async_job_type": "deletebyjobrunid"
}
`)))
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
"github.com/rudderlabs/rudder-server/utils/types"
whadmin "github.com/rudderlabs/rudder-server/warehouse/admin"
"github.com/rudderlabs/rudder-server/warehouse/archive"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand All @@ -67,7 +67,7 @@ type App struct {
constraintsManager *constraints.Manager
encodingFactory *encoding.Factory
fileManagerFactory filemanager.Factory
sourcesManager *jobs.AsyncJobWh
sourcesManager *source.Manager
admin *whadmin.Admin
triggerStore *sync.Map
createUploadAlways *atomic.Bool
Expand Down Expand Up @@ -174,12 +174,12 @@ func (a *App) Setup(ctx context.Context) error {
return fmt.Errorf("cannot setup notifier: %w", err)
}

a.sourcesManager = jobs.New(
ctx,
a.sourcesManager = source.New(
a.conf,
a.logger,
a.db,
a.notifier,
)
jobs.WithConfig(a.sourcesManager, a.conf)

a.grpcServer, err = api.NewGRPCServer(
a.conf,
Expand Down Expand Up @@ -413,7 +413,7 @@ func (a *App) Run(ctx context.Context) error {
return nil
})
g.Go(misc.WithBugsnagForWarehouse(func() error {
return a.sourcesManager.Run()
return a.sourcesManager.Run(gCtx)
}))
}

Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
enableMerge bool
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job-merge-mode",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
sourceID: sourcesSourceID,
destinationID: sourcesDestinationID,
Expand All @@ -192,7 +192,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -359,7 +359,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/mssql/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap testhelper.EventsCountMap
tableUploadsEventsMap testhelper.EventsCountMap
warehouseEventsMap testhelper.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -174,7 +174,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -184,7 +184,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -257,7 +257,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: testhelper.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -204,7 +204,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -214,7 +214,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -287,7 +287,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/redshift/redshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -208,7 +208,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -292,7 +292,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) {
warehouseEventsMap2 testhelper.EventsCountMap
cred *testCredentials
database string
asyncJob bool
sourceJob bool
stagingFilePrefix string
emptyJobRunID bool
enableMerge bool
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestIntegration(t *testing.T) {
enableMerge: true,
},
{
name: "Async Job with Sources",
name: "Source Job with Sources",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -308,7 +308,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
enableMerge: true,
},
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: whEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -450,7 +450,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: userID,
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
14 changes: 7 additions & 7 deletions warehouse/integrations/testhelper/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
)

const (
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
AsyncJOBQueryFrequency = 1000 * time.Millisecond
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
SourceJobQueryFrequency = 1000 * time.Millisecond
)

const (
Expand Down Expand Up @@ -64,7 +64,7 @@ type TestConfig struct {
TableUploadsEventsMap EventsCountMap
WarehouseEventsMap EventsCountMap
JobsDB *sql.DB
AsyncJob bool
SourceJob bool
SkipWarehouse bool
HTTPPort int
}
Expand All @@ -80,8 +80,8 @@ func (w *TestConfig) VerifyEvents(t testing.TB) {
verifyEventsInLoadFiles(t, w)
verifyEventsInTableUploads(t, w)

if w.AsyncJob {
verifyAsyncJob(t, w)
if w.SourceJob {
verifySourceJob(t, w)
}
if !w.SkipWarehouse {
verifyEventsInWareHouse(t, w)
Expand Down
Loading

0 comments on commit a9199ae

Please sign in to comment.