/
watch.go
89 lines (78 loc) · 1.65 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package nimbusdb
import (
"time"
"github.com/segmentio/ksuid"
)
// EventType names the kind of change a WatcherEvent reports.
type EventType string

// Event kinds stored in WatcherEvent.EventType.
const (
	// Create is the kind set by NewCreateWatcherEvent.
	Create = EventType("CREATE")
	// Update is the kind set by NewUpdateWatcherEvent.
	Update = EventType("UPDATE")
	// Delete is the kind set by NewDeleteWatcherEvent.
	Delete = EventType("DELETE")
)
// WatcherEvent describes a single change notification. It is the element type
// of the channel returned by Db.NewWatch and the value accepted by
// Db.SendWatchEvent.
type WatcherEvent struct {
// EventType is the kind of change; the constructors in this file set it to
// Create, Update or Delete.
EventType EventType
// Key is the key passed to the constructor (presumably the key that changed
// — confirm against callers).
Key []byte
// OldValue is the previous value as passed to the constructor; it stays nil
// when the constructor is given a nil old value.
OldValue []byte
// NewValue is the new value as passed to the constructor.
NewValue []byte
// EventTimestamp is set to time.Now() by the constructors in this file, i.e.
// the moment the event value was built.
EventTimestamp time.Time
// BatchId is stored exactly as passed to the constructor.
// NOTE(review): presumably identifies the batch the change belongs to and is
// nil outside a batch — confirm against callers.
BatchId *ksuid.KSUID
}
// NewWatch returns the database's watcher channel.
//
// Every call hands back the same db.watcher channel; no new channel is
// created. If the database is marked closed it returns a nil channel and
// ERROR_DB_CLOSED.
func (db *Db) NewWatch() (chan WatcherEvent, error) {
	if db.closed {
		// Refuse to hand out the channel once the database is closed.
		return nil, ERROR_DB_CLOSED
	}

	events := db.watcher
	return events, nil
}
// CloseWatch closes the database's watcher channel so that receivers see the
// end of the event stream.
//
// It returns ERROR_DB_CLOSED, without touching the channel, when the database
// is marked closed; otherwise it closes db.watcher and returns nil.
//
// NOTE(review): nothing in this function prevents a second CloseWatch call or
// a later SendWatchEvent from reaching the already-closed channel, and either
// would panic — confirm that callers guard against this.
func (db *Db) CloseWatch() error {
	if db.closed {
		return ERROR_DB_CLOSED
	}

	events := db.watcher
	close(events)
	return nil
}
// NewCreateWatcherEvent builds a WatcherEvent of type Create for key,
// timestamped with the current time.
//
// key, oldValue, newValue and batchId are stored as given (the slices are not
// copied). A nil oldValue leaves OldValue nil.
func NewCreateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent {
	return WatcherEvent{
		EventType:      Create,
		Key:            key,
		OldValue:       oldValue,
		NewValue:       newValue,
		EventTimestamp: time.Now(),
		BatchId:        batchId,
	}
}
// NewUpdateWatcherEvent builds a WatcherEvent of type Update for key,
// timestamped with the current time.
//
// key, oldValue, newValue and batchId are stored as given (the slices are not
// copied). A nil oldValue leaves OldValue nil.
func NewUpdateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent {
	return WatcherEvent{
		EventType:      Update,
		Key:            key,
		OldValue:       oldValue,
		NewValue:       newValue,
		EventTimestamp: time.Now(),
		BatchId:        batchId,
	}
}
// NewDeleteWatcherEvent builds a WatcherEvent of type Delete for key,
// timestamped with the current time.
//
// key, oldValue, newValue and batchId are stored as given (the slices are not
// copied). A nil oldValue leaves OldValue nil.
func NewDeleteWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent {
	return WatcherEvent{
		EventType:      Delete,
		Key:            key,
		OldValue:       oldValue,
		NewValue:       newValue,
		EventTimestamp: time.Now(),
		BatchId:        batchId,
	}
}
// SendWatchEvent publishes w on the database's watcher channel.
//
// It returns ERROR_DB_CLOSED without sending when the database is marked
// closed. Otherwise it performs a plain channel send, which blocks until the
// channel accepts the value, and then returns nil.
func (db *Db) SendWatchEvent(w WatcherEvent) error {
	if db.closed {
		return ERROR_DB_CLOSED
	}

	events := db.watcher
	events <- w // blocking send: waits for a receiver or free buffer slot
	return nil
}