-
Notifications
You must be signed in to change notification settings - Fork 313
/
manager.go
119 lines (111 loc) · 3.93 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package manager
import (
"fmt"
"time"
azuresynapse "github.com/rudderlabs/rudder-server/warehouse/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/datalake"
"github.com/rudderlabs/rudder-server/warehouse/deltalake"
"github.com/rudderlabs/rudder-server/warehouse/mssql"
"github.com/rudderlabs/rudder-server/warehouse/postgres"
"github.com/rudderlabs/rudder-server/warehouse/redshift"
"github.com/rudderlabs/rudder-server/warehouse/snowflake"
"github.com/rudderlabs/rudder-server/utils/misc"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
// ManagerI is the set of operations a warehouse integration handle exposes.
// New returns a value of this type for a given destination type.
//
// NOTE(review): the precise semantics of each method are defined by the
// implementing warehouse packages and are not visible from this file.
type ManagerI interface {
	Setup(warehouse warehouseutils.WarehouseT, uploader warehouseutils.UploaderI) error
	CrashRecover(warehouse warehouseutils.WarehouseT) error
	FetchSchema(warehouse warehouseutils.WarehouseT) (warehouseutils.SchemaT, error)
	CreateSchema() error
	CreateTable(tableName string, columnMap map[string]string) error
	AddColumn(tableName, columnName, columnType string) error
	AlterColumn(tableName, columnName, columnType string) error
	LoadTable(tableName string) error
	// LoadUserTables reports its outcome as a map of errors.
	// NOTE(review): presumably keyed by table name — confirm against the implementations.
	LoadUserTables() map[string]error
	LoadIdentityMergeRulesTable() error
	LoadIdentityMappingsTable() error
	Cleanup()
	IsEmpty(warehouse warehouseutils.WarehouseT) (bool, error)
	TestConnection(warehouse warehouseutils.WarehouseT) error
	DownloadIdentityRules(*misc.GZipWriter) error
	GetTotalCountInTable(tableName string) (int64, error)
	Connect(warehouse warehouseutils.WarehouseT) (client.Client, error)
	LoadTestTable(location, stagingTableName string, payloadMap map[string]interface{}, loadFileFormat string) error
	SetConnectionTimeout(timeout time.Duration)
}
// WarehouseDelete is implemented by warehouse handles that can drop a table
// identified by name.
type WarehouseDelete interface {
	// DropTable takes the name of a table and returns an error value
	// describing the outcome.
	DropTable(tableName string) error
}
// WarehouseOperations is the union of ManagerI and WarehouseDelete: a value
// of this type offers every ManagerI method plus DropTable.
type WarehouseOperations interface {
	ManagerI
	WarehouseDelete
}
// New is a factory function that returns a ManagerI for the given
// destination type.
//
// It delegates to NewWarehouseOperations so that the mapping from destination
// type to warehouse handle is maintained in exactly one place. Because
// WarehouseOperations embeds ManagerI, the value returned here wraps the same
// concrete handle that NewWarehouseOperations produces.
//
// An unrecognised destType yields a nil ManagerI and a non-nil error.
func New(destType string) (ManagerI, error) {
	ops, err := NewWarehouseOperations(destType)
	if err != nil {
		// Return an explicit nil manager on failure.
		return nil, err
	}
	return ops, nil
}

// NewWarehouseOperations is a factory function that returns a
// WarehouseOperations for the given destination type.
//
// Each call returns a pointer to a fresh zero-valued HandleT from the
// warehouse package that matches destType. The three datalake destination
// types (S3, GCS, Azure) share the datalake handle.
//
// An unrecognised destType yields a nil WarehouseOperations and a non-nil
// error naming the offending type.
func NewWarehouseOperations(destType string) (WarehouseOperations, error) {
	switch destType {
	case warehouseutils.RS:
		return new(redshift.HandleT), nil
	case warehouseutils.BQ:
		return new(bigquery.HandleT), nil
	case warehouseutils.SNOWFLAKE:
		return new(snowflake.HandleT), nil
	case warehouseutils.POSTGRES:
		return new(postgres.HandleT), nil
	case warehouseutils.CLICKHOUSE:
		return new(clickhouse.HandleT), nil
	case warehouseutils.MSSQL:
		return new(mssql.HandleT), nil
	case warehouseutils.AZURE_SYNAPSE:
		return new(azuresynapse.HandleT), nil
	case warehouseutils.S3_DATALAKE, warehouseutils.GCS_DATALAKE, warehouseutils.AZURE_DATALAKE:
		return new(datalake.HandleT), nil
	case warehouseutils.DELTALAKE:
		return new(deltalake.HandleT), nil
	}
	// The message text is kept exactly as-is; callers may match on it.
	return nil, fmt.Errorf("Provider of type %s is not configured for WarehouseManager", destType)
}