-
Notifications
You must be signed in to change notification settings - Fork 751
/
modules.go
139 lines (111 loc) · 2.75 KB
/
modules.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package modulestorage
import (
"fmt"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/moduletools"
bolt "go.etcd.io/bbolt"
)
type Repo struct {
logger logrus.FieldLogger
baseDir string
db *bolt.DB
}
func NewRepo(baseDir string, logger logrus.FieldLogger) (*Repo, error) {
r := &Repo{
baseDir: baseDir,
logger: logger,
}
err := r.init()
return r, err
}
func (r *Repo) DBPath() string {
return fmt.Sprintf("%s/modules.db", r.baseDir)
}
func (r *Repo) DataPath() string {
return r.baseDir
}
func (r *Repo) init() error {
if err := os.MkdirAll(r.baseDir, 0o777); err != nil {
return errors.Wrapf(err, "create root path directory at %s", r.baseDir)
}
boltdb, err := bolt.Open(r.DBPath(), 0o600, nil)
if err != nil {
return errors.Wrapf(err, "open bolt at %s", r.DBPath())
}
r.db = boltdb
return nil
}
type storageBucket struct {
bucketKey []byte
repo *Repo
}
func (r *Repo) Storage(bucketName string) (moduletools.Storage, error) {
storage := &storageBucket{
bucketKey: []byte(bucketName),
repo: r,
}
err := storage.init()
return storage, err
}
func (s *storageBucket) init() error {
return s.repo.db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(s.bucketKey); err != nil {
return errors.Wrapf(err, "create module storage bucket '%s'",
string(s.bucketKey))
}
return nil
})
}
func (s *storageBucket) Put(key, value []byte) error {
return s.repo.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketKey)
if b == nil {
return errors.Errorf("no bucket for key %s found", string(s.bucketKey))
}
if err := b.Put(key, value); err != nil {
return errors.Wrapf(err, "put value for key %s", string(key))
}
return nil
})
}
func (s *storageBucket) Get(key []byte) ([]byte, error) {
var out []byte
err := s.repo.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketKey)
if b == nil {
return errors.Errorf("no bucket for key %s found", string(s.bucketKey))
}
out = b.Get(key)
return nil
})
return out, err
}
func (s *storageBucket) Scan(scan moduletools.ScanFn) error {
err := s.repo.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketKey)
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
ok, err := scan(k, v)
if err != nil {
return errors.Wrapf(err, "read item %q", string(k))
}
if !ok {
break
}
}
return nil
})
return err
}