/
schemas.go
121 lines (97 loc) · 2.41 KB
/
schemas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package index
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"github.com/ttab/elephant-api/repository"
"github.com/ttab/elephantine"
"github.com/ttab/revisor"
"golang.org/x/exp/maps"
)
type SchemaLoader struct {
logger *slog.Logger
client repository.Schemas
m sync.RWMutex
v *revisor.Validator
knownVersions map[string]string
}
func NewSchemaLoader(
ctx context.Context,
logger *slog.Logger,
client repository.Schemas,
) (*SchemaLoader, error) {
sl := SchemaLoader{
logger: logger,
client: client,
}
err := sl.loadSchemas(ctx)
if err != nil {
return nil, fmt.Errorf("initial schema load: %w", err)
}
go sl.loadLoop(ctx)
return &sl, nil
}
func (sl *SchemaLoader) GetValidator() *revisor.Validator {
sl.m.RLock()
defer sl.m.RUnlock()
return sl.v
}
func (sl *SchemaLoader) loadLoop(ctx context.Context) {
for {
t := time.Now()
err := sl.loadSchemas(ctx)
if err != nil {
sl.logger.ErrorContext(ctx, "failed to load schemas",
elephantine.LogKeyError, err)
}
select {
case <-ctx.Done():
return
case <-time.After(time.Until(t.Add(5 * time.Second))):
// This is done to ensure that we don't start spamming
// the server in case something goes awry. During normal
// operations we should land naturally at 1req/10sec
// unless the schemas are being constantly updated.
continue
}
}
}
func (sl *SchemaLoader) loadSchemas(ctx context.Context) error {
results, err := sl.client.GetAllActive(ctx, &repository.GetAllActiveSchemasRequest{
Known: sl.knownVersions,
})
if err != nil {
return fmt.Errorf("get active schemas from repository: %w", err)
}
newKnown := make(map[string]string)
for _, schema := range results.Schemas {
newKnown[schema.Name] = schema.Version
}
if maps.Equal(sl.knownVersions, newKnown) {
return nil
}
var constraints []revisor.ConstraintSet
for _, schema := range results.Schemas {
var spec revisor.ConstraintSet
err := json.Unmarshal([]byte(schema.Spec), &spec)
if err != nil {
return fmt.Errorf(
"invalid schema spec for %s@%s returned by server: %w",
schema.Name, schema.Version, err)
}
constraints = append(constraints, spec)
}
validator, err := revisor.NewValidator(constraints...)
if err != nil {
return fmt.Errorf(
"create validator from repository schemas: %w", err)
}
sl.m.Lock()
sl.knownVersions = newKnown
sl.v = validator
sl.m.Unlock()
return nil
}