Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: enabling gateway to ingest events even when sharedDB is down #4262

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
rsourcesService, err := NewRsourcesService(deploymentType, true)
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
if err != nil {
return fmt.Errorf("failed to create rate limiter: %w", err)
}
rsourcesService, err := NewRsourcesService(deploymentType)
rsourcesService, err := NewRsourcesService(deploymentType, false)
if err != nil {
return err
}
Expand All @@ -132,23 +132,23 @@
})
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
if err != nil {
return fmt.Errorf("drain config manager setup: %v", err)
}
defer drainConfigManager.Stop()
g.Go(misc.WithBugsnag(func() (err error) {
return drainConfigManager.DrainConfigRoutine(ctx)
}))
g.Go(misc.WithBugsnag(func() (err error) {
return drainConfigManager.CleanupRoutine(ctx)
}))
a.log.Errorw("drain config manager setup failed while starting gateway", "error", err)
}

Check warning on line 137 in app/apphandlers/gatewayAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/gatewayAppHandler.go#L135-L137

Added lines #L135 - L137 were not covered by tests
drainConfigHttpHandler := drain_config.ErrorResponder("unable to start drain config http handler")
if drainConfigManager != nil {
defer drainConfigManager.Stop()
mihir20 marked this conversation as resolved.
Show resolved Hide resolved
drainConfigHttpHandler = drainConfigManager.DrainConfigHttpHandler()
}

err = gw.Setup(
ctx,
config, logger.NewLogger().Child("gateway"), stats.Default,
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB,
rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
gateway.WithInternalHttpHandlers(
map[string]http.Handler{
"/drain": drainConfigManager.DrainConfigHttpHandler(),
"/drain": drainConfigHttpHandler,
},
),
)
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
rsourcesService, err := NewRsourcesService(deploymentType, true)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func rudderCoreWorkSpaceTableSetup() error {
}

// NewRsourcesService produces a rsources.JobService through environment configuration (env variables & config file)
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.MaxPoolSize", 3)
rsourcesConfig.MinPoolSize = config.GetInt("Rsources.MinPoolSize", 1)
Expand All @@ -82,6 +82,8 @@ func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, er
}
}

rsourcesConfig.ShouldSetupSharedDB = shouldSetupSharedDB

return rsources.NewJobService(rsourcesConfig)
}

Expand Down
11 changes: 5 additions & 6 deletions internal/drain-config/drainConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,25 @@ type drainConfigManager struct {
wg sync.WaitGroup
}

// NewDrainConfigManager returns a drainConfigManager
// If migration fails while setting up drain config, drainConfigManager object will be returned along with error
// Consumers must handle errors and non-nil drainConfigManager object according to their use case.
func NewDrainConfigManager(conf *config.Config, log logger.Logger) (*drainConfigManager, error) {
db, err := setupDBConn(conf)
if err != nil {
log.Errorw("db setup", "error", err)
return nil, fmt.Errorf("db setup: %v", err)
}
if err := migrate(db); err != nil {
if err = migrate(db); err != nil {
log.Errorw("db migrations", "error", err)
return nil, fmt.Errorf("db migrations: %v", err)
}
return &drainConfigManager{
log: log,
conf: conf,
db: db,

done: &atomic.Bool{},
}, nil
mihir20 marked this conversation as resolved.
Show resolved Hide resolved
}, err
}

func (d *drainConfigManager) CleanupRoutine(ctx context.Context) error {
Expand Down Expand Up @@ -189,9 +191,6 @@ func setupDBConn(conf *config.Config) (*sql.DB, error) {
if err != nil {
return nil, fmt.Errorf("db open: %v", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("db ping: %v", err)
}
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
db.SetMaxIdleConns(conf.GetInt("drainConfig.maxIdleConns", 1))
db.SetMaxOpenConns(conf.GetInt("drainConfig.maxOpenConns", 2))
return db, nil
Expand Down
6 changes: 6 additions & 0 deletions internal/drain-config/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@
}
return nil
}

func ErrorResponder(errMsg string) http.Handler {
return http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, errMsg, http.StatusInternalServerError)
}))

Check warning on line 48 in internal/drain-config/http.go

View check run for this annotation

Codecov / codecov/patch

