refactor: sources async job #4008

Merged · 18 commits · Nov 1, 2023
Changes from 14 commits
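Context for the refactor: this PR replaces the warehouse async-job manager (jobs.AsyncJobWh) with the new source.Manager. A minimal sketch of the new wiring, pieced together from the app.go and http_test.go hunks below (variable names mirror the diffs; anything else is illustrative, not part of the PR):

    // The constructor now takes the config directly (jobs.WithConfig is gone)
    // and no longer takes a context; the context moves to Run instead.
    sourcesManager := source.New(
        conf,     // config: c in the test, a.conf in app.go
        log,      // logger: logger.NOP in the test, a.logger in app.go
        db,       // warehouse database handle
        notifier, // notifier shared with the rest of the warehouse app
    )

    g, gCtx := errgroup.WithContext(ctx)
    g.Go(func() error {
        // Run(gCtx) replaces the old Run(), so the manager stops when the
        // errgroup's context is cancelled.
        return sourcesManager.Run(gCtx)
    })

The HTTP layer keeps the same /jobs and /jobs/status routes but delegates to the manager's InsertJobHandler and StatusJobHandler, as shown in warehouse/api/http.go below.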
12 changes: 6 additions & 6 deletions warehouse/api/http.go
@@ -34,8 +34,8 @@ import (
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

@@ -75,7 +75,7 @@ type Api struct {
bcConfig backendconfig.BackendConfig
tenantManager *multitenant.Manager
bcManager *bcm.BackendConfigManager
asyncManager *jobs.AsyncJobWh
sourceManager *source.Manager
stagingRepo *repo.StagingFiles
uploadRepo *repo.Uploads
schemaRepo *repo.WHSchema
@@ -100,7 +100,7 @@ func NewApi(
notifier *notifier.Notifier,
tenantManager *multitenant.Manager,
bcManager *bcm.BackendConfigManager,
asyncManager *jobs.AsyncJobWh,
sourceManager *source.Manager,
triggerStore *sync.Map,
) *Api {
a := &Api{
@@ -112,7 +112,7 @@
statsFactory: statsFactory,
tenantManager: tenantManager,
bcManager: bcManager,
asyncManager: asyncManager,
sourceManager: sourceManager,
triggerStore: triggerStore,
stagingRepo: repo.NewStagingFiles(db),
uploadRepo: repo.NewUploads(db),
@@ -170,8 +170,8 @@ func (a *Api) addMasterEndpoints(ctx context.Context, r chi.Router) {
r.Post("/pending-events", a.logMiddleware(a.pendingEventsHandler))
r.Post("/trigger-upload", a.logMiddleware(a.triggerUploadHandler))

r.Post("/jobs", a.logMiddleware(a.asyncManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.asyncManager.StatusJobHandler)) // TODO: add degraded mode
r.Post("/jobs", a.logMiddleware(a.sourceManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.sourceManager.StatusJobHandler)) // TODO: add degraded mode

r.Get("/fetch-tables", a.logMiddleware(a.fetchTablesHandler)) // TODO: Remove this endpoint once sources change is released
})
13 changes: 7 additions & 6 deletions warehouse/api/http_test.go
@@ -27,8 +27,8 @@ import (

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"

"github.com/golang/mock/gomock"
"github.com/ory/dockertest/v3"
@@ -188,12 +188,12 @@ func TestHTTPApi(t *testing.T) {
err = n.Setup(ctx, pgResource.DBDsn)
require.NoError(t, err)

sourcesManager := jobs.New(
ctx,
sourcesManager := source.New(
c,
logger.NOP,
db,
n,
)
jobs.WithConfig(sourcesManager, config.New())

g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
@@ -205,7 +205,7 @@ func TestHTTPApi(t *testing.T) {
return nil
})
g.Go(func() error {
return sourcesManager.Run()
return sourcesManager.Run(gCtx)
})

setupCh := make(chan struct{})
@@ -906,7 +906,8 @@ func TestHTTPApi(t *testing.T) {
"source_id": "test_source_id",
"destination_id": "test_destination_id",
"job_run_id": "test_source_job_run_id",
"task_run_id": "test_source_task_run_id"
"task_run_id": "test_source_task_run_id",
"async_job_type": "deletebyjobrunid"
}
`)))
require.NoError(t, err)
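The test above also adds the async_job_type field to the insert-job payload. As a rough sketch of exercising the new handler with that payload (the httptest wiring is illustrative; it assumes sourcesManager is the manager constructed earlier in the test, and imports net/http, net/http/httptest and strings):

    body := strings.NewReader(`{
        "source_id": "test_source_id",
        "destination_id": "test_destination_id",
        "job_run_id": "test_source_job_run_id",
        "task_run_id": "test_source_task_run_id",
        "async_job_type": "deletebyjobrunid"
    }`)
    req := httptest.NewRequest(http.MethodPost, "/jobs", body)
    rec := httptest.NewRecorder()
    // InsertJobHandler is the same handler the router mounts on POST /jobs.
    sourcesManager.InsertJobHandler(rec, req)
    require.Equal(t, http.StatusOK, rec.Code)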
12 changes: 6 additions & 6 deletions warehouse/app.go
@@ -45,8 +45,8 @@ import (
"github.com/rudderlabs/rudder-server/utils/types"
whadmin "github.com/rudderlabs/rudder-server/warehouse/admin"
"github.com/rudderlabs/rudder-server/warehouse/archive"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

@@ -67,7 +67,7 @@ type App struct {
constraintsManager *constraints.Manager
encodingFactory *encoding.Factory
fileManagerFactory filemanager.Factory
sourcesManager *jobs.AsyncJobWh
sourcesManager *source.Manager
admin *whadmin.Admin
triggerStore *sync.Map
createUploadAlways *atomic.Bool
@@ -174,12 +174,12 @@ func (a *App) Setup(ctx context.Context) error {
return fmt.Errorf("cannot setup notifier: %w", err)
}

a.sourcesManager = jobs.New(
ctx,
a.sourcesManager = source.New(
a.conf,
a.logger,
a.db,
a.notifier,
)
jobs.WithConfig(a.sourcesManager, a.conf)

a.grpcServer, err = api.NewGRPCServer(
a.conf,
@@ -413,7 +413,7 @@ func (a *App) Run(ctx context.Context) error {
return nil
})
g.Go(misc.WithBugsnagForWarehouse(func() error {
return a.sourcesManager.Run()
return a.sourcesManager.Run(gCtx)
}))
}

10 changes: 5 additions & 5 deletions warehouse/integrations/bigquery/bigquery_test.go
@@ -148,7 +148,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
enableMerge bool
@@ -177,7 +177,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job-merge-mode",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
sourceID: sourcesSourceID,
destinationID: sourcesDestinationID,
@@ -192,7 +192,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
@@ -347,7 +347,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
@@ -359,7 +359,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
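The same rename, asyncJob to sourceJob in the test cases and AsyncJob to SourceJob in the test helper, repeats in the mssql, postgres, redshift and snowflake tests below. The shared pattern, trimmed down (field names come from these diffs, the elided fields do not):

    ts2 := whth.TestConfig{
        // ... connection, schema, events-map and staging-file fields elided ...
        SourceJob: tc.sourceJob,
        UserID:    whth.GetUserId(destType),
    }
    if tc.sourceJob {
        // Source-job cases reuse the first sync's user ID for the second verification.
        ts2.UserID = ts1.UserID
    }
    ts2.VerifyEvents(t)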
10 changes: 5 additions & 5 deletions warehouse/integrations/mssql/mssql_test.go
@@ -161,7 +161,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap testhelper.EventsCountMap
tableUploadsEventsMap testhelper.EventsCountMap
warehouseEventsMap testhelper.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
@@ -174,7 +174,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
@@ -184,7 +184,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
@@ -245,7 +245,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
@@ -257,7 +257,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: testhelper.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
10 changes: 5 additions & 5 deletions warehouse/integrations/postgres/postgres_test.go
@@ -189,7 +189,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
@@ -204,7 +204,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
@@ -214,7 +214,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
@@ -275,7 +275,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
@@ -287,7 +287,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
10 changes: 5 additions & 5 deletions warehouse/integrations/redshift/redshift_test.go
@@ -185,7 +185,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -208,7 +208,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -292,7 +292,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
10 changes: 5 additions & 5 deletions warehouse/integrations/snowflake/snowflake_test.go
@@ -224,7 +224,7 @@ func TestIntegration(t *testing.T) {
warehouseEventsMap2 testhelper.EventsCountMap
cred *testCredentials
database string
asyncJob bool
sourceJob bool
stagingFilePrefix string
emptyJobRunID bool
enableMerge bool
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestIntegration(t *testing.T) {
enableMerge: true,
},
{
name: "Async Job with Sources",
name: "Source Job with Sources",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -308,7 +308,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
enableMerge: true,
},
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: whEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -450,7 +450,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: userID,
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
14 changes: 7 additions & 7 deletions warehouse/integrations/testhelper/setup.go
@@ -29,10 +29,10 @@ import (
)

const (
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
AsyncJOBQueryFrequency = 1000 * time.Millisecond
WaitFor2Minute = 2 * time.Minute
WaitFor10Minute = 10 * time.Minute
DefaultQueryFrequency = 100 * time.Millisecond
SourceJobQueryFrequency = 1000 * time.Millisecond
)

const (
Expand Down Expand Up @@ -64,7 +64,7 @@ type TestConfig struct {
TableUploadsEventsMap EventsCountMap
WarehouseEventsMap EventsCountMap
JobsDB *sql.DB
AsyncJob bool
SourceJob bool
SkipWarehouse bool
HTTPPort int
}
@@ -80,8 +80,8 @@ func (w *TestConfig) VerifyEvents(t testing.TB) {
verifyEventsInLoadFiles(t, w)
verifyEventsInTableUploads(t, w)

if w.AsyncJob {
verifyAsyncJob(t, w)
if w.SourceJob {
verifySourceJob(t, w)
}
if !w.SkipWarehouse {
verifyEventsInWareHouse(t, w)
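On the test-helper side, a warehouse integration test opts into the extra check by setting SourceJob on its TestConfig; VerifyEvents then calls verifySourceJob in addition to the staging-file, load-file and table-upload checks. A trimmed sketch (only the renamed field and the method call come from this diff, the rest is placeholder):

    ts := testhelper.TestConfig{
        // ... warehouse, JobsDB and events-map fields elided ...
        SourceJob: true, // renamed from AsyncJob
    }
    ts.VerifyEvents(t)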