-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
store.go
127 lines (110 loc) · 3.23 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package cqldb
import (
"context"
"errors"
"fmt"
"strings"
"github.com/plgd-dev/hub/v2/pkg/cqldb"
"github.com/plgd-dev/hub/v2/pkg/fsnotify"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/pkg/security/certManager/client"
"go.opentelemetry.io/otel/trace"
)
// Column names of the table created by createEventsTable.
const (
// cqldb has all keys in lowercase
idKey = "id"
ownerKey = "owner"
deviceIDKey = "deviceid"
commonNameKey = "commonname"
// dataKey is the column declared with cqldb.BytesType.
dataKey = "data"
)
// Index groups a name with a partition key and a secondary column.
// NOTE(review): nothing in the visible part of this file uses Index; the
// field meanings below are taken from the field names only — confirm with callers.
type Index struct {
// Name is presumably the index name.
Name string
// PartitionKey is presumably the partition-key column of the indexed table.
PartitionKey string
// SecondaryColumn is presumably the column the index is built on.
SecondaryColumn string
}
// primaryKey lists the columns joined into the "primary key (...)" clause of
// the create-table statement, in this order.
// partition key: idKey
// clustering keys: ownerKey, commonNameKey
// NOTE(review): deviceIDKey is a regular column and not part of the primary
// key — confirm this is intended.
var primaryKey = []string{idKey, ownerKey, commonNameKey}
// Store is a store backed by a cqldb client.
type Store struct {
// client is the cqldb client used for every query and for closing.
client *cqldb.Client
// config is the configuration the store was created with; Table reads config.Table.
config *Config
// logger is kept from the constructor; it is not used in the visible part of this file.
logger log.Logger
}
// New creates a Store backed by a freshly connected cqldb client.
//
// It builds a certificate manager from config.Embedded.TLS, creates the cqldb
// client with that manager's TLS configuration and then creates the store
// table if it does not exist. Whatever was opened before a failing step is
// closed again before the error is returned. On success the certificate
// manager's Close is registered on the store through AddCloseFunc.
func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
	tlsManager, err := client.New(config.Embedded.TLS, fileWatcher, logger)
	if err != nil {
		return nil, fmt.Errorf("could not create cert manager: %w", err)
	}

	dbClient, err := cqldb.New(ctx, config.Embedded, tlsManager.GetTLSConfig(), logger, tracerProvider)
	if err != nil {
		// Only the certificate manager exists at this point.
		tlsManager.Close()
		return nil, err
	}

	s, err := newEventStoreWithClient(ctx, dbClient, config, logger)
	if err != nil {
		// Release in reverse order of creation.
		dbClient.Close()
		tlsManager.Close()
		return nil, err
	}

	// From here on the certificate manager is released via the store's close functions.
	s.AddCloseFunc(tlsManager.Close)
	return s, nil
}
// createEventsTable creates the given table in the client's keyspace unless it
// already exists. The table has the columns idKey, ownerKey, deviceIDKey,
// commonNameKey and dataKey, and its primary key is made of the primaryKey columns.
// A failing query is returned wrapped with the table name.
func createEventsTable(ctx context.Context, client *cqldb.Client, table string) error {
	// One entry per comma-separated item inside the parentheses of the statement.
	definitions := []string{
		idKey + " " + cqldb.UUIDType,
		ownerKey + " " + cqldb.StringType,
		deviceIDKey + " " + cqldb.UUIDType,
		commonNameKey + " " + cqldb.StringType,
		dataKey + " " + cqldb.BytesType,
		"primary key (" + strings.Join(primaryKey, ",") + ")",
	}
	stmt := "create table if not exists " + client.Keyspace() + "." + table +
		" (" + strings.Join(definitions, ",") + ")"

	if err := client.Session().Query(stmt).WithContext(ctx).Exec(); err != nil {
		return fmt.Errorf("failed to create table(%v): %w", table, err)
	}
	return nil
}
// newEventStoreWithClient creates a new Store on top of an existing cqldb client.
//
// An empty config.Table is replaced by the default "signedCertificateRecords".
// The default is written into the caller's config, and the returned Store keeps
// a pointer to that same config. The table is created if it does not exist yet.
//
// It returns an error when client or config is nil, or when creating the
// table fails.
func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *Config, logger log.Logger) (*Store, error) {
	if client == nil {
		return nil, errors.New("invalid client")
	}
	// config is dereferenced right below, so guard it the same way as client
	// instead of panicking on a nil pointer.
	if config == nil {
		return nil, errors.New("invalid config")
	}
	if config.Table == "" {
		config.Table = "signedCertificateRecords"
	}
	if err := createEventsTable(ctx, client, config.Table); err != nil {
		return nil, err
	}
	return &Store{
		client: client,
		logger: logger,
		config: config,
	}, nil
}
// Clear drops the keyspace of the store's client.
// A failure is returned wrapped with "cannot clear".
func (s *Store) Clear(ctx context.Context) error {
	if err := s.client.DropKeyspace(ctx); err != nil {
		return fmt.Errorf("cannot clear: %w", err)
	}
	return nil
}
// Table returns the table name qualified by the client's keyspace,
// in the form "<keyspace>.<table>".
func (s *Store) Table() string {
	keyspace := s.client.Keyspace()
	return keyspace + "." + s.config.Table
}
// ClearTable truncates the store's table. Unlike Clear it does not drop the
// keyspace. The query error is returned as is.
func (s *Store) ClearTable(ctx context.Context) error {
	stmt := "truncate " + s.Table() + ";"
	return s.client.Session().Query(stmt).WithContext(ctx).Exec()
}
// Close closes the underlying cqldb client.
// The context argument is ignored and the returned error is always nil.
func (s *Store) Close(_ context.Context) error {
s.client.Close()
return nil
}
// AddCloseFunc forwards f to the underlying client's AddCloseFunc.
// NOTE(review): presumably f is then run when the client is closed — confirm in pkg/cqldb.
func (s *Store) AddCloseFunc(f func()) {
s.client.AddCloseFunc(f)
}