-
Notifications
You must be signed in to change notification settings - Fork 1
/
repository.go
194 lines (162 loc) · 3.75 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package repository
import (
"log"
"os"
"path/filepath"
"github.com/dgraph-io/badger/v2"
jsoniter "github.com/json-iterator/go"
"github.com/nsip/dev-nrt/sec"
)
// BadgerQueryFunc allows a query to be defined at the call-site by
// abstracting it to a function. The function is handed a badger
// transaction and returns an error.
type BadgerQueryFunc func(txn *badger.Txn) error
// ObjectStats keeps totals for objects added to the db, as a map from
// a string key to an integer count.
type ObjectStats map[string]int
// BadgerRepo is a data repository using the badger kv db engine.
type BadgerRepo struct {
	// db is the underlying badger database handle.
	db *badger.DB
	// wb is the write-batch that Store adds items to; it is flushed by
	// Commit and Close.
	wb *badger.WriteBatch
}
// STATS_KEY is the key under which the stored object stats are kept
// in the repo itself.
const STATS_KEY = "REPO-STATS"
// json is the jsoniter configuration (ConfigFastest) initialised for
// marshalling/unmarshalling json in this file.
var json = jsoniter.ConfigFastest
// NewBadgerRepo creates a new badger repo in the nominated directory.
//
// Before the database is opened, filepath.Dir(dbFolderName) is removed
// together with everything beneath it and then recreated, so any
// existing dbs are discarded. The returned repo holds the open database
// and a fresh write-batch on it.
//
// NOTE(review): filepath.Dir only resolves to dbFolderName itself when
// the path carries a trailing separator; otherwise it is the parent
// directory that is wiped - confirm callers pass the expected form.
func NewBadgerRepo(dbFolderName string) (*BadgerRepo, error) {
	workDir := filepath.Dir(dbFolderName)

	// clear out any existing dbs, then recreate the working directory
	if err := os.RemoveAll(workDir); err != nil {
		return nil, err
	}
	if err := os.MkdirAll(workDir, os.ModePerm); err != nil {
		return nil, err
	}

	// open a new badger instance with the logging level set to WARNING
	options := badger.DefaultOptions(dbFolderName).WithLoggingLevel(badger.WARNING)
	store, err := badger.Open(options)
	if err != nil {
		return nil, err
	}

	// pair the db with a (fast) write-batch
	return &BadgerRepo{db: store, wb: store.NewWriteBatch()}, nil
}
// OpenExistingBadgerRepo opens a repo that has already got data in it.
//
// Unlike NewBadgerRepo nothing on disk is removed; the database at
// dbFolderName is opened as-is and a fresh write-batch is created on it.
func OpenExistingBadgerRepo(dbFolderName string) (*BadgerRepo, error) {
	// open the badger instance with the logging level set to WARNING
	options := badger.DefaultOptions(dbFolderName).WithLoggingLevel(badger.WARNING)
	store, err := badger.Open(options)
	if err != nil {
		return nil, err
	}

	// pair the db with a (fast) write-batch
	return &BadgerRepo{db: store, wb: store.NewWriteBatch()}, nil
}
// DB gives access to the underlying data store for iterator queries.
// It returns the same *badger.DB handle the repo itself uses.
func (br *BadgerRepo) DB() *badger.DB {
	return br.db
}
// Close ensures all writes to disk are completed: it flushes the
// current write-batch and then closes the underlying database.
//
// Close has no error return, so a failure in either step is reported
// through the standard logger instead of being silently dropped. The
// database is still closed when the flush fails.
func (br *BadgerRepo) Close() {
	if err := br.wb.Flush(); err != nil {
		log.Println("error flushing write-batch on close:", err)
	}
	if err := br.db.Close(); err != nil {
		log.Println("error closing db:", err)
	}
}
// Commit forces a flush of the current write-batch.
//
// On small data files the write-batch is so fast that it doesn't commit
// prior to reporting starting; calling Commit makes the pending writes
// visible first. After the flush the repo is given a new write-batch so
// that later Store calls have somewhere to write.
//
// Commit has no error return, so a flush failure is reported through
// the standard logger instead of being silently dropped.
func (br *BadgerRepo) Commit() {
	if err := br.wb.Flush(); err != nil {
		log.Println("error flushing write-batch on commit:", err)
	}
	br.wb = br.db.NewWriteBatch()
}
// Store saves data into the badger db. The supplied index function is
// applied to r to create the lookup key, and r.Json is added to the
// write-batch under that key.
//
// An error from the index function is returned as-is and nothing is
// written; otherwise the result of the write-batch Set is returned.
func (br *BadgerRepo) Store(r sec.Result, idxf IndexFunc) error {
	// derive the lookup key for this item
	lookupKey, err := idxf(r)
	if err != nil {
		return err
	}

	// queue the item on the write-batch
	return br.wb.Set(lookupKey, r.Json)
}
// SaveStats persists a copy of the recorded object counts back into
// the repository, particularly used for sizing progress bars etc. when
// reports are run against the repo with no prior ingest phase.
//
// The stats are marshalled to json and written under STATS_KEY in an
// update transaction on the db. Any marshalling or transaction error
// is returned.
func (br *BadgerRepo) SaveStats(s ObjectStats) error {
	// encode the stats as json
	encoded, err := json.Marshal(&s)
	if err != nil {
		return err
	}

	// write the encoded stats into the db
	return br.db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte(STATS_KEY), encoded)
	})
}
// GetStats returns the last recorded set of ingest object counts for
// this repo, as stored under STATS_KEY.
//
// The result is always a non-nil map. An empty ObjectStats is returned
// when no stats have been recorded, when the stored value cannot be
// read or unmarshalled, or when it decodes to a nil map. Failures other
// than the key simply being absent are reported through the standard
// logger.
func (br *BadgerRepo) GetStats() ObjectStats {
	var statsBytes []byte
	err := br.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(STATS_KEY))
		if err != nil {
			return err
		}
		// copy the value out; it is used after the transaction ends
		statsBytes, err = item.ValueCopy(nil)
		return err
	})
	if err != nil {
		// a missing key just means no current recorded stats; anything
		// else is a genuine read failure worth surfacing
		if err != badger.ErrKeyNotFound {
			log.Println("error reading stats from db:", err)
		}
		return ObjectStats{}
	}

	var objst ObjectStats
	if err := json.Unmarshal(statsBytes, &objst); err != nil {
		log.Println("error unmarshalling stats from db:", err)
		return ObjectStats{}
	}
	if objst == nil {
		// a stored json null would leave the map nil; hand back an
		// empty map so callers can safely write to the result
		return ObjectStats{}
	}
	return objst
}