-
Notifications
You must be signed in to change notification settings - Fork 307
/
types.go
145 lines (115 loc) · 4.09 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package validations
import (
"encoding/json"
"fmt"
"time"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/manager"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
type validationFunc struct {
Path string
Func func(json.RawMessage, string) (json.RawMessage, error)
}
type DestinationValidationRequest struct {
Destination backendconfig.DestinationT `json:"destination"`
}
type validationStep struct {
ID int `json:"id"`
Name string `json:"name"`
Success bool `json:"success"`
Error string `json:"error"`
Validator validator `json:"-"`
}
type validator func() error
type validationStepsResponse struct {
Steps []*validationStep `json:"steps"`
}
type DestinationValidator interface {
ValidateCredentials(req *DestinationValidationRequest) (*DestinationValidationResponse, error)
}
// NewDestinationValidator encapsulates the process
// to generate the destination validator.
func NewDestinationValidator() DestinationValidator {
handler := &CTHandleT{}
return &DestinationValidatorImpl{
validateFunc: handler.validateDestinationFunc,
}
}
type DestinationValidatorImpl struct {
// validateFunc takes the `creds` as raw-message for backward
// compatibility and then results the response in raw-message
// which can decoded later into proper struct.
validateFunc func(json.RawMessage, string) (json.RawMessage, error)
}
// ValidateCredentials for now offloads the request to destination validation
// to the validationFunc. This function runs through all the steps in the validation check
// and then generate a valid response.
func (dv *DestinationValidatorImpl) ValidateCredentials(req *DestinationValidationRequest) (*DestinationValidationResponse, error) {
byt, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to get marshal validation request: %v", err)
}
bytResponse, err := dv.validateFunc(byt, "")
if err != nil {
return nil, fmt.Errorf("unable to perform validation on destination: %s credentials, error: %v",
req.Destination.ID,
err)
}
validationResponse := &DestinationValidationResponse{}
err = json.Unmarshal(bytResponse, validationResponse)
return validationResponse, err
}
type DestinationValidationResponse struct {
Success bool `json:"success"`
Error string `json:"error"`
Steps []*validationStep `json:"steps"`
}
type CTHandleT struct {
infoRequest *DestinationValidationRequest
warehouse warehouseutils.Warehouse
manager manager.WarehouseOperations
}
type CTUploadJob struct {
infoRequest *DestinationValidationRequest
}
func (*CTUploadJob) GetSchemaInWarehouse() warehouseutils.SchemaT {
return warehouseutils.SchemaT{}
}
func (*CTUploadJob) GetLocalSchema() warehouseutils.SchemaT {
return warehouseutils.SchemaT{}
}
func (*CTUploadJob) UpdateLocalSchema(_ warehouseutils.SchemaT) error {
return nil
}
func (*CTUploadJob) GetTableSchemaInWarehouse(_ string) warehouseutils.TableSchemaT {
return warehouseutils.TableSchemaT{}
}
func (*CTUploadJob) GetTableSchemaInUpload(_ string) warehouseutils.TableSchemaT {
return warehouseutils.TableSchemaT{}
}
func (*CTUploadJob) GetLoadFilesMetadata(_ warehouseutils.GetLoadFilesOptionsT) []warehouseutils.LoadFileT {
return []warehouseutils.LoadFileT{}
}
func (*CTUploadJob) GetSampleLoadFileLocation(_ string) (string, error) {
return "", nil
}
func (*CTUploadJob) GetSingleLoadFile(_ string) (warehouseutils.LoadFileT, error) {
return warehouseutils.LoadFileT{}, nil
}
func (*CTUploadJob) ShouldOnDedupUseNewRecord() bool {
return false
}
func (job *CTUploadJob) UseRudderStorage() bool {
return misc.IsConfiguredToUseRudderObjectStorage(job.infoRequest.Destination.Config)
}
func (*CTUploadJob) GetLoadFileGenStartTIme() time.Time {
return time.Time{}
}
func (job *CTUploadJob) GetLoadFileType() string {
return warehouseutils.GetLoadFileType(job.infoRequest.Destination.DestinationDefinition.Name)
}
func (*CTUploadJob) GetFirstLastEvent() (time.Time, time.Time) {
return time.Time{}, time.Time{}
}