-
Notifications
You must be signed in to change notification settings - Fork 39
/
cassandra.go
241 lines (204 loc) · 6.4 KB
/
cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package cassandra
import (
"context"
_ "embed" // used to print the embedded assets
"fmt"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"github.com/gocql/gocql"
"github.com/raystack/meteor/models"
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
"github.com/raystack/meteor/plugins/sqlutil"
"github.com/raystack/meteor/plugins"
"github.com/raystack/meteor/registry"
"github.com/raystack/salt/log"
)
// summary holds the contents of README.md, embedded into the binary at build
// time and exposed as the plugin's Summary in info.
//go:embed README.md
var summary string
// defaultKeyspaceList lists the system keyspaces that are always excluded
// from extraction. Init combines it with the user-configured
// exclude.keyspaces to build the keyspace exclusion set.
var defaultKeyspaceList = []string{
"system",
"system_schema",
"system_auth",
"system_distributed",
"system_traces",
}
// service is the service name set on every emitted asset and used as the
// service component of each asset URN.
const (
service = "cassandra"
)
// Config holds the set of configuration for the cassandra extractor.
type Config struct {
// UserID is the username passed to gocql.PasswordAuthenticator.
UserID string `json:"user_id" yaml:"user_id" mapstructure:"user_id" validate:"required"`
// Password is the password passed to gocql.PasswordAuthenticator.
Password string `json:"password" yaml:"password" mapstructure:"password" validate:"required"`
// Host is the address handed to gocql.NewCluster as the contact point.
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
// Port is the port the cluster connection uses (the sample config uses 9042).
Port int `json:"port" yaml:"port" mapstructure:"port" validate:"required"`
// Exclude lists keyspaces and tables to skip during extraction.
Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}
// Exclude describes which keyspaces and tables the extractor should skip.
type Exclude struct {
// Keyspaces are keyspace names to skip, in addition to the built-in
// system keyspaces that are always excluded.
Keyspaces []string `json:"keyspaces" yaml:"keyspaces" mapstructure:"keyspaces"`
// Tables are fully-qualified "keyspace.table" names to skip.
Tables []string `json:"tables" yaml:"tables" mapstructure:"tables"`
}
// sampleConfig is an example YAML configuration published in the plugin info.
var sampleConfig = `
user_id: admin
password: "1234"
host: localhost
port: 9042
`
// info is the plugin metadata handed to the base extractor in New.
var info = plugins.Info{
Description: "Table metadata from cassandra server.",
SampleConfig: sampleConfig,
Summary: summary,
Tags: []string{"oss", "extractor"},
}
// Extractor manages the extraction of data from cassandra.
// It emits one table asset per non-excluded table.
type Extractor struct {
plugins.BaseExtractor
// excludedKeyspaces is the set of keyspace names to skip, built in Init.
excludedKeyspaces map[string]bool
// excludeTables is the set of "keyspace.table" names to skip, built in Init.
excludeTables map[string]bool
logger log.Logger
// config is presumably filled by BaseExtractor.Init (New hands it
// &config) — confirm against plugins.BaseExtractor.
config Config
// session is opened in Init and closed when Extract returns.
session *gocql.Session
// emit is the record callback supplied to Extract and used by processTable.
emit plugins.Emit
}
// New constructs a cassandra Extractor that logs through logger.
// The returned extractor must be initialised with Init before Extract is
// called, because Init is what opens the database session.
func New(logger log.Logger) *Extractor {
	ext := new(Extractor)
	ext.logger = logger
	ext.BaseExtractor = plugins.NewBaseExtractor(info, &ext.config)
	return ext
}
// Init initializes the extractor: it loads config through the base
// extractor, builds the keyspace/table exclusion sets and opens a session
// to the configured Cassandra host.
//
// The system keyspaces in defaultKeyspaceList are always excluded, in
// addition to any keyspaces listed under exclude.keyspaces. Tables are
// excluded by their fully-qualified "keyspace.table" name.
//
// The session uses QUORUM consistency and native protocol version 4; neither
// is configurable. Session creation errors are returned wrapped.
func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error) {
	if err = e.BaseExtractor.Init(ctx, config); err != nil {
		return err
	}

	// Build the excluded keyspace list in a fresh slice. Appending directly
	// to the package-level defaultKeyspaceList would only be safe by accident
	// of its capacity; an explicit copy guarantees it is never mutated.
	excludedKeyspacesList := make([]string, 0, len(defaultKeyspaceList)+len(e.config.Exclude.Keyspaces))
	excludedKeyspacesList = append(excludedKeyspacesList, defaultKeyspaceList...)
	excludedKeyspacesList = append(excludedKeyspacesList, e.config.Exclude.Keyspaces...)
	e.excludedKeyspaces = sqlutil.BuildBoolMap(excludedKeyspacesList)
	e.excludeTables = sqlutil.BuildBoolMap(e.config.Exclude.Tables)

	// connect to cassandra
	cluster := gocql.NewCluster(e.config.Host)
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: e.config.UserID,
		Password: e.config.Password,
	}
	cluster.Consistency = gocql.Quorum
	cluster.ProtoVersion = 4
	cluster.Port = e.config.Port
	if e.session, err = cluster.CreateSession(); err != nil {
		return errors.Wrap(err, "failed to create session")
	}
	return nil
}
// Extract lists every keyspace on the cluster, skips the excluded ones and
// extracts the tables of each remaining keyspace. One record per table is
// sent through emit.
//
// The session opened by Init is closed when Extract returns, so Init must
// run again before another Extract call. Cancelling ctx aborts the keyspace
// query and stops processing before the next keyspace. Errors from the
// keyspace query itself (surfaced via scanner.Err) are returned instead of
// being silently treated as "no keyspaces".
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
	defer e.session.Close()
	e.emit = emit

	scanner := e.session.
		Query("SELECT keyspace_name FROM system_schema.keyspaces;").
		WithContext(ctx).
		Iter().
		Scanner()
	for scanner.Next() {
		// extractTables does not receive ctx, so honour cancellation
		// between keyspaces.
		if err = ctx.Err(); err != nil {
			return err
		}

		var keyspace string
		if err = scanner.Scan(&keyspace); err != nil {
			return errors.Wrap(err, "failed to scan keyspace name")
		}
		// skip system and user-excluded keyspaces
		if e.isExcludedKeyspace(keyspace) {
			continue
		}
		if err = e.extractTables(keyspace); err != nil {
			return errors.Wrapf(err, "failed to extract tables from %s", keyspace)
		}
	}
	// Err reports any error that ended iteration early and releases the
	// iterator's resources; without it a failed query looks like success.
	if err = scanner.Err(); err != nil {
		return errors.Wrap(err, "failed to list keyspaces")
	}
	return nil
}
// extractTables lists the tables of keyspace from system_schema.tables and
// processes every table that is not excluded. It stops at the first error,
// including a failure of the listing query itself.
func (e *Extractor) extractTables(keyspace string) (err error) {
	scanner := e.session.
		Query(`SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?`, keyspace).
		Iter().
		Scanner()
	for scanner.Next() {
		var tableName string
		if err = scanner.Scan(&tableName); err != nil {
			// tableName is not populated when Scan fails, so report the keyspace.
			return errors.Wrapf(err, "failed to scan table name in keyspace %s", keyspace)
		}
		if e.isExcludedTable(keyspace, tableName) {
			continue
		}
		if err = e.processTable(keyspace, tableName); err != nil {
			return errors.Wrapf(err, "failed to process table %s.%s", keyspace, tableName)
		}
	}
	// Err surfaces failures that ended iteration early and releases the
	// iterator's resources.
	if err = scanner.Err(); err != nil {
		return errors.Wrapf(err, "failed to list tables in keyspace %s", keyspace)
	}
	return nil
}
// processTable build and push table to out channel
func (e *Extractor) processTable(keyspace string, tableName string) (err error) {
var columns []*v1beta2.Column
columns, err = e.extractColumns(keyspace, tableName)
if err != nil {
return errors.Wrap(err, "failed to extract columns")
}
table, err := anypb.New(&v1beta2.Table{
Columns: columns,
Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present
})
if err != nil {
err = fmt.Errorf("error creating Any struct: %w", err)
}
// push table to channel
e.emit(models.NewRecord(&v1beta2.Asset{
Urn: models.NewURN(service, e.UrnScope, "table", fmt.Sprintf("%s.%s", keyspace, tableName)),
Name: tableName,
Service: service,
Data: table,
Type: "table",
}))
return
}
// extractColumns returns the name and CQL type of every column of
// keyspace.tableName, read from system_schema.columns.
//
// A row that fails to scan is logged and skipped (best effort). A failure of
// the query itself is returned as an error with no columns, rather than
// being mistaken for a table with no columns.
func (e *Extractor) extractColumns(keyspace string, tableName string) (columns []*v1beta2.Column, err error) {
	query := `SELECT column_name, type
FROM system_schema.columns
WHERE keyspace_name = ?
AND table_name = ?`
	scanner := e.session.
		Query(query, keyspace, tableName).
		Iter().
		Scanner()
	for scanner.Next() {
		var fieldName, dataType string
		if err := scanner.Scan(&fieldName, &dataType); err != nil {
			e.logger.Error("failed to get fields", "error", err)
			continue
		}
		columns = append(columns, &v1beta2.Column{
			Name:     fieldName,
			DataType: dataType,
		})
	}
	// Err reports errors that ended iteration early and releases the
	// iterator's resources.
	if err = scanner.Err(); err != nil {
		return nil, errors.Wrapf(err, "failed to list columns of %s.%s", keyspace, tableName)
	}
	return columns, nil
}
// isExcludedKeyspace reports whether keyspace is a key of the exclusion set
// built in Init (the system keyspaces plus any user-configured ones).
// Only key presence is checked; the stored bool value is not consulted.
func (e *Extractor) isExcludedKeyspace(keyspace string) bool {
	if _, found := e.excludedKeyspaces[keyspace]; found {
		return true
	}
	return false
}
// isExcludedTable reports whether the fully-qualified "keyspace.table" name
// is a key of the table exclusion set built in Init. Only key presence is
// checked; the stored bool value is not consulted.
func (e *Extractor) isExcludedTable(keyspace, table string) bool {
	qualified := keyspace + "." + table
	_, found := e.excludeTables[qualified]
	return found
}
// init register the extractor to the catalog
func init() {
if err := registry.Extractors.Register("cassandra", func() plugins.Extractor {
return New(plugins.GetLog())
}); err != nil {
panic(err)
}
}