-
Notifications
You must be signed in to change notification settings - Fork 8
/
registry.go
137 lines (119 loc) · 5 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright 2022 Namespace Labs Inc; All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package execution
import (
"context"
"google.golang.org/protobuf/proto"
"namespacelabs.dev/foundation/internal/protos"
"namespacelabs.dev/foundation/schema"
"namespacelabs.dev/foundation/schema/orchestration"
)
type Funcs[M proto.Message] struct {
Aliases []string
EmitStart func(context.Context, *schema.SerializedInvocation, M, chan *orchestration.Event)
Handle func(context.Context, *schema.SerializedInvocation, M) (*HandleResult, error)
HandleWithEvents func(context.Context, *schema.SerializedInvocation, M, chan *orchestration.Event) (*HandleResult, error)
PlanOrder func(context.Context, M) (*schema.ScheduleOrder, error)
}
type VFuncs[M proto.Message, V any] struct {
Parse func(context.Context, *schema.SerializedInvocation, M) (V, error)
EmitStart func(context.Context, *schema.SerializedInvocation, V, chan *orchestration.Event)
Handle func(context.Context, *schema.SerializedInvocation, V) (*HandleResult, error)
HandleWithEvents func(context.Context, *schema.SerializedInvocation, V, chan *orchestration.Event) (*HandleResult, error)
PlanOrder func(context.Context, V) (*schema.ScheduleOrder, error)
}
type internalFuncs struct {
Aliases []string
Parse func(context.Context, *schema.SerializedInvocation, proto.Message) (any, error)
EmitStart func(context.Context, *schema.SerializedInvocation, proto.Message, any, chan *orchestration.Event)
Handle func(context.Context, *schema.SerializedInvocation, proto.Message, any, chan *orchestration.Event) (*HandleResult, error)
PlanOrder func(context.Context, proto.Message, any) (*schema.ScheduleOrder, error)
}
type compilerFunc func(context.Context, []*schema.SerializedInvocation) ([]*schema.SerializedInvocation, error)
type registration struct {
key string
unmarshal func(*schema.SerializedInvocation) (proto.Message, error)
funcs internalFuncs
}
var (
handlers = map[string]*registration{}
compilers = map[string]compilerFunc{}
)
func RegisterHandlerFunc[M proto.Message](handle func(context.Context, *schema.SerializedInvocation, M) (*HandleResult, error)) {
register[M](internalFuncs{
Handle: func(ctx context.Context, def *schema.SerializedInvocation, msg proto.Message, _ any, _ chan *orchestration.Event) (*HandleResult, error) {
return handle(ctx, def, msg.(M))
},
PlanOrder: func(context.Context, proto.Message, any) (*schema.ScheduleOrder, error) {
return nil, nil
},
})
}
func RegisterFuncs[M proto.Message](funcs Funcs[M]) {
register[M](internalFuncs{
Aliases: funcs.Aliases,
EmitStart: func(ctx context.Context, inv *schema.SerializedInvocation, msg proto.Message, _ any, ch chan *orchestration.Event) {
if funcs.EmitStart != nil {
funcs.EmitStart(ctx, inv, msg.(M), ch)
}
},
Handle: func(ctx context.Context, inv *schema.SerializedInvocation, msg proto.Message, _ any, ch chan *orchestration.Event) (*HandleResult, error) {
if funcs.HandleWithEvents != nil {
return funcs.HandleWithEvents(ctx, inv, msg.(M), ch)
}
return funcs.Handle(ctx, inv, msg.(M))
},
PlanOrder: func(ctx context.Context, msg proto.Message, _ any) (*schema.ScheduleOrder, error) {
if funcs.PlanOrder == nil {
return nil, nil
}
return funcs.PlanOrder(ctx, msg.(M))
},
})
}
func RegisterVFuncs[M proto.Message, V any](funcs VFuncs[M, V]) {
register[M](internalFuncs{
Parse: func(ctx context.Context, def *schema.SerializedInvocation, msg proto.Message) (any, error) {
return funcs.Parse(ctx, def, msg.(M))
},
EmitStart: func(ctx context.Context, inv *schema.SerializedInvocation, msg proto.Message, value any, ch chan *orchestration.Event) {
if funcs.EmitStart != nil {
funcs.EmitStart(ctx, inv, value.(V), ch)
}
},
Handle: func(ctx context.Context, def *schema.SerializedInvocation, _ proto.Message, value any, ch chan *orchestration.Event) (*HandleResult, error) {
if funcs.HandleWithEvents != nil {
return funcs.HandleWithEvents(ctx, def, value.(V), ch)
}
return funcs.Handle(ctx, def, value.(V))
},
PlanOrder: func(ctx context.Context, _ proto.Message, value any) (*schema.ScheduleOrder, error) {
if funcs.PlanOrder == nil {
return nil, nil
}
return funcs.PlanOrder(ctx, value.(V))
},
})
}
func register[M proto.Message](funcs internalFuncs) {
keys := append([]string{protos.TypeUrl[M]()}, funcs.Aliases...)
for _, key := range keys {
reg := registration{
key: key,
unmarshal: func(si *schema.SerializedInvocation) (proto.Message, error) {
msg := protos.NewFromType[M]()
// Unmarshal from value directly as we tolerate type aliases.
if err := proto.Unmarshal(si.Impl.Value, msg); err != nil {
return nil, err
}
return msg, nil
},
funcs: funcs,
}
handlers[key] = ®
}
}
func Compile[M proto.Message](compiler compilerFunc) {
compilers[protos.TypeUrl[M]()] = compiler
}