-
Notifications
You must be signed in to change notification settings - Fork 53
/
destinations.go
133 lines (110 loc) · 3.16 KB
/
destinations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package destinations
import (
"context"
"errors"
"io"
"github.com/scratchdata/scratchdata/models"
"github.com/scratchdata/scratchdata/pkg/config"
"github.com/scratchdata/scratchdata/pkg/storage"
"github.com/EagleChen/mapmutex"
"github.com/rs/zerolog/log"
"github.com/scratchdata/scratchdata/pkg/destinations/bigquery"
"github.com/scratchdata/scratchdata/pkg/destinations/clickhouse"
"github.com/scratchdata/scratchdata/pkg/destinations/duckdb"
"github.com/scratchdata/scratchdata/pkg/destinations/postgres"
"github.com/scratchdata/scratchdata/pkg/destinations/redshift"
)
type DestinationManager struct {
storage *storage.Services
pool map[int64]Destination
mux *mapmutex.Mutex
}
type Destination interface {
QueryNDJson(query string, writer io.Writer) error
QueryJSON(query string, writer io.Writer) error
QueryCSV(query string, writer io.Writer) error
Tables() ([]string, error)
Columns(table string) ([]models.Column, error)
CreateEmptyTable(name string) error
CreateColumns(table string, filePath string) error
InsertFromNDJsonFile(table string, filePath string) error
Close() error
}
func NewDestinationManager(storage *storage.Services) *DestinationManager {
mux := mapmutex.NewMapMutex()
rc := DestinationManager{
storage: storage,
pool: map[int64]Destination{},
mux: mux,
}
return &rc
}
func (m *DestinationManager) CloseAll() {
for id, dest := range m.pool {
// TODO: context timeout on close
err := dest.Close()
if err != nil {
log.Error().Err(err).Int64("destination_id", id).Msg("Unable to close destination")
}
}
}
func (m *DestinationManager) TestCredentials(creds config.Destination) error {
var dest Destination
var err error
switch creds.Type {
case "duckdb":
dest, err = duckdb.OpenServer(creds.Settings)
case "clickhouse":
dest, err = clickhouse.OpenServer(creds.Settings)
case "redshift":
dest, err = redshift.OpenServer(creds.Settings)
case "bigquery":
dest, err = bigquery.OpenServer(creds.Settings)
case "postgres":
dest, err = postgres.OpenServer(creds.Settings)
default:
err = errors.New("Invalid destination type")
}
if err != nil {
return err
}
dest.Close()
return nil
}
func (m *DestinationManager) Destination(ctx context.Context, databaseID int64) (Destination, error) {
if m.mux.TryLock(databaseID) {
defer m.mux.Unlock(databaseID)
var dest Destination
dest, ok := m.pool[databaseID]
if ok {
return dest, nil
}
creds, err := m.storage.Database.GetDestinationCredentials(ctx, databaseID)
if err != nil {
return nil, err
}
settings := creds.Settings.Data()
switch creds.Type {
case "duckdb":
dest, err = duckdb.OpenServer(settings)
case "clickhouse":
dest, err = clickhouse.OpenServer(settings)
case "redshift":
dest, err = redshift.OpenServer(settings)
case "bigquery":
dest, err = bigquery.OpenServer(settings)
case "postgres":
dest, err = postgres.OpenServer(settings)
}
if err != nil {
return nil, err
}
if dest != nil {
m.pool[databaseID] = dest
return dest, nil
} else {
return nil, errors.New("Unrecognized destination type " + creds.Type)
}
}
return nil, errors.New("unable to acquire destination lock")
}