This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
plugin_state_manager.go
81 lines (67 loc) · 2.01 KB
/
plugin_state_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package task
import (
"bytes"
"context"
"fmt"
"io"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/codex"
)
type CodecVersion uint8
const (
GobCodecVersion CodecVersion = iota
)
const currentCodec = GobCodecVersion
// TODO Configurable?
const maxPluginStateSizeBytes = 256
type stateCodec interface {
Encode(interface{}, io.Writer) error
Decode(io.Reader, interface{}) error
}
type pluginStateManager struct {
prevState *bytes.Buffer
prevStateVersion uint8
newState *bytes.Buffer
newStateVersion uint8
codec stateCodec
codecVersion CodecVersion
}
func (p *pluginStateManager) Put(stateVersion uint8, v interface{}) error {
p.newStateVersion = stateVersion
if v != nil {
buf := make([]byte, 0, maxPluginStateSizeBytes)
p.newState = bytes.NewBuffer(buf)
return p.codec.Encode(v, p.newState)
}
return nil
}
func (p *pluginStateManager) Reset() error {
p.newState = nil
return nil
}
func (p pluginStateManager) GetStateVersion() uint8 {
return p.prevStateVersion
}
func (p pluginStateManager) Get(v interface{}) (stateVersion uint8, err error) {
if p.prevState == nil {
return p.prevStateVersion, nil
}
if v == nil {
return p.prevStateVersion, fmt.Errorf("cannot get state for a nil object, please initialize the type before requesting")
}
return p.prevStateVersion, p.codec.Decode(p.prevState, v)
}
func (p pluginStateManager) GetCodeVersion() CodecVersion {
return p.codecVersion
}
func newPluginStateManager(_ context.Context, prevCodecVersion CodecVersion, prevStateVersion uint32, prevState *bytes.Buffer) (*pluginStateManager, error) {
if prevCodecVersion != currentCodec {
return nil, errors.Errorf(errors.IllegalStateError, "x", "prev codec [%d] != current codec [%d]", prevCodecVersion, currentCodec)
}
return &pluginStateManager{
codec: codex.GobStateCodec{},
codecVersion: GobCodecVersion,
prevStateVersion: uint8(prevStateVersion),
prevState: prevState,
}, nil
}