-
Notifications
You must be signed in to change notification settings - Fork 348
/
metastore_client.go
318 lines (285 loc) · 9.81 KB
/
metastore_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package glue
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/glue"
"github.com/aws/aws-sdk-go/service/glue/glueiface"
"github.com/treeverse/lakefs/pkg/metastore"
mserrors "github.com/treeverse/lakefs/pkg/metastore/errors"
)
// MaxParts is the page size requested from Glue's paginated listing calls
// (databases, tables and partitions). 1000 is the largest page size Glue allows.
const MaxParts = 1000
// MSClient implements metastore.Client on top of the AWS Glue Data Catalog
// (instances are built by NewMSClient).
type MSClient struct {
	// client issues every Glue catalog request made by this type.
	client glueiface.GlueAPI
	// catalogID is sent as CatalogId on each Glue request.
	catalogID string
	// baseLocationURI prefixes database locations in GetDBLocation;
	// NewMSClient stores it with trailing "/" characters trimmed.
	baseLocationURI string
}
// GetDBLocation returns the storage location for the database dbName: the
// client's base location URI, a "/" separator, then the database name.
func (g *MSClient) GetDBLocation(dbName string) string {
	location := fmt.Sprintf("%s/%s", g.baseLocationURI, dbName)
	return location
}
// NormalizeDBName returns db as-is; this Glue client applies no name
// normalization.
func (g *MSClient) NormalizeDBName(db string) string {
	normalized := db
	return normalized
}
// NewMSClient creates a metastore.Client backed by the AWS Glue Data Catalog.
//
// cfg configures the AWS session used for Glue calls. catalogID is sent as the
// CatalogId of every Glue request. baseLocationURI is the prefix used to build
// database locations; any trailing "/" characters are trimmed.
//
// An error is returned when the AWS session cannot be created.
func NewMSClient(cfg *aws.Config, catalogID, baseLocationURI string) (metastore.Client, error) {
	// Report session failures through the error result rather than panicking
	// (session.Must): callers of a constructor that returns an error expect it.
	sess, err := session.NewSession(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating AWS session for glue: %w", err)
	}
	return &MSClient{
		client:          glue.New(sess),
		catalogID:       catalogID,
		baseLocationURI: strings.TrimRight(baseLocationURI, "/"),
	}, nil
}
// HasTable reports whether tableName exists in the database dbName.
//
// A *glue.EntityNotFoundException from GetTable is treated as "not found"
// (false, nil); any other error is returned to the caller.
func (g *MSClient) HasTable(ctx context.Context, dbName string, tableName string) (bool, error) {
	tbl, err := g.GetTable(ctx, dbName, tableName)
	if err != nil {
		// TODO(Guys): validate that Glue reports a missing table with this type.
		var notFound *glue.EntityNotFoundException
		if !errors.As(err, &notFound) {
			return false, err
		}
	}
	return tbl != nil, nil
}
// GetDatabase fetches the database called name from the Glue catalog and
// converts it with DatabaseGlueToLocal. Glue errors are returned unchanged.
func (g *MSClient) GetDatabase(ctx context.Context, name string) (*metastore.Database, error) {
	input := &glue.GetDatabaseInput{
		CatalogId: aws.String(g.catalogID),
		Name:      aws.String(name),
	}
	out, err := g.client.GetDatabaseWithContext(ctx, input)
	if err != nil {
		return nil, err
	}
	return DatabaseGlueToLocal(out.Database), nil
}
// getDatabaseFromGlue requests a single page of databases from the Glue
// catalog. token is the NextToken of the previous page (nil for the first
// page) and parts is the maximum number of databases in the page.
func (g *MSClient) getDatabaseFromGlue(ctx context.Context, token *string, parts int) (*glue.GetDatabasesOutput, error) {
	input := &glue.GetDatabasesInput{
		CatalogId:  aws.String(g.catalogID),
		MaxResults: aws.Int64(int64(parts)),
		NextToken:  token,
		// ResourceShareType is intentionally left unset (nil).
	}
	return g.client.GetDatabasesWithContext(ctx, input)
}
// GetDatabases lists all databases in the Glue catalog, following NextToken
// pagination with pages of MaxParts entries.
//
// Each page is filtered with filterDatabases. When pattern is non-empty, only
// databases whose names contain a match for the regular expression are kept.
// The first Glue or regexp error aborts the listing and is returned.
func (g *MSClient) GetDatabases(ctx context.Context, pattern string) ([]*metastore.Database, error) {
	var result []*metastore.Database
	var token *string
	for more := true; more; more = token != nil {
		page, err := g.getDatabaseFromGlue(ctx, token, MaxParts)
		if err != nil {
			return nil, err
		}
		token = page.NextToken

		matched, err := filterDatabases(page.DatabaseList, pattern)
		if err != nil {
			return nil, err
		}
		result = append(result, DatabasesGlueToLocal(matched)...)
	}
	return result, nil
}
// filterDatabases keeps the databases whose names contain a match for the
// regular expression pattern.
//
// An empty pattern returns databases unchanged. An invalid pattern returns
// the regexp compilation error. Otherwise the result is a new, non-nil slice.
func filterDatabases(databases []*glue.Database, pattern string) ([]*glue.Database, error) {
	if pattern == "" {
		return databases, nil
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	kept := []*glue.Database{}
	for i := range databases {
		if name := aws.StringValue(databases[i].Name); re.MatchString(name) {
			kept = append(kept, databases[i])
		}
	}
	return kept, nil
}
// GetTables lists all tables of the database dbName, following Glue's
// NextToken pagination with pages of MaxParts entries.
//
// A non-empty pattern is passed to Glue as the GetTables Expression filter. An
// empty pattern means "no filter" and is not sent at all, matching how
// GetDatabases treats an empty pattern. The returned slice is never nil. The
// first Glue error aborts the listing and is returned unchanged.
func (g *MSClient) GetTables(ctx context.Context, dbName string, pattern string) ([]*metastore.Table, error) {
	// Glue applies Expression whenever it is present, so an empty string would
	// be sent as a literal (empty) filter instead of meaning "all tables".
	var expression *string
	if pattern != "" {
		expression = aws.String(pattern)
	}

	var nextToken *string
	allTables := make([]*metastore.Table, 0)
	for {
		out, err := g.client.GetTablesWithContext(ctx, &glue.GetTablesInput{
			CatalogId:    aws.String(g.catalogID),
			DatabaseName: aws.String(dbName),
			Expression:   expression,
			MaxResults:   aws.Int64(MaxParts),
			NextToken:    nextToken,
		})
		if err != nil {
			return nil, err
		}
		allTables = append(allTables, TablesGlueToLocal(out.TableList)...)
		nextToken = out.NextToken
		if nextToken == nil {
			break
		}
	}
	return allTables, nil
}
// AlterTable updates a table in the database dbName to the definition in
// newTable, using Glue UpdateTable.
//
// The table-name argument is ignored. Only the TableInput converted from
// newTable is sent, so presumably Glue identifies the table by its name.
func (g *MSClient) AlterTable(ctx context.Context, dbName string, _ string, newTable *metastore.Table) error {
	tableInput := TableLocalToGlue(newTable)
	input := &glue.UpdateTableInput{
		CatalogId:    aws.String(g.catalogID),
		DatabaseName: aws.String(dbName),
		// With SkipArchive=false, UpdateTable archives the previous table
		// version before updating; true would skip that archived version.
		SkipArchive: aws.Bool(false),
		TableInput:  tableInput,
	}
	_, err := g.client.UpdateTableWithContext(ctx, input)
	return err
}
// DropPartition deletes the partition of dbName.tableName identified by
// values, using Glue DeletePartition. Glue errors are returned unchanged.
func (g *MSClient) DropPartition(ctx context.Context, dbName string, tableName string, values []string) error {
	req := &glue.DeletePartitionInput{
		CatalogId:       aws.String(g.catalogID),
		DatabaseName:    aws.String(dbName),
		PartitionValues: aws.StringSlice(values),
		TableName:       aws.String(tableName),
	}
	if _, err := g.client.DeletePartitionWithContext(ctx, req); err != nil {
		return err
	}
	return nil
}
// CreateDatabase creates database in the Glue catalog.
//
// A *glue.AlreadyExistsException is translated to mserrors.ErrSchemaExists.
// Any other error, or nil, is returned as-is.
func (g *MSClient) CreateDatabase(ctx context.Context, database *metastore.Database) error {
	input := &glue.CreateDatabaseInput{
		CatalogId:     aws.String(g.catalogID),
		DatabaseInput: DatabaseLocalToGlue(database),
	}
	_, err := g.client.CreateDatabaseWithContext(ctx, input)

	var alreadyExists *glue.AlreadyExistsException
	if errors.As(err, &alreadyExists) {
		return mserrors.ErrSchemaExists
	}
	return err
}
// getTableData fetches the raw Glue definition of the table dbName.tblName.
// Glue errors are returned unchanged.
func (g *MSClient) getTableData(ctx context.Context, dbName string, tblName string) (*glue.TableData, error) {
	input := &glue.GetTableInput{
		CatalogId:    aws.String(g.catalogID),
		DatabaseName: aws.String(dbName),
		Name:         aws.String(tblName),
	}
	out, err := g.client.GetTableWithContext(ctx, input)
	if err != nil {
		return nil, err
	}
	return out.Table, nil
}
// GetTable fetches dbName.tableName from Glue and converts it with
// TableGlueToLocal. On error it returns a nil table and the Glue error.
func (g *MSClient) GetTable(ctx context.Context, dbName string, tableName string) (*metastore.Table, error) {
	data, err := g.getTableData(ctx, dbName, tableName)
	if err != nil {
		return nil, err
	}
	local := TableGlueToLocal(data)
	return local, nil
}
// CreateTable creates tbl in the Glue database named by tbl.DBName.
// Glue errors are returned unchanged.
func (g *MSClient) CreateTable(ctx context.Context, tbl *metastore.Table) error {
	tableInput := TableLocalToGlue(tbl)
	input := &glue.CreateTableInput{
		CatalogId:    aws.String(g.catalogID),
		DatabaseName: aws.String(tbl.DBName),
		TableInput:   tableInput,
	}
	_, err := g.client.CreateTableWithContext(ctx, input)
	return err
}
// GetPartition fetches the partition of dbName.tableName identified by values
// and converts it with PartitionGlueToLocal. Glue errors are returned
// unchanged.
func (g *MSClient) GetPartition(ctx context.Context, dbName string, tableName string, values []string) (*metastore.Partition, error) {
	input := &glue.GetPartitionInput{
		CatalogId:       aws.String(g.catalogID),
		DatabaseName:    aws.String(dbName),
		PartitionValues: aws.StringSlice(values),
		TableName:       aws.String(tableName),
	}
	out, err := g.client.GetPartitionWithContext(ctx, input)
	if err != nil {
		return nil, err
	}
	return PartitionGlueToLocal(out.Partition), nil
}
// GetPartitions returns every partition of dbName.tableName, fetched through
// GetAllPartitions and converted with PartitionsGlueToLocal.
func (g *MSClient) GetPartitions(ctx context.Context, dbName string, tableName string) ([]*metastore.Partition, error) {
	gluePartitions, err := g.GetAllPartitions(ctx, dbName, tableName)
	if err != nil {
		return nil, err
	}
	converted := PartitionsGlueToLocal(gluePartitions)
	return converted, nil
}
// getPartitionsFromGlue requests a single page of partitions of
// dbName.tableName. nextToken is the NextToken of the previous page (nil for
// the first page) and maxParts is the maximum number of partitions in the page.
func (g *MSClient) getPartitionsFromGlue(ctx context.Context, dbName, tableName string, nextToken *string, maxParts int16) (*glue.GetPartitionsOutput, error) {
	input := &glue.GetPartitionsInput{
		CatalogId:    aws.String(g.catalogID),
		DatabaseName: aws.String(dbName),
		MaxResults:   aws.Int64(int64(maxParts)),
		NextToken:    nextToken,
		TableName:    aws.String(tableName),
	}
	return g.client.GetPartitionsWithContext(ctx, input)
}
// GetAllPartitions returns every Glue partition of dbName.tableName, following
// NextToken pagination with pages of MaxParts entries. The first Glue error
// aborts the listing and is returned.
func (g *MSClient) GetAllPartitions(ctx context.Context, dbName, tableName string) ([]*glue.Partition, error) {
	var (
		collected []*glue.Partition
		token     *string
	)
	for {
		page, err := g.getPartitionsFromGlue(ctx, dbName, tableName, token, MaxParts)
		if err != nil {
			return nil, err
		}
		collected = append(collected, page.Partitions...)
		if token = page.NextToken; token == nil {
			return collected, nil
		}
	}
}
// AddPartition creates newPartition in dbName.tableName using Glue
// CreatePartition. Note the argument order: tableName comes before dbName.
func (g *MSClient) AddPartition(ctx context.Context, tableName string, dbName string, newPartition *metastore.Partition) error {
	input := &glue.CreatePartitionInput{
		CatalogId:      aws.String(g.catalogID),
		DatabaseName:   aws.String(dbName),
		PartitionInput: PartitionLocalToGlue(newPartition),
		TableName:      aws.String(tableName),
	}
	_, err := g.client.CreatePartitionWithContext(ctx, input)
	return err
}
func (g *MSClient) AddPartitions(ctx context.Context, tableName string, dbName string, newParts []*metastore.Partition) error {
gluePartitions := PartitionsLocalToGlue(newParts)
partitionList := make([]*glue.PartitionInput, 0, len(gluePartitions))
for _, partition := range gluePartitions {
partitionList = append(partitionList, &glue.PartitionInput{
LastAccessTime: partition.LastAccessTime,
LastAnalyzedTime: partition.LastAnalyzedTime,
Parameters: partition.Parameters,
StorageDescriptor: partition.StorageDescriptor,
Values: partition.Values,
})
}
_, err := g.client.BatchCreatePartitionWithContext(ctx,
&glue.BatchCreatePartitionInput{
CatalogId: aws.String(g.catalogID),
DatabaseName: aws.String(dbName),
PartitionInputList: partitionList,
TableName: aws.String(tableName),
})
return err
}
// AlterPartition updates one existing partition of dbName.tableName using
// Glue UpdatePartition. The partition to replace is selected by the values of
// the converted partition itself.
func (g *MSClient) AlterPartition(ctx context.Context, dbName string, tableName string, partition *metastore.Partition) error {
	converted := PartitionLocalToGlue(partition)
	input := &glue.UpdatePartitionInput{
		CatalogId:          aws.String(g.catalogID),
		DatabaseName:       aws.String(dbName),
		PartitionInput:     converted,
		PartitionValueList: converted.Values,
		TableName:          aws.String(tableName),
	}
	_, err := g.client.UpdatePartitionWithContext(ctx, input)
	return err
}
// AlterPartitions updates each partition in newPartitions in order, issuing
// one AlterPartition call per partition because no batch update is used here.
//
// It stops at and returns the first error. Partitions earlier in the slice
// have already been updated at that point.
func (g *MSClient) AlterPartitions(ctx context.Context, dbName string, tableName string, newPartitions []*metastore.Partition) error {
	for i := range newPartitions {
		if err := g.AlterPartition(ctx, dbName, tableName, newPartitions[i]); err != nil {
			return err
		}
	}
	return nil
}