Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 23, 2023
2 parents cc34f10 + a04142e commit 11a7617
Show file tree
Hide file tree
Showing 35 changed files with 480 additions and 467 deletions.
2 changes: 1 addition & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
reporting := a.app.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)

g.Go(func() error {
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString()})
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString(config)})
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
reporting := a.app.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)

g.Go(misc.WithBugsnag(func() error {
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString()})
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString(config.Default)})
return nil
}))

Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func rudderCoreWorkSpaceTableSetup() error {
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.PoolSize", 5)
rsourcesConfig.LocalConn = misc.GetConnectionString()
rsourcesConfig.LocalConn = misc.GetConnectionString(config.Default)
rsourcesConfig.LocalHostname = config.GetString("DB.host", "localhost")
rsourcesConfig.SharedConn = config.GetString("SharedDB.dsn", "")
rsourcesConfig.SkipFailedRecordsCollection = !config.GetBool("Router.failedKeysEnabled", true)
Expand Down
1 change: 0 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func initJobsDB() {
logger.Reset()
stash.Init()
admin.Init()
jobsdb.Init()
Init()
}

Expand Down
2 changes: 0 additions & 2 deletions archiver/archiver_isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ func insertJobs(
configMap map[string]backendconfig.ConfigT,
numJobsPerSource int,
) (map[string][]*jobsdb.JobT, int) {
jobsdb.Init()
t.Log(misc.GetConnectionString())
gwJobsDB := jobsdb.NewForWrite("gw")
require.NoError(t, gwJobsDB.Start(), "it should be able to start the jobsdb")
defer gwJobsDB.Stop()
Expand Down
52 changes: 14 additions & 38 deletions archiver/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@ import (
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/config"
c "github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
)

Expand All @@ -47,46 +44,29 @@ func TestJobsArchival(t *testing.T) {

pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to create docker pool")
cleanup := &testhelper.Cleanup{}

postgresResource, err := resource.SetupPostgres(pool, cleanup)
postgresResource, err := resource.SetupPostgres(pool, t)
require.NoError(t, err, "failed to setup postgres resource")
c := config.New()
c.Set("DB.name", postgresResource.Database)
c.Set("DB.host", postgresResource.Host)
c.Set("DB.port", postgresResource.Port)
c.Set("DB.user", postgresResource.User)
c.Set("DB.password", postgresResource.Password)
misc.Init()

jd := jobsdb.NewForReadWrite("archiver", jobsdb.WithClearDB(false), jobsdb.WithConfig(c))
require.NoError(t, jd.Start())

minioResource = make([]*destination.MINIOResource, uniqueWorkspaces)
for i := 0; i < uniqueWorkspaces; i++ {
minioResource[i], err = destination.SetupMINIO(pool, cleanup)
minioResource[i], err = destination.SetupMINIO(pool, t)
require.NoError(t, err, "failed to setup minio resource")
}

jobs, err := readGzipJobFile(seedJobsFileName)
require.NoError(t, err, "failed to read jobs file")

config.Reset()
{
t.Setenv("MINIO_SSL", "false")
t.Setenv("JOBS_DB_DB_NAME", postgresResource.Database)
t.Setenv("JOBS_DB_NAME", postgresResource.Database)
t.Setenv("JOBS_DB_HOST", postgresResource.Host)
t.Setenv("JOBS_DB_PORT", postgresResource.Port)
t.Setenv("JOBS_DB_USER", postgresResource.User)
t.Setenv("JOBS_DB_PASSWORD", postgresResource.Password)
}
jobsdb.Init()
misc.Init()
jd := &jobsdb.Handle{
TriggerAddNewDS: func() <-chan time.Time {
return make(chan time.Time)
},
}
require.NoError(t, jd.Setup(
jobsdb.ReadWrite,
false,
"gw",
[]prebackup.Handler{},
nil,
))
require.NoError(t, jd.Start())

require.NoError(t, jd.Store(ctx, jobs))

storageSettings := map[string]fileuploader.StorageSettings{
Expand Down Expand Up @@ -151,7 +131,7 @@ func TestJobsArchival(t *testing.T) {
archiver := New(
jd,
fileUploaderProvider,
c.New(),
c,
stats.Default,
WithArchiveTrigger(
func() <-chan time.Time {
Expand Down Expand Up @@ -193,25 +173,21 @@ func TestJobsArchival(t *testing.T) {
require.Equal(t, sourcesPerWorkspace[i], len(files))

for j, file := range files {
downloadFile, err := os.CreateTemp("", fmt.Sprintf("backedupfile%d%d", i, j))
downloadFile, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("backedupfile%d%d", i, j))
require.NoError(t, err)
err = fm.Download(context.Background(), downloadFile, file)
require.NoError(t, err, file)
err = downloadFile.Close()
require.NoError(t, err)
dJobs, err := readGzipJobFile(downloadFile.Name())
require.NoError(t, err)
cleanup.Cleanup(func() {
_ = os.Remove(downloadFile.Name())
})
downloadedJobs = append(downloadedJobs, dJobs...)
}
}
require.Equal(t, len(jobs), len(downloadedJobs))
archiver.Stop()
jd.Stop()
jd.Close()
cleanup.Run()
}

func readGzipJobFile(filename string) ([]*jobsdb.JobT, error) {
Expand Down
2 changes: 1 addition & 1 deletion backend-config/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (db *cacheStore) Get(ctx context.Context) ([]byte, error) {

// setupDBConn sets up the database connection, creates the config table if it doesn't exist
func setupDBConn() (*sql.DB, error) {
psqlInfo := misc.GetConnectionString()
psqlInfo := misc.GetConnectionString(config.Default)
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
pkgLogger.Errorf("failed to open db: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion event-schema/event_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ func (manager *EventSchemaManagerT) reloadSchemaVersion(offloadedVersion *Offloa

// TODO: Move this into some DB manager
func createDBConnection() *sql.DB {
psqlInfo := misc.GetConnectionString()
psqlInfo := misc.GetConnectionString(config.Default)
var err error
dbHandle, err := sql.Open("postgres", psqlInfo)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions event-schema/event_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/jobsdb"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/testhelper"
)
Expand Down Expand Up @@ -74,7 +73,6 @@ func jobsDBInit(es envSetter, pgResource *resource.PostgresResource) {
es.Setenv("JOBS_DB_PORT", pgResource.Port)

admin.Init()
jobsdb.Init()
}

var _ = Describe("Event Schemas", Ordered, func() {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/allisson/go-pglock/v2 v2.0.1
github.com/apache/pulsar-client-go v0.11.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go v1.44.328
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
Expand All @@ -49,7 +49,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/golang/mock v1.6.0
github.com/gomodule/redigo v1.8.9
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/hashicorp/yamux v0.1.1
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,8 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY=
github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.328 h1:WBwlf8ym9SDQ/GTIBO9eXyvwappKJyOetWJKl4mT7ZU=
github.com/aws/aws-sdk-go v1.44.328/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2 v1.17.7 h1:CLSjnhJSTSogvqUGhIC6LqFKATMRexcxLZ0i/Nzk9Eg=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
Expand Down Expand Up @@ -1260,8 +1260,9 @@ github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU=
github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
Expand Down
9 changes: 4 additions & 5 deletions jobsdb/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -150,9 +149,9 @@ func (jd *Handle) startCleanupLoop(ctx context.Context) {
case <-jd.TriggerJobCleanUp():
func() {
for {
if err := jd.doCleanup(ctx, config.GetInt("jobsdb.cleanupBatchSize", 100)); err != nil && ctx.Err() == nil {
if err := jd.doCleanup(ctx, jd.config.GetInt("jobsdb.cleanupBatchSize", 100)); err != nil && ctx.Err() == nil {
jd.logger.Errorf("error while cleaning up old jobs: %w", err)
if err := misc.SleepCtx(ctx, config.GetDuration("jobsdb.cleanupRetryInterval", 10, time.Second)); err != nil {
if err := misc.SleepCtx(ctx, jd.config.GetDuration("jobsdb.cleanupRetryInterval", 10, time.Second)); err != nil {
return
}
continue
Expand Down Expand Up @@ -182,7 +181,7 @@ func (jd *Handle) doCleanup(ctx context.Context, batchSize int) error {
jobs := lo.Filter(
jobsResult.Jobs,
func(job *JobT, _ int) bool {
return job.CreatedAt.Before(time.Now().Add(-jd.JobMaxAge()))
return job.CreatedAt.Before(time.Now().Add(-jd.conf.jobMaxAge()))
},
)
if len(jobs) > 0 {
Expand Down Expand Up @@ -225,7 +224,7 @@ func (jd *Handle) doCleanup(ctx context.Context, batchSize int) error {
fmt.Sprintf(
deleteStmt,
jd.tablePrefix,
config.GetInt("JobsDB.archivalTimeInDays", 10),
jd.config.GetInt("JobsDB.archivalTimeInDays", 10),
),
).Scan(&journalEntryCount); err != nil {
return err
Expand Down
Loading

0 comments on commit 11a7617

Please sign in to comment.