-
Notifications
You must be signed in to change notification settings - Fork 13
/
schema.go
64 lines (48 loc) · 1.49 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package streamdal
import (
"context"
"github.com/streamdal/streamdal/libs/protos/build/go/protos"
)
// getSchema looks up the schema currently held in memory for the given
// audience and returns its JSON schema bytes. If nothing is stored under the
// audience's key, an empty byte slice is returned instead. The lookup is
// performed while holding the read lock of schemasMtx.
func (s *Streamdal) getSchema(_ context.Context, aud *protos.Audience) []byte {
	s.schemasMtx.RLock()
	defer s.schemasMtx.RUnlock()

	// The map is keyed by the string form of the audience.
	if entry, found := s.schemas[audToStr(aud)]; found {
		return entry.JsonSchema
	}

	return []byte(``)
}
// setSchema stores schema as the JSON schema for the given audience,
// overwriting whatever entry was previously kept under the same audience key.
// The write is performed while holding the exclusive lock of schemasMtx. The
// slice is stored as passed in; it is not copied.
func (s *Streamdal) setSchema(_ context.Context, aud *protos.Audience, schema []byte) {
	s.schemasMtx.Lock()
	defer s.schemasMtx.Unlock()

	key := audToStr(aud)
	entry := &protos.Schema{JsonSchema: schema}

	s.schemas[key] = entry
}
// handleSchema handles the result of a schema-inference step in the pipeline,
// if necessary.
//
// It does nothing and returns false when:
//   - the step carries no InferSchema configuration,
//   - the WASM exit code is anything other than WASM_EXIT_CODE_TRUE, or
//   - resp.OutputStep is byte-for-byte equal to the schema already held in
//     memory for the audience.
//
// Otherwise the in-memory schema for the audience is replaced with
// resp.OutputStep, the new schema is sent to the server from a background
// goroutine, and true is returned. A failed send is only logged: it does not
// change the return value and the in-memory schema is not rolled back.
func (s *Streamdal) handleSchema(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, resp *protos.WASMResponse) bool {
	if step.GetInferSchema() == nil {
		// nothing to do
		return false
	}

	if resp.ExitCode != protos.WASMExitCode_WASM_EXIT_CODE_TRUE {
		return false
	}

	// Get existing schema for audience
	existingSchema := s.getSchema(ctx, aud)

	if string(resp.OutputStep) == string(existingSchema) {
		// Schema matches what we have in memory, nothing to do
		return false
	}

	// Schema is new or modified, update in memory and send to the server
	s.setSchema(ctx, aud, resp.OutputStep)

	// NOTE(review): the goroutine reuses the caller's ctx; if that context is
	// cancelled as soon as the caller finishes, the send may be aborted —
	// confirm this is the intended lifetime.
	go func() {
		if err := s.serverClient.SendSchema(ctx, aud, resp.OutputStep); err != nil {
			s.config.Logger.Errorf("failed to send schema: %s", err)

			// Stop here so a failed send is not also reported as published.
			return
		}

		s.config.Logger.Debugf("published schema for audience '%s'", audToStr(aud))
	}()

	return true
}