internal/drain-config/http.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}
18 changes: 10 additions & 8 deletions services/rsources/failed_records_pagination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func TestFailedRecords(t *testing.T) {
require.NoError(t, err)

service, err := NewJobService(JobServiceConfig{
LocalHostname: postgresContainer.Host,
MaxPoolSize: 1,
LocalConn: postgresContainer.DBDsn,
Log: logger.NOP,
LocalHostname: postgresContainer.Host,
MaxPoolSize: 1,
LocalConn: postgresContainer.DBDsn,
Log: logger.NOP,
ShouldSetupSharedDB: true,
})
require.NoError(t, err)
// Create 2 different job run ids with 10 records each
Expand Down Expand Up @@ -85,10 +86,11 @@ func RunFailedRecordsPerformanceTest(t testing.TB, recordCount, pageSize int) ti
require.NoError(t, err)

service, err := NewJobService(JobServiceConfig{
LocalHostname: postgresContainer.Host,
MaxPoolSize: 1,
LocalConn: postgresContainer.DBDsn,
Log: logger.NOP,
LocalHostname: postgresContainer.Host,
MaxPoolSize: 1,
LocalConn: postgresContainer.DBDsn,
Log: logger.NOP,
ShouldSetupSharedDB: true,
})
require.NoError(t, err)

Expand Down
60 changes: 30 additions & 30 deletions services/rsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (sh *sourcesHandler) getStatusInternal(ctx context.Context, db *sql.DB, job
filters, filterParams := sqlFilters(jobRunId, filter)

sqlStatement := fmt.Sprintf(
`SELECT
`SELECT
source_id,
destination_id,
task_run_id,
Expand Down Expand Up @@ -96,7 +96,7 @@ func (*sourcesHandler) IncrementStats(ctx context.Context, tx *sql.Tx, jobRunId
failed_count
) values ($1, $2, $3, $4, $5, $6, $7)
on conflict(db_name, job_run_id, task_run_id, source_id, destination_id)
do update set
do update set
in_count = "rsources_stats".in_count + excluded.in_count,
out_count = "rsources_stats".out_count + excluded.out_count,
failed_count = "rsources_stats".failed_count + excluded.failed_count,
Expand All @@ -116,7 +116,7 @@ func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobR
return nil
}
row := tx.QueryRow(`INSERT INTO rsources_failed_keys_v2 (id, job_run_id, task_run_id, source_id, destination_id)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (job_run_id, task_run_id, source_id, destination_id, db_name) DO UPDATE SET ts = NOW()
RETURNING id`, ksuid.New().String(), jobRunId, key.TaskRunID, key.SourceID, key.DestinationID)
var id string
Expand Down Expand Up @@ -196,8 +196,8 @@ func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string,
r.id,
r.record_id,
r.code
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
ORDER BY r.id, r.record_id ASC %[2]s`,
filters, limit)

Expand Down Expand Up @@ -291,8 +291,8 @@ func (sh *sourcesHandler) GetFailedRecordsV1(ctx context.Context, jobRunId strin
k.destination_id,
r.id,
r.record_id
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
ORDER BY r.id, r.record_id ASC %[2]s`,
filters, limit)

Expand Down Expand Up @@ -430,7 +430,7 @@ func (sh *sourcesHandler) doCleanupTables(ctx context.Context) error {
}
before := time.Now().Add(-config.GetDuration("Rsources.retention", defaultRetentionPeriodInHours, time.Hour))
if _, err := tx.ExecContext(ctx, `delete from "rsources_stats" where job_run_id in (
select lastUpdateToJobRunId.job_run_id from
select lastUpdateToJobRunId.job_run_id from
(select job_run_id, max(ts) as mts from "rsources_stats" group by job_run_id) lastUpdateToJobRunId
where lastUpdateToJobRunId.mts <= $1
)`, before); err != nil {
Expand All @@ -444,7 +444,7 @@ func (sh *sourcesHandler) doCleanupTables(ctx context.Context) error {
JOIN "rsources_failed_keys_v2_records" r on r.id = k.id
GROUP BY k.job_run_id
) lastUpdateToJobRunId WHERE lastUpdateToJobRunId.mts <= $1
)
)
),
deleted AS (
DELETE FROM "rsources_failed_keys_v2" WHERE id IN (SELECT id FROM to_delete) RETURNING id
Expand Down Expand Up @@ -487,7 +487,7 @@ func (sh *sourcesHandler) init() error {
return err
}

if sh.sharedDB != nil {
if sh.config.ShouldSetupSharedDB && sh.sharedDB != nil {
if err := withAdvisoryLock(ctx, sh.sharedDB, lockID, func(_ *sql.Tx) error {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
Expand Down Expand Up @@ -536,42 +536,42 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
v_alphabet char array[62] := array[
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U', 'V', 'W', 'X', 'Y', 'Z',
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U', 'V', 'W', 'X', 'Y', 'Z',
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
'u', 'v', 'w', 'x', 'y', 'z'];
i integer := 0;
begin

-- Get the current time
v_time := clock_timestamp();

-- Extract epoch seconds
v_seconds := EXTRACT(EPOCH FROM v_time) - v_epoch;

-- Generate a KSUID in a numeric variable
v_numeric := v_seconds * pow(2::numeric(50), 128) -- 32 bits for seconds and 128 bits for randomness
+ ((random()::numeric(70,20) * pow(2::numeric(70,20), 48))::numeric(50) * pow(2::numeric(50), 80)::numeric(50))
+ ((random()::numeric(70,20) * pow(2::numeric(70,20), 40))::numeric(50) * pow(2::numeric(50), 40)::numeric(50))
+ (random()::numeric(70,20) * pow(2::numeric(70,20), 40))::numeric(50);

-- Encode it to base-62
while v_numeric <> 0 loop
v_base62 := v_base62 || v_alphabet[mod(v_numeric, 62) + 1];
v_numeric := div(v_numeric, 62);
end loop;
v_base62 := reverse(v_base62);
v_base62 := lpad(v_base62, 27, '0');

return v_base62;

end $$ language plpgsql;`); err != nil {
return fmt.Errorf("failed to create ksuid function: %w", err)
}

if _, err := tx.ExecContext(ctx, `WITH new_keys AS (
INSERT INTO "rsources_failed_keys_v2"
INSERT INTO "rsources_failed_keys_v2"
(id, job_run_id, task_run_id, source_id, destination_id, db_name)
SELECT ksuid(), t.* FROM (
SELECT DISTINCT job_run_id, task_run_id, source_id, destination_id, db_name from "rsources_failed_keys"
Expand All @@ -580,12 +580,12 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
)
INSERT INTO "rsources_failed_keys_v2_records" (id, record_id, ts)
SELECT n.id, o.record_id::text, min(o.ts) FROM new_keys n
JOIN rsources_failed_keys o
on o.db_name = n.db_name
and o.destination_id = n.destination_id
and o.source_id = n.source_id
and o.task_run_id = n.task_run_id
and o.job_run_id = n.job_run_id
JOIN rsources_failed_keys o
on o.db_name = n.db_name
and o.destination_id = n.destination_id
and o.source_id = n.source_id
and o.task_run_id = n.task_run_id
and o.job_run_id = n.job_run_id
group by n.id, o.record_id
`); err != nil {
return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err)
Expand All @@ -605,7 +605,7 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {

// TODO: Remove this after a few releases
func setupFailedKeysTableV0(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
sqlStatement := fmt.Sprintf(`create table "rsources_failed_keys" (
sqlStatement := fmt.Sprintf(`create table "rsources_failed_keys" (
id BIGSERIAL,
db_name text not null default '%s',
job_run_id text not null,
Expand All @@ -631,13 +631,13 @@ func setupFailedKeysTableV0(ctx context.Context, db *sql.DB, defaultDbName strin
}

func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
if _, err := db.ExecContext(ctx, fmt.Sprintf(`create table "rsources_failed_keys_v2" (
if _, err := db.ExecContext(ctx, fmt.Sprintf(`create table "rsources_failed_keys_v2" (
id VARCHAR(27) COLLATE "C",
db_name text not null default '%s',
job_run_id text not null,
task_run_id text not null,
source_id text not null,
destination_id text not null,
destination_id text not null,
primary key (id),
unique (job_run_id, task_run_id, source_id, destination_id, db_name)
)`, defaultDbName)); err != nil {
Expand All @@ -648,7 +648,7 @@ func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string,
}
}

if _, err := db.ExecContext(ctx, `create table "rsources_failed_keys_v2_records" (
if _, err := db.ExecContext(ctx, `create table "rsources_failed_keys_v2_records" (
id VARCHAR(27) COLLATE "C",
record_id text not null,
ts timestamp not null default NOW(),
Expand Down
Loading
Loading