/
events.go
153 lines (125 loc) · 4.07 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package events
import (
"github.com/oklog/ulid/v2"
api "github.com/rotationalio/ensign/pkg/ensign/api/v1beta1"
"github.com/rotationalio/ensign/pkg/ensign/config"
"github.com/rotationalio/ensign/pkg/ensign/rlid"
"github.com/rotationalio/ensign/pkg/ensign/store/errors"
"github.com/rotationalio/ensign/pkg/ensign/store/iterator"
"github.com/rotationalio/ensign/pkg/utils/ulids"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"google.golang.org/protobuf/proto"
)
// Open the leveldb event database located at the path reported by the storage
// configuration and return a Store that wraps it. The database is opened with the
// ReadOnly option taken from conf.ReadOnly and the Store records the same flag. If the
// path cannot be resolved or the database cannot be opened, a nil Store and the
// underlying error are returned.
func Open(conf config.StorageConfig) (store *Store, err error) {
	var dbPath string
	if dbPath, err = conf.EventPath(); err != nil {
		return nil, err
	}

	var db *leveldb.DB
	options := &opt.Options{ReadOnly: conf.ReadOnly}
	if db, err = leveldb.OpenFile(dbPath, options); err != nil {
		return nil, err
	}

	return &Store{db: db, readonly: conf.ReadOnly}, nil
}
// Store wraps the leveldb database that holds events along with the read-only flag
// that the database was opened with.
type Store struct {
// Handle to the underlying leveldb database; populated by Open.
db *leveldb.DB
// When true, Insert and Destroy return errors.ErrReadOnly instead of writing.
readonly bool
}
// Close closes the underlying leveldb database and returns any error it reports.
func (s *Store) Close() error {
return s.db.Close()
}
// ReadOnly reports whether the store was opened in read-only mode.
func (s *Store) ReadOnly() bool {
return s.readonly
}
// Insert writes the event to the database under the key that EventKey derives from it;
// any error from EventKey (expected to cover events that are missing an ID or a
// TopicID -- NOTE(review): confirm against EventKey) is returned unchanged. The
// LocalId of the event is set to nil before it is serialized, which modifies the
// caller's event and ensures the local ID is never stored. Returns errors.ErrReadOnly
// if the store is read-only. No other validation is performed because this method is
// designed to write as quickly as possible.
func (s *Store) Insert(event *api.EventWrapper) (err error) {
	if s.readonly {
		return errors.ErrReadOnly
	}

	// Strip the localID before the key is computed and the event is serialized.
	event.LocalId = nil

	key, err := EventKey(event)
	if err != nil {
		return err
	}

	data, err := proto.Marshal(event)
	if err != nil {
		return errors.Wrap(err)
	}

	// Request an fsync on this write to avoid losing the event.
	syncWrite := &opt.WriteOptions{Sync: true}
	if err = s.db.Put(key[:], data, syncWrite); err != nil {
		return errors.Wrap(err)
	}
	return nil
}
// List returns an iterator over the events of the specified topic, which are all of
// the keys that begin with the topicID followed by the event segment. If the topicID
// is zero, an error iterator carrying errors.ErrKeyNull is returned instead.
// NOTE(review): the returned iterator wraps a leveldb iterator, so callers presumably
// need to release it when finished -- confirm against EventIterator.
func (s *Store) List(topicID ulid.ULID) iterator.EventIterator {
	if ulids.IsZero(topicID) {
		return &EventErrorIterator{ErrorIterator: errors.NewIter(errors.ErrKeyNull)}
	}

	// Key prefix layout: 16 bytes of topicID followed by 2 bytes of event segment.
	keyPrefix := make([]byte, 18)
	copy(keyPrefix[:16], topicID[:])
	copy(keyPrefix[16:18], EventSegment[:])

	return &EventIterator{
		Iterator: s.db.NewIterator(util.BytesPrefix(keyPrefix), nil),
		topicID:  topicID,
	}
}
// Retrieve fetches a single event from the database by its topic and event ID. The
// lookup key is built by CreateKey from the topicID, the eventID, and the event
// segment; an error from CreateKey is returned unchanged. An error reading the key
// from the database is passed through errors.Wrap, while an error unmarshaling the
// stored bytes is returned as-is. On any error the returned event is nil.
func (s *Store) Retrieve(topicID ulid.ULID, eventID rlid.RLID) (event *api.EventWrapper, err error) {
	var key Key
	if key, err = CreateKey(topicID, eventID, EventSegment); err != nil {
		return nil, err
	}

	// A nil ReadOptions uses the database defaults for the read.
	var data []byte
	if data, err = s.db.Get(key[:], nil); err != nil {
		return nil, errors.Wrap(err)
	}

	event = &api.EventWrapper{}
	if err = proto.Unmarshal(data, event); err != nil {
		return nil, err
	}
	return event, nil
}
// Destroy deletes all events, meta-events, and index hashes of the specified topic.
// NOTE: this removes anything in the database whose key is prefixed with the topicID.
// Returns errors.ErrReadOnly if the store is read-only and errors.ErrKeyNull if the
// topicID is zero; iterator and write errors are returned as-is.
func (s *Store) Destroy(topicID ulid.ULID) (err error) {
	switch {
	case s.readonly:
		return errors.ErrReadOnly
	case ulids.IsZero(topicID):
		return errors.ErrKeyNull
	}

	// Scan every key that starts with the topicID; the scan is a one-off so the reads
	// are flagged not to fill the cache.
	scan := s.db.NewIterator(
		util.BytesPrefix(topicID.Bytes()),
		&opt.ReadOptions{DontFillCache: true},
	)
	defer scan.Release()

	// Queue a delete for each key so that they are all submitted in a single write.
	deletes := new(leveldb.Batch)
	for scan.Next() {
		deletes.Delete(scan.Key())
	}

	if err = scan.Error(); err != nil {
		return err
	}

	// Unlike Insert, this write does not request an fsync; write merging is disabled.
	return s.db.Write(deletes, &opt.WriteOptions{Sync: false, NoWriteMerge: true})
}
// Count returns the number of keys in the specified range, found by iterating over
// every key in the range and tallying them (without filling the cache). The count
// accumulated so far is returned together with any error reported by the iterator.
// This is primarily used for testing.
func (s *Store) Count(slice *util.Range) (count uint64, err error) {
	keys := s.db.NewIterator(slice, &opt.ReadOptions{DontFillCache: true})
	defer keys.Release()

	var total uint64
	for keys.Next() {
		total++
	}
	return total, keys.Error()
}