-
Notifications
You must be signed in to change notification settings - Fork 317
/
admin.go
121 lines (99 loc) · 3.17 KB
/
admin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package warehouse
import (
"context"
"errors"
"fmt"
"strings"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/validations"
"github.com/rudderlabs/rudder-server/admin"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
// WarehouseAdmin is the receiver type for the warehouse admin operations
// defined in this file (TriggerUpload, Query and ConfigurationTest).
// It carries no fields.
type WarehouseAdmin struct{}
// QueryInput is the argument of WarehouseAdmin.Query.
//
//   - DestID selects the destination to query; it must not be blank.
//   - SourceID optionally narrows the lookup to one (source, destination)
//     connection; when empty, any source connected to the destination is used.
//   - SQLStatement is the statement handed to the warehouse client.
type QueryInput struct {
	DestID, SourceID, SQLStatement string
}
// ConfigurationTestInput is the argument of WarehouseAdmin.ConfigurationTest.
type ConfigurationTestInput struct {
// DestID is the ID of the destination whose configuration is validated.
DestID string
}
// ConfigurationTestOutput is the reply filled in by
// WarehouseAdmin.ConfigurationTest.
type ConfigurationTestOutput struct {
// Valid is copied from the Success field of the validation result.
Valid bool
// Error is copied from the Error field of the validation result.
Error string
}
func Init5() {
admin.RegisterAdminHandler("Warehouse", &WarehouseAdmin{})
}
// TriggerUpload toggles the package-level startUploadAlways flag.
//
// With off == false the flag is switched on so uploads start without delay;
// with off == true the flag is switched off. A human-readable confirmation is
// written to reply. The returned error is always nil.
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
	startUploadAlways = !off

	message := "Successfully set uploads to start always without delay.\nRun same command with -o flag to turn off explicit triggers."
	if off {
		message = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
	}
	*reply = message

	return nil
}
// Query runs s.SQLStatement against the warehouse behind s.DestID and stores
// the result in reply.
//
// The warehouse is looked up in connectionsMap by destination ID. When
// s.SourceID is set, that exact (source, destination) connection is used;
// otherwise an arbitrary source connected to the destination is picked.
//
// An error is returned when the destination ID is blank or unknown, when the
// (source, destination) pair is unknown, or when creating the manager,
// connecting, or executing the statement fails.
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
	if strings.TrimSpace(s.DestID) == "" {
		return errors.New("please specify the destination ID to query the warehouse")
	}

	sources, known := connectionsMap[s.DestID]
	if !known {
		return errors.New("please specify a valid and existing destination ID")
	}

	var target model.Warehouse
	if s.SourceID == "" {
		// No source requested: map iteration order is unspecified, so this
		// takes whichever connection comes up first.
		for _, candidate := range sources {
			target = candidate
			break
		}
	} else {
		// A source was requested: it must be connected to this destination.
		candidate, connected := sources[s.SourceID]
		if !connected {
			return errors.New("please specify a valid (sourceID, destination ID) pair")
		}
		target = candidate
	}

	whManager, err := manager.New(target.Type)
	if err != nil {
		return err
	}
	timeout := warehouseutils.GetConnectionTimeout(target.Type, target.Destination.ID)
	whManager.SetConnectionTimeout(timeout)

	client, err := whManager.Connect(context.TODO(), target)
	if err != nil {
		return err
	}
	defer client.Close()

	pkgLogger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, target.Type, target.Destination.ID)

	// reply is assigned even when the query fails; the error is passed through.
	*reply, err = client.Query(s.SQLStatement)
	return err
}
// ConfigurationTest validates the destination identified by s.DestID and
// reports the outcome through reply.
//
// The destination is taken from an arbitrary source connection registered for
// that ID in connectionsMap. reply.Valid and reply.Error mirror the Success and
// Error fields of the validation result. An error is returned only when the
// destination ID is blank or not present in connectionsMap; a failed
// validation is reported via reply, not via the returned error.
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
	if strings.TrimSpace(s.DestID) == "" {
		return errors.New("please specify the destination ID to query the warehouse")
	}

	sources, known := connectionsMap[s.DestID]
	if !known {
		return fmt.Errorf("please specify a valid and existing destinationID: %s", s.DestID)
	}

	// Any connection will do: only its Destination is validated below.
	var target model.Warehouse
	for _, candidate := range sources {
		target = candidate
		break
	}

	pkgLogger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, target.Type, target.Destination.ID)

	outcome := validations.NewDestinationValidator().Validate(context.TODO(), &target.Destination)
	reply.Valid, reply.Error = outcome.Success, outcome.Error
	return nil
}