/
splits.go
134 lines (111 loc) · 3.28 KB
/
splits.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package persistent
import (
"bytes"
"encoding/gob"
"encoding/json"
"sync"
"github.com/splitio/go-split-commons/v5/dtos"
"github.com/splitio/go-toolkit/v5/logging"
)
// splitChangesCollectionName is the name handed to the collection wrapper
// that backs SplitChangesCollection.
const splitChangesCollectionName = "SPLIT_CHANGES_COLLECTION"
// SplitChangesItem is the stored form of a single feature flag change.
// JSON carries the JSON-encoded flag, while ChangeNumber, Name and Status
// mirror a few of its attributes.
type SplitChangesItem struct {
	ChangeNumber int64  `json:"changeNumber"`
	Name         string `json:"name"`
	Status       string `json:"status"`
	JSON         string
}

// SplitsChangesItems is a list of SplitChangesItem implementing the
// Len/Less/Swap trio required for sorting. Items are ordered by
// ChangeNumber from highest to lowest.
type SplitsChangesItems []SplitChangesItem

// Len reports how many items the list holds.
func (s SplitsChangesItems) Len() int { return len(s) }

// Less reports whether the item at index i must sort before the item at
// index j, which is the case when i has the strictly greater ChangeNumber.
func (s SplitsChangesItems) Less(i, j int) bool {
	return s[j].ChangeNumber < s[i].ChangeNumber
}

// Swap exchanges the items at indexes i and j.
func (s SplitsChangesItems) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
//----------------------------------------------------
// SplitChangesCollection represents a collection of SplitChangesItem
// together with the change number recorded by the latest Update call.
type SplitChangesCollection struct {
// collection is the storage wrapper that items are saved to and fetched from.
collection CollectionWrapper
// changeNumber is the value passed to the most recent Update call;
// NewSplitChangesCollection initializes it to 0.
changeNumber int64
// mutex is write-locked by Update and read-locked by FetchAll and ChangeNumber.
mutex sync.RWMutex
}
// NewSplitChangesCollection builds a SplitChangesCollection whose items are
// stored through a BoltDBCollectionWrapper created from db and logger.
// The collection starts with a change number of 0.
func NewSplitChangesCollection(db DBWrapper, logger logging.LoggerInterface) *SplitChangesCollection {
	wrapper := &BoltDBCollectionWrapper{
		db:     db,
		name:   splitChangesCollectionName,
		logger: logger,
	}

	// changeNumber is deliberately left at its zero value (0).
	return &SplitChangesCollection{collection: wrapper}
}
// Update stores a set of feature flag changes and then bumps the change
// number to cn, all while holding the write lock so that FetchAll and
// ChangeNumber never observe a partially applied update.
//
// Flags from both toAdd and toRemove are JSON-encoded and saved under their
// name. A flag that cannot be encoded or saved is logged and skipped; the
// remaining flags are still processed and the change number is still updated.
func (c *SplitChangesCollection) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, cn int64) {
	items := make(SplitsChangesItems, 0, len(toAdd)+len(toRemove))
	process := func(split *dtos.SplitDTO) {
		asJSON, err := json.Marshal(split)
		if err != nil {
			// This should not happen unless the DTO class is broken
			c.collection.Logger().Error("error encoding feature flag to store in db: ", err, "|", split.Name)
			return
		}
		items = append(items, SplitChangesItem{
			ChangeNumber: split.ChangeNumber,
			Name:         split.Name,
			Status:       split.Status,
			JSON:         string(asJSON),
		})
	}

	// Iterate by index so the helper receives a pointer to the slice element
	// instead of a pointer to the loop variable.
	for idx := range toAdd {
		process(&toAdd[idx])
	}
	for idx := range toRemove {
		process(&toRemove[idx])
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()
	for idx := range items {
		if err := c.collection.SaveAs([]byte(items[idx].Name), items[idx]); err != nil {
			// Best effort: report the failure and keep saving the remaining items.
			c.collection.Logger().Error("error saving feature flag in db: ", err, "|", items[idx].Name)
		}
	}
	c.changeNumber = cn
}
// FetchAll returns every feature flag stored in the collection.
//
// Each stored record is gob-decoded into a SplitChangesItem, whose JSON field
// is then parsed into a dtos.SplitDTO. Records that fail either step are
// logged and skipped, so the result may hold fewer entries than the
// collection. An error is returned only when the underlying collection
// cannot be read, in which case the returned slice is nil.
func (c *SplitChangesCollection) FetchAll() ([]dtos.SplitDTO, error) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	items, err := c.collection.FetchAll()
	if err != nil {
		return nil, err
	}

	// At most one flag per stored record, so the capacity is known up front.
	toReturn := make([]dtos.SplitDTO, 0, len(items))

	var decodeBuffer bytes.Buffer
	for _, v := range items {
		// The buffer is reused across iterations; drop whatever a previous
		// (possibly failed) decode left behind before loading the next record.
		decodeBuffer.Reset()
		decodeBuffer.Write(v)

		var q SplitChangesItem
		if err := gob.NewDecoder(&decodeBuffer).Decode(&q); err != nil {
			c.collection.Logger().Error("decode error:", err, "|", string(v))
			continue
		}

		var parsed dtos.SplitDTO
		if err := json.Unmarshal([]byte(q.JSON), &parsed); err != nil {
			c.collection.Logger().Error("error decoding feature flag fetched from db: ", err, "|", q.JSON)
			continue
		}
		toReturn = append(toReturn, parsed)
	}
	return toReturn, nil
}
// ChangeNumber returns the change number recorded by the latest Update call,
// reading it under the read lock.
func (c *SplitChangesCollection) ChangeNumber() int64 {
	c.mutex.RLock()
	current := c.changeNumber
	c.mutex.RUnlock()
	return current
}