Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(warehouse): temp table support for postgres #2964

Merged
merged 46 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
bf446e7
added support for load file downloader
achettyiitr Feb 9, 2023
b2c8909
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Feb 9, 2023
ace9f82
formatted mssql and azure synapse
achettyiitr Feb 9, 2023
78141c0
added load table and load users table components
achettyiitr Feb 10, 2023
8e0499d
some more changes
achettyiitr Feb 10, 2023
223c416
some more changes
achettyiitr Feb 10, 2023
1cbbcb2
added test cases for user and identifies table
achettyiitr Feb 10, 2023
2170044
code cleanup
achettyiitr Feb 10, 2023
fd6c055
added some more tests.
achettyiitr Feb 10, 2023
6e7e693
master pull
achettyiitr Feb 15, 2023
e95967f
some more test cases
achettyiitr Feb 15, 2023
9f0e8ca
bq create table
achettyiitr Feb 15, 2023
88841a1
handle in case of user table loading
achettyiitr Feb 15, 2023
538ac83
some more changes.
achettyiitr Feb 25, 2023
ccca5ca
added diagnostic
achettyiitr Feb 25, 2023
c7b6d09
make fmt
achettyiitr Feb 25, 2023
8eac2f2
master pull
achettyiitr Feb 25, 2023
914647a
make fmt
achettyiitr Feb 25, 2023
af1398e
deepsource changes
achettyiitr Feb 25, 2023
317f5a5
some more changes
achettyiitr Feb 26, 2023
c809661
make fmt
achettyiitr Feb 26, 2023
5c129c4
loadfile downloader changes.
achettyiitr Feb 26, 2023
1600259
making tests paralle.
achettyiitr Feb 26, 2023
4ca477a
added todos
achettyiitr Feb 26, 2023
2fe3e3b
make fmt
achettyiitr Feb 26, 2023
3d4f86f
fix recovery test
achettyiitr Feb 26, 2023
5b00445
cleanup
achettyiitr Feb 26, 2023
9418768
fix flaky test
achettyiitr Feb 26, 2023
3b28a6d
go mod tidy
achettyiitr Feb 26, 2023
dcb2279
added discards table tests
achettyiitr Feb 26, 2023
f89238b
added observability for dedup query
achettyiitr Feb 26, 2023
bb38850
review comments
achettyiitr Feb 27, 2023
cc63f53
master pull
achettyiitr Feb 28, 2023
483d3de
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Mar 1, 2023
193bc42
move load files downloader insider the loadfiles package
achettyiitr Mar 1, 2023
fa45b5d
use legacy code back
achettyiitr Mar 1, 2023
40d69cb
imports issue fix
achettyiitr Mar 1, 2023
894db2c
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Mar 1, 2023
9b9c7a0
remove duplicated tests for postgres
achettyiitr Mar 1, 2023
aea5bca
master pull
achettyiitr Mar 2, 2023
2e016c6
added crash reccoveries changes back.
achettyiitr Mar 2, 2023
dfeb215
fix tests
achettyiitr Mar 2, 2023
e962846
master pull
achettyiitr Mar 7, 2023
ef25ae7
fix test failures
achettyiitr Mar 7, 2023
998a175
Merge branch 'master' into chore.postgres-temp-tables
achettyiitr Mar 7, 2023
7f5611b
Merge branch 'master' into chore.postgres-temp-tables
achettyiitr Mar 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.14+incompatible // indirect
github.com/docker/docker v20.10.21+incompatible // indirect
github.com/docker/docker v20.10.21+incompatible
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
24 changes: 14 additions & 10 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/datalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/mssql"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
postgreslegacy "github.com/rudderlabs/rudder-server/warehouse/integrations/postgres-legacy"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"

warehousearchiver "github.com/rudderlabs/rudder-server/warehouse/archive"

"github.com/bugsnag/bugsnag-go/v2"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -53,16 +66,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/warehouse"
warehousearchiver "github.com/rudderlabs/rudder-server/warehouse/archive"
azuresynapse "github.com/rudderlabs/rudder-server/warehouse/integrations/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/datalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/mssql"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)
Expand Down Expand Up @@ -352,6 +355,7 @@ func runAllInit() {
azuresynapse.Init()
mssql.Init()
postgres.Init()
postgreslegacy.Init()
redshift.Init()
snowflake.Init()
deltalake.Init()
Expand Down
36 changes: 0 additions & 36 deletions utils/misc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,42 +829,6 @@ func RunWithTimeout(f, onTimeout func(), d time.Duration) {
}
}

/*
RWCConfig config for RunWithConcurrency
factor: number of concurrent job
jobs: range of jobs you need to provide
runJob: caller function for the concurrent job
*/
type RWCJob interface{}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this as it is similar to errgroup.


type RWCConfig struct {
Factor int
Jobs *[]RWCJob
Run func(RWCJob interface{})
}

