-
Notifications
You must be signed in to change notification settings - Fork 111
/
connectors.go
181 lines (153 loc) · 5.02 KB
/
connectors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package connectors
import (
"context"
"fmt"
"reflect"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
)
// ErrIngestionLimitExceeded is a sentinel error stating that a source's
// ingestion exceeds a limit. It is not returned by the code visible here.
// NOTE(review): presumably returned by connector implementations when
// Env.StorageLimitInBytes is exceeded; confirm against callers.
var ErrIngestionLimitExceeded = fmt.Errorf("connectors: source ingestion exceeds limit")
// PermissionDeniedError is an error type that carries a caller-supplied message.
// Callers can recognise it with a type assertion (or errors.As) on
// *PermissionDeniedError.
type PermissionDeniedError struct {
	msg string
}

// NewPermissionDeniedError returns a *PermissionDeniedError holding msg,
// typed as the error interface.
func NewPermissionDeniedError(msg string) error {
	denied := new(PermissionDeniedError)
	denied.msg = msg
	return denied
}

// Error implements the error interface and returns the message supplied at
// construction, unchanged.
func (e *PermissionDeniedError) Error() string {
	return e.msg
}
// Connectors tracks all registered connector drivers, keyed by the name passed to Register.
// It is a plain package-level map; the code visible here performs no locking around it.
var Connectors = make(map[string]Connector)
// Register tracks a connector driver under the given name.
//
// It panics if connector is nil or if a connector has already been registered
// under name. Rejecting nil up front makes a bad registration fail immediately
// instead of at the first method call on the stored value. Note that the nil
// check only catches a nil interface value, not a typed nil pointer wrapped in
// the interface.
//
// Register writes to the package-level Connectors map without any locking.
func Register(name string, connector Connector) {
	if connector == nil {
		panic(fmt.Errorf("cannot register nil connector with name '%s'", name))
	}
	// Test for key presence rather than a non-nil value so that duplicates
	// are detected regardless of what is stored under the key.
	if _, exists := Connectors[name]; exists {
		panic(fmt.Errorf("already registered connector with name '%s'", name))
	}
	Connectors[name] = connector
}
// Connector is a driver for ingesting data from an external system.
// Implementations are made available by name through Register.
type Connector interface {
// Spec returns metadata about the connector and the properties it supports.
Spec() Spec
// TODO: Add a method that extracts a source and outputs a schema and a buffered
// iterator for the data in it, for consumption by a drivers.OLAPStore. Also consider
// how to communicate splits and long-running/streaming data (e.g. for Kafka).
// Consume(ctx context.Context, source Source) error
// ConsumeAsIterator returns a FileIterator for the given source, or an error.
// The package-level ConsumeAsIterator function delegates to this method.
ConsumeAsIterator(ctx context.Context, env *Env, source *Source) (FileIterator, error)
// HasAnonymousAccess returns true if the external system can be accessed without credentials.
HasAnonymousAccess(ctx context.Context, env *Env, source *Source) (bool, error)
}
// Spec provides metadata about a connector and the properties it supports.
// It is the value returned by Connector.Spec.
type Spec struct {
// DisplayName and Description are descriptive strings for the connector.
// NOTE(review): presumably shown to users; the code visible here does not read them.
DisplayName string
Description string
// ServiceAccountDocs is not read by the code visible here.
// NOTE(review): presumably documentation (or a link to it) about service account setup; confirm.
ServiceAccountDocs string
// Properties lists the per-source properties the connector accepts.
// Source.Validate checks a source's Properties map against these schemas.
Properties []PropertySchema
// ConnectorVariables lists the variables the connector supports, one VariableSchema each.
// NOTE(review): presumably resolved from Env.Variables; confirm against callers.
ConnectorVariables []VariableSchema
// Help is free-form help text. NOTE(review): its intended audience is not visible here.
Help string
}
// PropertySchema provides the schema for a property supported by a connector.
type PropertySchema struct {
// Key is the name looked up in Source.Properties by Source.Validate.
Key string
// Type is the expected value type; ValidateType compares a value's Go type against it.
Type PropertySchemaType
// Required makes Source.Validate fail when Key is absent from the source's properties.
Required bool
// DisplayName, Description, Placeholder, Hint and Href are not read by the code visible here.
// NOTE(review): presumably presentation metadata for a UI form; confirm against consumers.
DisplayName string
Description string
Placeholder string
Hint string
Href string
}
// VariableSchema describes a variable supported by a connector; a connector
// lists its variables in Spec.ConnectorVariables.
//
// NOTE(review): none of the fields are read by the code visible in this file,
// so the per-field notes below describe the apparent intent and should be
// confirmed against the code that consumes them.
type VariableSchema struct {
	// Key identifies the variable.
	Key string
	// Default is presumably the value used when the variable is not set.
	Default string
	// Help is free-form help text for the variable.
	Help string
	// Secret presumably marks the value as sensitive.
	Secret bool
	// ValidateFunc, when set, is presumably called with the variable's value
	// and returns a non-nil error if the value is not acceptable.
	ValidateFunc func(val any) error
	// TransformFunc, when set, is presumably called with the variable's value
	// and returns the value to use in its place.
	TransformFunc func(val any) any
}
// PropertySchemaType is an enum of types supported for connector properties.
type PropertySchemaType int
// The values are assigned with iota, so UnspecifiedPropertyType is the zero
// value (0) and the others follow in declaration order.
const (
// UnspecifiedPropertyType is the zero value. ValidateType never accepts a value for it.
UnspecifiedPropertyType PropertySchemaType = iota
// StringPropertyType is matched by Go string values in ValidateType.
StringPropertyType
// NumberPropertyType is matched by numeric values (see ValidateType for the exact Go types).
NumberPropertyType
// BooleanPropertyType is matched by Go bool values in ValidateType.
BooleanPropertyType
// InformationalPropertyType is never accepted by ValidateType for any value.
// NOTE(review): presumably marks display-only entries that carry no value; confirm.
InformationalPropertyType
)
// ValidateType reports whether val's dynamic Go type matches the type declared
// by ps.Type.
//
// A string matches StringPropertyType, a bool matches BooleanPropertyType, and
// every built-in signed or unsigned integer type as well as float32/float64
// matches NumberPropertyType. Any other value, including nil, is rejected, and
// so is every value when ps.Type is UnspecifiedPropertyType or
// InformationalPropertyType.
func (ps PropertySchema) ValidateType(val any) bool {
	switch val.(type) {
	case string:
		return ps.Type == StringPropertyType
	case bool:
		return ps.Type == BooleanPropertyType
	case int, int8, int16, int32, int64,
		uint, uint8, uint16, uint32, uint64,
		float32, float64:
		// Unsigned integers are numbers too; a decoder may produce them for
		// values that do not fit a signed type.
		return ps.Type == NumberPropertyType
	default:
		return false
	}
}
// Env contains contextual information for a source, such as the repo it came from
// and (in the future) secrets configured by the user.
type Env struct {
// RepoDriver and RepoRoot describe the repo the source came from.
// NOTE(review): presumably the repo driver name and the repo's root path; confirm.
RepoDriver string
RepoRoot string
// Variables holds user-provided env variables, kept with keys converted to uppercase.
Variables map[string]string
// AllowHostAccess is not read by the code visible here.
// NOTE(review): presumably gates access to host resources such as local files; confirm.
AllowHostAccess bool
// StorageLimitInBytes is not read by the code visible here.
// NOTE(review): presumably the ingestion size cap behind ErrIngestionLimitExceeded; confirm,
// including what a zero or negative value means.
StorageLimitInBytes int64
}
// Source represents a dataset to ingest using a specific connector (like a connector instance).
type Source struct {
// Name is the source's name. It is not read by the code visible here.
Name string
// Connector is the registered connector name; it is used as the key into Connectors.
Connector string
// ExtractPolicy is an optional (pointer) extract policy defined by the runtime API.
// NOTE(review): its semantics come from runtimev1.Source_ExtractPolicy and are not interpreted here.
ExtractPolicy *runtimev1.Source_ExtractPolicy
// Properties holds connector-specific settings; Validate checks them against the connector's Spec.
Properties map[string]any
// Timeout is not read by the code visible here.
// NOTE(review): the unit is not stated (presumably seconds); confirm.
Timeout int32
}
// SamplePolicy tells the connector to only ingest a sample of data from the source.
// Support for it is currently not implemented.
type SamplePolicy struct {
// NOTE(review): because the policy is unimplemented, no visible code establishes what the
// fields mean. Presumably Strategy names the sampling method, Sample is a fraction and
// Limit is a cap on the amount ingested; confirm before implementing.
Strategy string
Sample float32
Limit int
}
// FileIterator provides a way to iteratively ingest files downloaded from external sources.
// Clients should call Close once they are done with the iterator to release any resources.
type FileIterator interface {
// Close cleans up and releases the resources held by the iterator.
Close() error
// NextBatch returns a list of files downloaded from external sources.
// It also cleans up the files created for the previous batch.
// NOTE(review): limit presumably caps the number of files returned; confirm with implementations.
NextBatch(limit int) ([]string, error)
// HasNext can be used to check whether the iterator has more elements left.
HasNext() bool
}
// Validate checks the source's properties against the spec of its connector.
//
// It returns an error when no connector is registered under s.Connector, when
// a property the spec marks as required is absent, or when a property that is
// present has a value whose type the schema does not accept. Properties that
// the spec does not list are ignored.
func (s *Source) Validate() error {
	conn, registered := Connectors[s.Connector]
	if !registered {
		return fmt.Errorf("connector: not found %q", s.Connector)
	}

	for _, schema := range conn.Spec().Properties {
		value, present := s.Properties[schema.Key]
		switch {
		case !present && schema.Required:
			return fmt.Errorf("missing required property '%s'", schema.Key)
		case present && !schema.ValidateType(value):
			return fmt.Errorf("unexpected type '%T' for property '%s'", value, schema.Key)
		}
	}
	return nil
}
// ConsumeAsIterator looks up the connector named by source.Connector in
// Connectors and delegates to that connector's ConsumeAsIterator method,
// passing ctx, env and source through unchanged.
//
// It returns an error, and a nil iterator, if source is nil or if no connector
// is registered under source.Connector. The not-found error includes the
// connector name, in the same format Source.Validate uses.
func ConsumeAsIterator(ctx context.Context, env *Env, source *Source) (FileIterator, error) {
	if source == nil {
		return nil, fmt.Errorf("connector: source is nil")
	}
	connector, ok := Connectors[source.Connector]
	if !ok {
		return nil, fmt.Errorf("connector: not found %q", source.Connector)
	}
	return connector.ConsumeAsIterator(ctx, env, source)
}
// PropertiesEquals reports whether s and o carry the same connector properties.
//
// Two sources without any properties are equal whether their Properties map is
// nil or merely empty; reflect.DeepEqual on its own treats a nil map and an
// empty map as different. All other cases are compared with reflect.DeepEqual.
// Both s and o must be non-nil.
func (s *Source) PropertiesEquals(o *Source) bool {
	if len(s.Properties) == 0 && len(o.Properties) == 0 {
		return true
	}
	return reflect.DeepEqual(s.Properties, o.Properties)
}