Skip to content

Commit

Permalink
feat(warehouse): temp table support for postgres (#2964)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Mar 9, 2023
1 parent 1d9f63b commit 9a80f45
Show file tree
Hide file tree
Showing 31 changed files with 3,555 additions and 1,248 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.14+incompatible // indirect
github.com/docker/docker v20.10.21+incompatible // indirect
github.com/docker/docker v20.10.21+incompatible
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
24 changes: 14 additions & 10 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/datalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/mssql"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
postgreslegacy "github.com/rudderlabs/rudder-server/warehouse/integrations/postgres-legacy"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"

warehousearchiver "github.com/rudderlabs/rudder-server/warehouse/archive"

"github.com/bugsnag/bugsnag-go/v2"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -53,16 +66,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/warehouse"
warehousearchiver "github.com/rudderlabs/rudder-server/warehouse/archive"
azuresynapse "github.com/rudderlabs/rudder-server/warehouse/integrations/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/datalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/mssql"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)
Expand Down Expand Up @@ -352,6 +355,7 @@ func runAllInit() {
azuresynapse.Init()
mssql.Init()
postgres.Init()
postgreslegacy.Init()
redshift.Init()
snowflake.Init()
deltalake.Init()
Expand Down
36 changes: 0 additions & 36 deletions utils/misc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,42 +829,6 @@ func RunWithTimeout(f, onTimeout func(), d time.Duration) {
}
}

/*
RWCJob is a single unit of work; RunWithConcurrency passes each one to RWCConfig.Run.
*/
type RWCJob interface{}

/*
RWCConfig config for RunWithConcurrency
Factor: maximum number of jobs that run at the same time (values below 1 are treated as 1)
Jobs: the jobs to run; every element is passed to Run exactly once
Run: caller function invoked for each job, every call in its own goroutine
*/
type RWCConfig struct {
	Factor int
	Jobs   *[]RWCJob
	Run    func(RWCJob interface{})
}

/*
RunWithConcurrency runs config.Run once for every job in config.Jobs, with at most
config.Factor calls in flight at the same time, and returns once all of them have finished.

A nil config or nil config.Jobs is treated as "nothing to do".
*/
func RunWithConcurrency(config *RWCConfig) {
	if config == nil || config.Jobs == nil {
		return
	}

	// A zero factor would create an unbuffered channel and block forever on the
	// first send below (nobody is receiving yet); a negative one makes `make` panic.
	// Fall back to running the jobs one at a time instead.
	factor := config.Factor
	if factor < 1 {
		factor = 1
	}

	var wg sync.WaitGroup

	// Buffered channel used as a counting semaphore: one token per running job.
	concurrencyChan := make(chan struct{}, factor)
	for _, job := range *config.Jobs {
		wg.Add(1)
		// Blocks while `factor` jobs are already running.
		concurrencyChan <- struct{}{}
		runJob := job // per-iteration copy for the closure
		go func() {
			defer func() {
				// Release the token even if Run panics, then mark the job done.
				<-concurrencyChan
				wg.Done()
			}()
			config.Run(runJob)
		}()
	}
	wg.Wait()
}

/*
IsValidUUID will check if provided string is a valid UUID
*/
Expand Down
72 changes: 10 additions & 62 deletions warehouse/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"

"github.com/lib/pq"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
Expand All @@ -30,11 +32,12 @@ type WarehouseManager interface {
}

type HandleT struct {
Warehouse warehouseutils.Warehouse
DbHandle *sql.DB
Uploader warehouseutils.UploaderI
UploadID int64
WarehouseManager WarehouseManager
Warehouse warehouseutils.Warehouse
DB *sql.DB
Uploader warehouseutils.UploaderI
UploadID int64
WarehouseManager WarehouseManager
LoadFileDownloader downloader.Downloader
}

