Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 28, 2023
2 parents 92ab4c9 + 94df125 commit 000dec6
Show file tree
Hide file tree
Showing 149 changed files with 5,859 additions and 3,883 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## [1.12.2](https://github.com/rudderlabs/rudder-server/compare/v1.12.1...v1.12.2) (2023-08-22)


### Bug Fixes

* extend sql middleware for mssql, azure_synapse and clickhouse ([#3771](https://github.com/rudderlabs/rudder-server/issues/3771)) ([e8809bb](https://github.com/rudderlabs/rudder-server/commit/e8809bbbf123940548a3112529a7dbc0ca0125d7))


### Miscellaneous

* deltalake error message length ([#3766](https://github.com/rudderlabs/rudder-server/issues/3766)) ([fa09173](https://github.com/rudderlabs/rudder-server/commit/fa091738b2460c58fc8e575d381e74093ee46ba6))

## [1.12.1](https://github.com/rudderlabs/rudder-server/compare/v1.12.0...v1.12.1) (2023-08-16)


Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
reporting := a.app.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)

g.Go(func() error {
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString()})
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString(config)})
return nil
})

Expand Down Expand Up @@ -326,7 +326,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return gw.StartWebHandler(ctx)
})
if a.config.enableReplay {
var replayDB jobsdb.HandleT
var replayDB jobsdb.Handle
err := replayDB.Setup(
jobsdb.ReadWrite, options.ClearDB, "replay",
prebackupHandlers, fileUploaderProvider,
Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
reporting := a.app.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)

g.Go(misc.WithBugsnag(func() error {
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString()})
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString(config.Default)})
return nil
}))

Expand Down Expand Up @@ -320,7 +320,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return g.Wait()
}