/*
RunWithConcurrency runs provided function f with concurrency provided by the factor factor.
*/
func RunWithConcurrency(config *RWCConfig) {
var wg sync.WaitGroup

concurrencyChan := make(chan struct{}, config.Factor)
for _, job := range *config.Jobs {
wg.Add(1)
concurrencyChan <- struct{}{}
runJob := job
go func() {
defer func() {
<-concurrencyChan
wg.Done()
}()
config.Run(runJob)
}()
}
wg.Wait()
}

/*
IsValidUUID will check if provided string is a valid UUID
*/
Expand Down
72 changes: 10 additions & 62 deletions warehouse/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"

"github.com/lib/pq"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
Expand All @@ -30,11 +32,12 @@ type WarehouseManager interface {
}

type HandleT struct {
Warehouse warehouseutils.Warehouse
DbHandle *sql.DB
Uploader warehouseutils.UploaderI
UploadID int64
WarehouseManager WarehouseManager
Warehouse warehouseutils.Warehouse
DB *sql.DB
Uploader warehouseutils.UploaderI
UploadID int64
WarehouseManager WarehouseManager
LoadFileDownloader downloader.Downloader
}

func (idr *HandleT) mergeRulesTable() string {
Expand Down Expand Up @@ -350,61 +353,6 @@ func (idr *HandleT) writeTableToFile(tableName string, txn *sql.Tx, gzWriter *mi
return
}

func (idr *HandleT) downloadLoadFiles(tableName string) ([]string, error) {
objects := idr.Uploader.GetLoadFilesMetadata(warehouseutils.GetLoadFilesOptionsT{Table: tableName})
var fileNames []string
for _, object := range objects {
objectName, err := warehouseutils.GetObjectName(object.Location, idr.Warehouse.Destination.Config, warehouseutils.ObjectStorageType(idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.Config, idr.Uploader.UseRudderStorage()))
if err != nil {
pkgLogger.Errorf("IDR: Error in converting object location to object key for table:%s: %s,%v", tableName, object.Location, err)
return nil, err
}
dirName := fmt.Sprintf(`/%s/`, misc.RudderWarehouseLoadUploadsTmp)
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
pkgLogger.Errorf("IDR: Error in creating tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
return nil, err
}
objectPath := tmpDirPath + dirName + fmt.Sprintf(`%s_%s_%d/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, time.Now().Unix()) + objectName
err = os.MkdirAll(filepath.Dir(objectPath), os.ModePerm)
if err != nil {
pkgLogger.Errorf("IDR: Error in making tmp directory for downloading load file for table:%s: %s, %s %v", tableName, object.Location, err)
return nil, err
}
objectFile, err := os.Create(objectPath)
if err != nil {
pkgLogger.Errorf("IDR: Error in creating file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
return nil, err
}
storageProvider := warehouseutils.ObjectStorageType(idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.Config, idr.Uploader.UseRudderStorage())
downloader, err := filemanager.DefaultFileManagerFactory.New(&filemanager.SettingsT{
Provider: storageProvider,
Config: misc.GetObjectStorageConfig(misc.ObjectStorageOptsT{
Provider: storageProvider,
Config: idr.Warehouse.Destination.Config,
UseRudderStorage: idr.Uploader.UseRudderStorage(),
WorkspaceID: idr.Warehouse.Destination.WorkspaceID,
}),
})
if err != nil {
pkgLogger.Errorf("IDR: Error in creating a file manager for :%s: , %v", idr.Warehouse.Destination.DestinationDefinition.Name, err)
return nil, err
}
err = downloader.Download(context.TODO(), objectFile, objectName)
if err != nil {
pkgLogger.Errorf("IDR: Error in downloading file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
return nil, err
}
fileName := objectFile.Name()
if err = objectFile.Close(); err != nil {
pkgLogger.Errorf("IDR: Error in closing downloaded file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
return nil, err
}
fileNames = append(fileNames, fileName)
}
return fileNames, nil
}

func (idr *HandleT) uploadFile(filePath string, txn *sql.Tx, tableName string, totalRecords int) (err error) {
outputFile, err := os.Open(filePath)
if err != nil {
Expand Down Expand Up @@ -456,7 +404,7 @@ func (idr *HandleT) createTempGzFile(dirName string) (gzWriter misc.GZipWriter,
}

func (idr *HandleT) processMergeRules(fileNames []string) (err error) {
txn, err := idr.DbHandle.Begin()
txn, err := idr.DB.Begin()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -523,7 +471,7 @@ func (idr *HandleT) processMergeRules(fileNames []string) (err error) {
func (idr *HandleT) Resolve() (err error) {
var loadFileNames []string
defer misc.RemoveFilePaths(loadFileNames...)
loadFileNames, err = idr.downloadLoadFiles(idr.whMergeRulesTable())
loadFileNames, err = idr.LoadFileDownloader.Download(context.TODO(), idr.whMergeRulesTable())
if err != nil {
pkgLogger.Errorf(`IDR: Failed to download load files for %s with error: %v`, idr.mergeRulesTable(), err)
return
Expand Down
Loading