func (idr *HandleT) mergeRulesTable() string {
Expand Down Expand Up @@ -350,61 +353,6 @@ func (idr *HandleT) writeTableToFile(tableName string, txn *sql.Tx, gzWriter *mi
return
}

// downloadLoadFiles downloads every load file that the uploader reports for tableName
// into a freshly created path under the tmp directory and returns the local file paths
// in the order the objects were listed.
//
// It stops at the first failure and returns a nil slice together with that error; the
// error is logged here as well. Files downloaded before the failure are left on disk.
func (idr *HandleT) downloadLoadFiles(tableName string) ([]string, error) {
	objects := idr.Uploader.GetLoadFilesMetadata(warehouseutils.GetLoadFilesOptionsT{Table: tableName})
	var fileNames []string
	for _, object := range objects {
		// Resolved once per object and reused for both the object key and the file manager.
		storageProvider := warehouseutils.ObjectStorageType(idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.Config, idr.Uploader.UseRudderStorage())

		objectName, err := warehouseutils.GetObjectName(object.Location, idr.Warehouse.Destination.Config, storageProvider)
		if err != nil {
			pkgLogger.Errorf("IDR: Error in converting object location to object key for table:%s: %s,%v", tableName, object.Location, err)
			return nil, err
		}

		dirName := fmt.Sprintf(`/%s/`, misc.RudderWarehouseLoadUploadsTmp)
		tmpDirPath, err := misc.CreateTMPDIR()
		if err != nil {
			pkgLogger.Errorf("IDR: Error in creating tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
			return nil, err
		}

		// <tmp>/<uploads-tmp>/<destType>_<destID>_<unix-ts>/<object key>
		objectPath := tmpDirPath + dirName + fmt.Sprintf(`%s_%s_%d/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, time.Now().Unix()) + objectName
		if err = os.MkdirAll(filepath.Dir(objectPath), os.ModePerm); err != nil {
			// The format string previously had one verb more than arguments.
			pkgLogger.Errorf("IDR: Error in making tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
			return nil, err
		}

		objectFile, err := os.Create(objectPath)
		if err != nil {
			pkgLogger.Errorf("IDR: Error in creating file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
			return nil, err
		}

		downloader, err := filemanager.DefaultFileManagerFactory.New(&filemanager.SettingsT{
			Provider: storageProvider,
			Config: misc.GetObjectStorageConfig(misc.ObjectStorageOptsT{
				Provider:         storageProvider,
				Config:           idr.Warehouse.Destination.Config,
				UseRudderStorage: idr.Uploader.UseRudderStorage(),
				WorkspaceID:      idr.Warehouse.Destination.WorkspaceID,
			}),
		})
		if err != nil {
			// Best-effort close so the descriptor is not leaked; the original error is what matters.
			_ = objectFile.Close()
			pkgLogger.Errorf("IDR: Error in creating a file manager for :%s: , %v", idr.Warehouse.Destination.DestinationDefinition.Name, err)
			return nil, err
		}

		if err = downloader.Download(context.TODO(), objectFile, objectName); err != nil {
			// Best-effort close so the descriptor is not leaked; the download error is what matters.
			_ = objectFile.Close()
			pkgLogger.Errorf("IDR: Error in downloading file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
			return nil, err
		}

		fileName := objectFile.Name()
		if err = objectFile.Close(); err != nil {
			pkgLogger.Errorf("IDR: Error in closing downloaded file in tmp directory for downloading load file for table:%s: %s, %v", tableName, object.Location, err)
			return nil, err
		}
		fileNames = append(fileNames, fileName)
	}
	return fileNames, nil
}

func (idr *HandleT) uploadFile(filePath string, txn *sql.Tx, tableName string, totalRecords int) (err error) {
outputFile, err := os.Open(filePath)
if err != nil {
Expand Down Expand Up @@ -456,7 +404,7 @@ func (idr *HandleT) createTempGzFile(dirName string) (gzWriter misc.GZipWriter,
}

func (idr *HandleT) processMergeRules(fileNames []string) (err error) {
txn, err := idr.DbHandle.Begin()
txn, err := idr.DB.Begin()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -523,7 +471,7 @@ func (idr *HandleT) processMergeRules(fileNames []string) (err error) {
func (idr *HandleT) Resolve() (err error) {
var loadFileNames []string
defer misc.RemoveFilePaths(loadFileNames...)
loadFileNames, err = idr.downloadLoadFiles(idr.whMergeRulesTable())
loadFileNames, err = idr.LoadFileDownloader.Download(context.TODO(), idr.whMergeRulesTable())
if err != nil {
pkgLogger.Errorf(`IDR: Failed to download load files for %s with error: %v`, idr.mergeRulesTable(), err)
return
Expand Down
Loading

0 comments on commit 9a80f45

Please sign in to comment.