func (a *processorApp) startHealthWebHandler(ctx context.Context, db *jobsdb.HandleT) error {
func (a *processorApp) startHealthWebHandler(ctx context.Context, db *jobsdb.Handle) error {
// Port where Processor health handler is running
a.log.Infof("Starting in %d", a.config.http.webPort)
srvMux := chi.NewMux()
Expand Down
2 changes: 1 addition & 1 deletion app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func rudderCoreWorkSpaceTableSetup() error {
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.PoolSize", 5)
rsourcesConfig.LocalConn = misc.GetConnectionString()
rsourcesConfig.LocalConn = misc.GetConnectionString(config.Default)
rsourcesConfig.LocalHostname = config.GetString("DB.host", "localhost")
rsourcesConfig.SharedConn = config.GetString("SharedDB.dsn", "")
rsourcesConfig.SkipFailedRecordsCollection = !config.GetBool("Router.failedKeysEnabled", true)
Expand Down
1 change: 0 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func initJobsDB() {
logger.Reset()
stash.Init()
admin.Init()
jobsdb.Init()
Init()
}

Expand Down
2 changes: 1 addition & 1 deletion app/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Replay Feature

// ReplayFeature handles inserting of failed jobs into respective gw/rt jobsdb
type ReplayFeature interface {
Setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.HandleT)
Setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.Handle)
}

// ReplayFeatureSetup is a function that initializes a Replay feature
Expand Down
2 changes: 1 addition & 1 deletion archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (a *archiver) Start() error {
w.config.uploadFrequency = a.config.uploadFrequency
w.config.jobsdbMaxRetries = a.config.jobsdbMaxRetries

queryParams := &jobsdb.GetQueryParamsT{
queryParams := &jobsdb.GetQueryParams{
ParameterFilters: []jobsdb.ParameterFilterT{{Name: "source_id", Value: sourceID}},
}
w.queryParams = *queryParams
Expand Down
2 changes: 0 additions & 2 deletions archiver/archiver_isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ func insertJobs(
configMap map[string]backendconfig.ConfigT,
numJobsPerSource int,
) (map[string][]*jobsdb.JobT, int) {
jobsdb.Init()
t.Log(misc.GetConnectionString())
gwJobsDB := jobsdb.NewForWrite("gw")
require.NoError(t, gwJobsDB.Start(), "it should be able to start the jobsdb")
defer gwJobsDB.Stop()
Expand Down
65 changes: 20 additions & 45 deletions archiver/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ import (
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

c "github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
)

Expand All @@ -46,46 +44,29 @@ func TestJobsArchival(t *testing.T) {

pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to create docker pool")
cleanup := &testhelper.Cleanup{}

postgresResource, err := resource.SetupPostgres(pool, cleanup)
postgresResource, err := resource.SetupPostgres(pool, t)
require.NoError(t, err, "failed to setup postgres resource")
c := config.New()
c.Set("DB.name", postgresResource.Database)
c.Set("DB.host", postgresResource.Host)
c.Set("DB.port", postgresResource.Port)
c.Set("DB.user", postgresResource.User)
c.Set("DB.password", postgresResource.Password)
misc.Init()

jd := jobsdb.NewForReadWrite("archiver", jobsdb.WithClearDB(false), jobsdb.WithConfig(c))
require.NoError(t, jd.Start())

minioResource = make([]*destination.MINIOResource, uniqueWorkspaces)
for i := 0; i < uniqueWorkspaces; i++ {
minioResource[i], err = destination.SetupMINIO(pool, cleanup)
minioResource[i], err = destination.SetupMINIO(pool, t)
require.NoError(t, err, "failed to setup minio resource")
}

jobs, err := readGzipJobFile(seedJobsFileName)
require.NoError(t, err, "failed to read jobs file")

{
t.Setenv("MINIO_SSL", "false")
t.Setenv("JOBS_DB_DB_NAME", postgresResource.Database)
t.Setenv("JOBS_DB_NAME", postgresResource.Database)
t.Setenv("JOBS_DB_HOST", postgresResource.Host)
t.Setenv("JOBS_DB_PORT", postgresResource.Port)
t.Setenv("JOBS_DB_USER", postgresResource.User)
t.Setenv("JOBS_DB_PASSWORD", postgresResource.Password)
}

jobsdb.Init()
misc.Init()
jd := &jobsdb.HandleT{
TriggerAddNewDS: func() <-chan time.Time {
return make(chan time.Time)
},
}
require.NoError(t, jd.Setup(
jobsdb.ReadWrite,
false,
"gw",
[]prebackup.Handler{},
nil,
))
require.NoError(t, jd.Start())

require.NoError(t, jd.Store(ctx, jobs))

storageSettings := map[string]fileuploader.StorageSettings{
Expand Down Expand Up @@ -150,7 +131,7 @@ func TestJobsArchival(t *testing.T) {
archiver := New(
jd,
fileUploaderProvider,
c.New(),
c,
stats.Default,
WithArchiveTrigger(
func() <-chan time.Time {
Expand All @@ -166,14 +147,12 @@ func TestJobsArchival(t *testing.T) {
require.Eventually(
t,
func() bool {
succeeded, err := jd.GetProcessed(
succeeded, err := jd.GetSucceeded(
ctx,
jobsdb.GetQueryParamsT{
IgnoreCustomValFiltersInQuery: true,
StateFilters: []string{jobsdb.Succeeded.State},
JobsLimit: 1000,
EventsLimit: 1000,
PayloadSizeLimit: bytesize.GB,
jobsdb.GetQueryParams{
JobsLimit: 1000,
EventsLimit: 1000,
PayloadSizeLimit: bytesize.GB,
},
)
require.NoError(t, err)
Expand All @@ -194,25 +173,21 @@ func TestJobsArchival(t *testing.T) {
require.Equal(t, sourcesPerWorkspace[i], len(files))

for j, file := range files {
downloadFile, err := os.CreateTemp("", fmt.Sprintf("backedupfile%d%d", i, j))
downloadFile, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("backedupfile%d%d", i, j))
require.NoError(t, err)
err = fm.Download(context.Background(), downloadFile, file)
require.NoError(t, err, file)
err = downloadFile.Close()
require.NoError(t, err)
dJobs, err := readGzipJobFile(downloadFile.Name())
require.NoError(t, err)
cleanup.Cleanup(func() {
_ = os.Remove(downloadFile.Name())
})
downloadedJobs = append(downloadedJobs, dJobs...)
}
}
require.Equal(t, len(jobs), len(downloadedJobs))
archiver.Stop()
jd.Stop()
jd.Close()
cleanup.Run()
}

func readGzipJobFile(filename string) ([]*jobsdb.JobT, error) {
Expand Down
2 changes: 1 addition & 1 deletion archiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type worker struct {
uploadFrequency time.Duration
}
lastUploadTime time.Time
queryParams jobsdb.GetQueryParamsT
queryParams jobsdb.GetQueryParams
}

func (w *worker) Work() bool {
Expand Down
2 changes: 1 addition & 1 deletion backend-config/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (db *cacheStore) Get(ctx context.Context) ([]byte, error) {

// setupDBConn sets up the database connection, creates the config table if it doesn't exist
func setupDBConn() (*sql.DB, error) {
psqlInfo := misc.GetConnectionString()
psqlInfo := misc.GetConnectionString(config.Default)
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
pkgLogger.Errorf("failed to open db: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type SourceT struct {
EventSchemasEnabled bool
}

// IsReplaySource reports whether this source was created as a replay of
// another source: replay sources carry the ID of the source they were
// cloned from in OriginalID, while regular sources leave it empty.
func (s *SourceT) IsReplaySource() bool {
	return len(s.OriginalID) != 0
}

type WorkspaceRegulationT struct {
ID string
RegulationType string
Expand Down
6 changes: 3 additions & 3 deletions enterprise/replay/dumpsloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// DumpsLoaderHandleT - dumps-loader handle
type dumpsLoaderHandleT struct {
log logger.Logger
dbHandle *jobsdb.HandleT
dbHandle *jobsdb.Handle
prefix string
bucket string
startAfterKey string
Expand Down Expand Up @@ -80,7 +80,7 @@ type OrderedJobs struct {
Job *jobsdb.JobT
}

func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.HandleT, log logger.Logger) {
func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.Handle, log logger.Logger) {
// sorting dumps list on index
sort.Slice(objects, func(i, j int) bool {
return objects[i].SortIndex < objects[j].SortIndex
Expand Down Expand Up @@ -228,7 +228,7 @@ func (handle *dumpsLoaderHandleT) handleRecovery() {
}

// Setup sets up dumps-loader.
func (handle *dumpsLoaderHandleT) Setup(ctx context.Context, db *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
func (handle *dumpsLoaderHandleT) Setup(ctx context.Context, db *jobsdb.Handle, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
var err error
handle.log = log
handle.dbHandle = db
Expand Down
28 changes: 10 additions & 18 deletions enterprise/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
type Handler struct {
log logger.Logger
bucket string
db *jobsdb.HandleT
toDB *jobsdb.HandleT
db *jobsdb.Handle
toDB *jobsdb.Handle
noOfWorkers int
workers []*SourceWorkerT
dumpsLoader *dumpsLoaderHandleT
Expand All @@ -39,33 +39,25 @@ func (handle *Handler) generatorLoop(ctx context.Context) {
case <-handle.initSourceWorkersChannel:
}
for {
queryParams := jobsdb.GetQueryParamsT{
queryParams := jobsdb.GetQueryParams{
CustomValFilters: []string{"replay"},
JobsLimit: handle.dbReadSize,
}
toRetry, err := handle.db.GetToRetry(context.TODO(), queryParams)
jobsResult, err := handle.db.GetJobs(context.TODO(), []string{jobsdb.Unprocessed.State, jobsdb.Failed.State}, queryParams)
if err != nil {
handle.log.Errorf("Error getting to retry jobs: %v", err)
panic(err)
}
combinedList := toRetry.Jobs

if !toRetry.LimitsReached {
queryParams.JobsLimit -= len(combinedList)
unprocessed, err := handle.db.GetUnprocessed(context.TODO(), queryParams)
if err != nil {
handle.log.Errorf("Error getting unprocessed jobs: %v", err)
panic(err)
}
combinedList = append(combinedList, unprocessed.Jobs...)
}
combinedList := jobsResult.Jobs

handle.log.Infof("length of combinedList : %d", len(combinedList))

if len(combinedList) == 0 {
if breakLoop {
executingList, err := handle.db.GetExecuting(
executingList, err := handle.db.GetJobs(
context.TODO(),
jobsdb.GetQueryParamsT{
[]string{jobsdb.Executing.State},
jobsdb.GetQueryParams{
CustomValFilters: []string{"replay"},
JobsLimit: handle.dbReadSize,
},
Expand Down Expand Up @@ -155,7 +147,7 @@ func (handle *Handler) initSourceWorkers(ctx context.Context) {
handle.initSourceWorkersChannel <- true
}

func (handle *Handler) Setup(ctx context.Context, dumpsLoader *dumpsLoaderHandleT, db, toDB *jobsdb.HandleT, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
func (handle *Handler) Setup(ctx context.Context, dumpsLoader *dumpsLoaderHandleT, db, toDB *jobsdb.Handle, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) {
handle.log = log
handle.db = db
handle.toDB = toDB
Expand Down
6 changes: 3 additions & 3 deletions enterprise/replay/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func initFileManager(log logger.Logger) (filemanager.FileManager, string, error)
return uploader, bucket, nil
}

func setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.HandleT, log logger.Logger) error {
func setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.Handle, log logger.Logger) error {
tablePrefix := config.GetString("TO_REPLAY", "gw")
replayToDB := config.GetString("REPLAY_TO_DB", "gw")
log.Infof("TO_REPLAY=%s and REPLAY_TO_DB=%s", tablePrefix, replayToDB)
Expand All @@ -53,7 +53,7 @@ func setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.
dumpsLoader.Setup(ctx, replayDB, tablePrefix, uploader, bucket, log)

var replayer Handler
var toDB *jobsdb.HandleT
var toDB *jobsdb.Handle
switch replayToDB {
case "gw":
toDB = gwDB
Expand All @@ -75,7 +75,7 @@ type Factory struct {
}

// Setup initializes Replay feature
func (m *Factory) Setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.HandleT) {
func (m *Factory) Setup(ctx context.Context, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.Handle) {
if m.Log == nil {
m.Log = logger.NewLogger().Child("enterprise").Child("replay")
}
Expand Down
2 changes: 1 addition & 1 deletion event-schema/event_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ func (manager *EventSchemaManagerT) reloadSchemaVersion(offloadedVersion *Offloa

// TODO: Move this into some DB manager
func createDBConnection() *sql.DB {
psqlInfo := misc.GetConnectionString()
psqlInfo := misc.GetConnectionString(config.Default)
var err error
dbHandle, err := sql.Open("postgres", psqlInfo)
if err != nil {
Expand Down
Loading

0 comments on commit 000dec6

Please sign in to comment.