-
Notifications
You must be signed in to change notification settings - Fork 24
/
checkpoint.go
134 lines (114 loc) · 3.3 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
Copyright 2020 VMware, Inc.
SPDX-License-Identifier: Apache-2.0
*/
package vsphere
import (
"encoding/json"
"errors"
"time"
)
const (
// CheckpointDefaultAge is the default maximum age of the event replay
// (look-back) window. UnmarshalJSON applies it when "maxAge" is empty.
CheckpointDefaultAge = 5 * time.Minute
// CheckpointDefaultPeriod is the default interval between checkpoints. It is
// applied when "period" is empty or zero. The saving logic is not part of
// this file; per the original note a checkpoint is only created on changes.
CheckpointDefaultPeriod = 10 * time.Second
// checkpointKey is the key name used in the KV store for storing the latest
// checkpoint
checkpointKey = "checkpoint"
)
var (
// ErrInvalidInterval is returned by the CheckpointConfig JSON methods when
// the maximum age or the period is a negative duration.
ErrInvalidInterval = errors.New("invalid checkpoint time interval")
)
// checkpoint represents a vCenter checkpoint object. It records the last
// successfully processed vCenter event together with the creation time of the
// checkpoint itself; the struct tags define its JSON field names.
type checkpoint struct {
// vCenter this checkpoint belongs to (presumably its address or host name -
// confirm against the code that populates it)
VCenter string `json:"vCenter"`
// last vCenter event key successfully processed
LastEventKey int32 `json:"lastEventKey"`
// last event type, e.g. VmPoweredOffEvent useful for debugging
LastEventType string `json:"lastEventType"`
// last vCenter event key timestamp (UTC) successfully processed - used as
// starting point for vCenter stream
LastEventKeyTimestamp time.Time `json:"lastEventKeyTimestamp"`
// timestamp (UTC) when this checkpoint was created
CreatedTimestamp time.Time `json:"createdTimestamp"`
}
// CheckpointConfig influences the checkpoint behavior. It configures the
// maximum age of the replay (look-back) window when starting the event stream
// and the period of saving the checkpoint.
//
// JSON encoding and decoding is handled by the MarshalJSON and UnmarshalJSON
// methods, which represent both fields as duration strings such as "10m" or
// "1h" rather than as numbers.
type CheckpointConfig struct {
// max replay window; an empty "maxAge" decodes to CheckpointDefaultAge
MaxAge time.Duration `json:"maxAge"`
// create checkpoints at given frequency; an empty "period" decodes to
// CheckpointDefaultPeriod
Period time.Duration `json:"period"`
}
// MarshalJSON encodes the checkpoint configuration with human-readable
// durations, e.g. {"maxAge":"5m0s","period":"10s"}, which is the format
// UnmarshalJSON accepts.
//
// It is declared on the value receiver so that encoding/json invokes it for
// CheckpointConfig values as well as pointers. With a pointer receiver,
// marshalling a non-addressable value bypasses this method and emits raw
// nanosecond numbers, which UnmarshalJSON rejects.
//
// It returns ErrInvalidInterval if MaxAge or Period is negative.
func (c CheckpointConfig) MarshalJSON() ([]byte, error) {
	if c.MaxAge < time.Duration(0) || c.Period < time.Duration(0) {
		return nil, ErrInvalidInterval
	}

	out := struct {
		MaxAge string `json:"maxAge"`
		Period string `json:"period"`
	}{
		MaxAge: c.MaxAge.String(),
		Period: c.Period.String(),
	}
	return json.Marshal(out)
}
// UnmarshalJSON defines custom unmarshalling logic to support human-readable
// time input on the checkpoint configuration, e.g. "10m" or "1h". JSON numbers
// are rejected, as are strings time.ParseDuration cannot parse (e.g. "10"
// without a unit suffix).
//
// An omitted or empty "maxAge" defaults to CheckpointDefaultAge and an omitted
// or empty "period" defaults to CheckpointDefaultPeriod. Negative durations
// yield ErrInvalidInterval. The receiver is only modified once both fields
// have been validated, so a failed decode never leaves it half-updated.
func (c *CheckpointConfig) UnmarshalJSON(b []byte) error {
	var in struct {
		MaxAge string `json:"maxAge"`
		Period string `json:"period"`
	}
	if err := json.Unmarshal(b, &in); err != nil {
		return err
	}

	// parse converts one raw interval, substituting fallback for an empty
	// string and rejecting negative durations.
	parse := func(raw string, fallback time.Duration) (time.Duration, error) {
		if raw == "" {
			return fallback, nil
		}
		d, err := time.ParseDuration(raw)
		if err != nil {
			return 0, err
		}
		if d < time.Duration(0) {
			return 0, ErrInvalidInterval
		}
		return d, nil
	}

	// Validate in the same order as the fields are declared so that the first
	// offending field determines the returned error.
	maxAge, err := parse(in.MaxAge, CheckpointDefaultAge)
	if err != nil {
		return err
	}
	period, err := parse(in.Period, CheckpointDefaultPeriod)
	if err != nil {
		return err
	}

	c.MaxAge, c.Period = maxAge, period
	return nil
}
// newCheckpointConfig returns a CheckpointConfig for the given JSON-encoded
// string. An empty string is treated like an empty JSON object, i.e. the
// defaults for the event history replay window (CheckpointDefaultAge) and the
// frequency of saving the checkpoint (CheckpointDefaultPeriod) are used. A
// period of zero (e.g. "0s") is likewise replaced with
// CheckpointDefaultPeriod. Malformed JSON or invalid intervals result in an
// error and a nil config.
func newCheckpointConfig(config string) (*CheckpointConfig, error) {
	// json.Unmarshal fails on empty input ("unexpected end of JSON input");
	// substitute an empty object so the documented defaults apply instead.
	if config == "" {
		config = "{}"
	}

	var c CheckpointConfig
	if err := json.Unmarshal([]byte(config), &c); err != nil {
		return nil, err
	}

	// An explicit zero period is replaced with the default period.
	if c.Period == 0 {
		c.Period = CheckpointDefaultPeriod
	}
	return &c, nil
}