-
Notifications
You must be signed in to change notification settings - Fork 351
/
metastore.go
188 lines (170 loc) · 5.4 KB
/
metastore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package metastore
import (
"strings"
"time"
"github.com/aws/aws-sdk-go/service/glue"
"github.com/davecgh/go-spew/spew"
"github.com/treeverse/lakefs/pkg/logging"
)
const (
// sparkSQLWorkaroundSuffix is a suffix that Spark SQL appends to locations as
// a workaround. Locations carrying this suffix are not used and should not be
// changed. See https://issues.apache.org/jira/browse/SPARK-15269 for details.
sparkSQLWorkaroundSuffix = "-__PLACEHOLDER__"
// sparkSQLTableProviderKey is the table parameter key whose presence marks
// the table as a Spark SQL data source table (see Table.isSparkSQLTable).
sparkSQLTableProviderKey = "spark.sql.sources.provider"
// sparkSQLProviderLocationKey is the SerDe parameter key under which a Spark
// SQL table keeps its own copy of the location; StorageDescriptor.Update
// rewrites the value stored under it for Spark SQL tables.
sparkSQLProviderLocationKey = "path"
)
// Update re-targets the table at the given database and table name and
// delegates the SerDe and location rewrite to its storage descriptor, which is
// created first when the table has none.
//
// transformLocation, isSparkSQLTable and fixSparkPlaceHolder are passed through
// unchanged to StorageDescriptor.Update. The outcome is logged together with a
// dump of the table taken after the update: at error level when the storage
// descriptor update fails (the error is then returned as-is), at debug level
// otherwise.
func (m *Table) Update(db, table, serde string, transformLocation func(location string) (string, error), isSparkSQLTable, fixSparkPlaceHolder bool) error {
	logger := logging.Default().WithFields(logging.Fields{
		"db":    db,
		"table": table,
		"serde": serde,
	})

	m.DBName, m.TableName = db, table
	if m.Sd == nil {
		m.Sd = new(StorageDescriptor)
	}

	updateErr := m.Sd.Update(db, table, serde, transformLocation, isSparkSQLTable, fixSparkPlaceHolder)

	// The dump is taken after the update so it reflects whatever state the
	// table ended up in, on success and on failure alike.
	dump := spew.Sdump(*m)
	if updateErr != nil {
		logger.WithError(updateErr).WithField("table", dump).Error("Update table")
		return updateErr
	}
	logger.WithField("table", dump).Debug("Update table")
	return nil
}
// isSparkSQLTable reports whether the table parameters contain the
// sparkSQLTableProviderKey entry. Only the presence of the key matters; its
// value is ignored.
func (m *Table) isSparkSQLTable() (res bool) {
	if _, found := m.Parameters[sparkSQLTableProviderKey]; found {
		res = true
	}
	return res
}
// Update re-targets the partition at the given database and table name, sets
// its SerDe name, and delegates the location rewrite to its storage
// descriptor. A missing storage descriptor or SerDe info is created first.
//
// transformLocation, isSparkSQLTable and fixSparkPlaceHolder are passed through
// unchanged to StorageDescriptor.Update. The outcome is logged together with a
// dump of the partition taken after the update: at error level when the
// storage descriptor update fails (the error is then returned as-is), at debug
// level otherwise.
func (m *Partition) Update(db, table, serde string, transformLocation func(location string) (string, error), isSparkSQLTable, fixSparkPlaceHolder bool) error {
	logger := logging.Default().WithFields(logging.Fields{
		"db":    db,
		"table": table,
		"serde": serde,
	})

	m.DBName, m.TableName = db, table
	if m.Sd == nil {
		m.Sd = new(StorageDescriptor)
	}
	if m.Sd.SerdeInfo == nil {
		m.Sd.SerdeInfo = new(SerDeInfo)
	}
	m.Sd.SerdeInfo.Name = serde

	updateErr := m.Sd.Update(db, table, serde, transformLocation, isSparkSQLTable, fixSparkPlaceHolder)

	// The dump is taken after the update so it reflects whatever state the
	// partition ended up in, on success and on failure alike.
	dump := spew.Sdump(*m)
	if updateErr != nil {
		logger.WithError(updateErr).WithField("table", dump).Error("Update table")
		return updateErr
	}
	logger.WithField("table", dump).Debug("Update table")
	return nil
}
// Update sets the SerDe name to serde (creating the SerDe info when it is
// missing) and rewrites the locations held by the storage descriptor using
// transformLocation.
//
// A non-empty Location is transformed, unless isSparkSQLTable is set and the
// Location ends with sparkSQLWorkaroundSuffix; such placeholder locations are
// left alone. When isSparkSQLTable is set and the SerDe parameters contain
// sparkSQLProviderLocationKey, that value is transformed as well. If
// fixSparkPlaceHolder is also set and Location carries the placeholder suffix,
// Location is replaced with the transformed SerDe path.
//
// Both transformations are resolved before either location is written back, so
// when transformLocation fails the error is returned and Location and the
// SerDe path parameter keep their previous values. The db and table arguments
// are not used by this method.
func (m *StorageDescriptor) Update(db, table, serde string, transformLocation func(location string) (string, error), isSparkSQLTable, fixSparkPlaceHolder bool) error {
	if m.SerdeInfo == nil {
		m.SerdeInfo = &SerDeInfo{}
	}
	m.SerdeInfo.Name = serde

	// Phase 1: compute the new values without touching the descriptor.
	location := m.Location
	isPlaceholder := isSparkSQLTable && strings.HasSuffix(location, sparkSQLWorkaroundSuffix)
	if location != "" && !isPlaceholder {
		transformed, err := transformLocation(location)
		if err != nil {
			return err
		}
		location = transformed
	}

	sparkPath, hasSparkPath := "", false
	if isSparkSQLTable {
		// Table was created by Spark SQL, so the location Spark SQL stores
		// internally in the SerDe parameters has to be rewritten too.
		if current, ok := m.SerdeInfo.Parameters[sparkSQLProviderLocationKey]; ok {
			transformed, err := transformLocation(current)
			if err != nil {
				return err
			}
			sparkPath, hasSparkPath = transformed, true
		}
	}

	// Phase 2: every transformation succeeded, apply the results.
	m.Location = location
	if hasSparkPath {
		// Parameters is non-nil here: the key lookup above succeeded.
		m.SerdeInfo.Parameters[sparkSQLProviderLocationKey] = sparkPath
		if fixSparkPlaceHolder && strings.HasSuffix(location, sparkSQLWorkaroundSuffix) {
			m.Location = sparkPath
		}
	}
	return nil
}
// Database is the metastore-independent description of a database.
//
// NOTE(review): the Hive*/AWS* prefixed fields presumably carry values that
// only one backend (Hive metastore or AWS Glue) provides — confirm against the
// code that converts to and from this type.
type Database struct {
Name string
Description string
LocationURI string
Parameters map[string]string
// HivePrivileges is untyped here; its concrete type is not visible in this file.
HivePrivileges interface{}
OwnerName *string
// HiveOwnerType is untyped here; its concrete type is not visible in this file.
HiveOwnerType interface{}
// AWSTargetDatabase uses the AWS Glue SDK type directly.
AWSTargetDatabase *glue.DatabaseIdentifier
}
// Table is the metastore-independent description of a table.
//
// Table.Update rewrites TableName, DBName and Sd; Table.isSparkSQLTable
// inspects Parameters.
//
// NOTE(review): the AWS* prefixed fields presumably carry values only AWS Glue
// provides — confirm against the code that converts to and from this type.
type Table struct {
TableName string
DBName string
Owner string
// NOTE(review): unit of CreateTime/LastAccessTime is not established in this
// file — presumably seconds since the epoch; confirm.
CreateTime int64
LastAccessTime int64
Retention int
// Sd may be nil; Table.Update allocates it on demand.
Sd *StorageDescriptor
PartitionKeys []*FieldSchema
// Parameters holds free-form table properties; the presence of
// sparkSQLTableProviderKey marks a Spark SQL data source table.
Parameters map[string]string
ViewOriginalText string
ViewExpandedText string
TableType string
Temporary bool
RewriteEnabled *bool
AWSCreatedBy *string
AWSDescription *string
AWSIsRegisteredWithLakeFormation *bool
AWSLastAnalyzedTime *time.Time
// AWSTargetTable is untyped here; its concrete type is not visible in this file.
AWSTargetTable interface{}
AWSUpdateTime *time.Time
// Privileges is untyped here; its concrete type is not visible in this file.
Privileges interface{}
}
// Partition is the metastore-independent description of a table partition.
//
// Partition.Update rewrites DBName, TableName and Sd; Partition.Name derives a
// name from Values.
type Partition struct {
// Values are joined with "-" by Name to form the partition name.
Values []string
DBName string
TableName string
// NOTE(review): CreateTime/LastAccessTime are int here while Table declares
// the same-named fields as int64 — confirm whether the difference is intended.
CreateTime int
LastAccessTime int
// Sd may be nil; Partition.Update allocates it on demand.
Sd *StorageDescriptor
Parameters map[string]string
AWSLastAnalyzedTime *time.Time
// Privileges is untyped here; its concrete type is not visible in this file.
Privileges interface{}
}
// StorageDescriptor describes how and where the data of a table or partition
// is stored. It is referenced by both Table.Sd and Partition.Sd.
type StorageDescriptor struct {
Cols []*FieldSchema
// Location is rewritten by StorageDescriptor.Update via the caller-supplied
// transformLocation function. For Spark SQL tables it may end with
// sparkSQLWorkaroundSuffix, in which case it is a placeholder.
Location string
InputFormat string
OutputFormat string
Compressed bool
NumBuckets int
// SerdeInfo may be nil; StorageDescriptor.Update allocates it on demand.
SerdeInfo *SerDeInfo
BucketCols []string
SortCols []*Order
Parameters map[string]string
SkewedInfo *SkewedInfo
StoredAsSubDirectories *bool
// AWSSchemaReference is untyped here; its concrete type is not visible in this file.
AWSSchemaReference interface{}
}
// SerDeInfo describes the serializer/deserializer attached to a storage
// descriptor.
type SerDeInfo struct {
// Name is overwritten with the serde argument by StorageDescriptor.Update.
Name string
SerializationLib string
// Parameters holds SerDe properties. For Spark SQL tables the entry under
// sparkSQLProviderLocationKey holds a location that StorageDescriptor.Update
// rewrites.
Parameters map[string]string
}
// FieldSchema describes a single column by name, type and comment. It is used
// for StorageDescriptor.Cols and Table.PartitionKeys.
type FieldSchema struct {
Name string
// NOTE(review): Type is presumably the metastore type name (e.g. "string",
// "int") — confirm against the producers of this struct.
Type string
Comment string
}
// Order is one entry of StorageDescriptor.SortCols: a column name together
// with an integer sort order.
type Order struct {
Col string
// NOTE(review): the meaning of the Order values is not established in this
// file — presumably 1 for ascending and 0 for descending; confirm.
Order int
}
// SkewedInfo holds the skewed-column information of a storage descriptor.
type SkewedInfo struct {
SkewedColNames []string
SkewedColValues [][]string
AWSSkewedColValues []string // NOTE(review): flat list, unlike SkewedColValues — presumably the AWS Glue representation; confirm
SkewedColValueLocationMaps map[string]string
}
// Name returns the partition values joined by "-". A partition without values
// yields the empty string.
func (m Partition) Name() string {
	const valueSeparator = "-"
	return strings.Join(m.Values, valueSeparator)
}