/
meta.go
151 lines (123 loc) · 3.87 KB
/
meta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package repository
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/guregu/dynamo"
"github.com/m-mizutani/minerva/pkg/models"
"github.com/pkg/errors"
)
// MetaRepository is the metadata store abstraction. It allocates object IDs,
// maps record IDs to the S3 objects that hold them, and tracks which
// partitions have already been registered.
type MetaRepository interface {
	// GetObjecID returns an object ID for s3path. (The DynamoDB
	// implementation increments a shared counter and ignores s3path.)
	GetObjecID(s3path string) (int64, error)

	// PutRecordObjects persists the given record-to-object mappings.
	PutRecordObjects(objects []*MetaRecordObject) error

	// GetRecordObjects looks up the mappings for recordIDs under schema.
	GetRecordObjects(recordIDs []string, schema models.ParquetSchemaName) ([]*MetaRecordObject, error)

	// HeadPartition reports whether partitionKey has been registered.
	HeadPartition(partitionKey string) (bool, error)

	// PutPartition registers partitionKey.
	PutPartition(partitionKey string) error
}
// MetaDynamoDB implements MetaRepository on a single DynamoDB table. Every
// item in that table is addressed by the string attributes "pk" (hash key)
// and "sk" (range key).
type MetaDynamoDB struct {
	// table is the one DynamoDB table all metadata items are read from and
	// written to.
	table dynamo.Table
}
// metaBase holds the attributes shared by every item in the metadata table:
// the DynamoDB key pair and an expiry timestamp.
type metaBase struct {
	// ExpiresAt is a Unix timestamp in seconds (callers in this file set it
	// from time.Time.Unix()).
	ExpiresAt int64 `dynamo:"expires_at"`
	// PKey is the hash key of the item.
	PKey string `dynamo:"pk"`
	// SKey is the range key of the item.
	SKey string `dynamo:"sk"`
}
// metaObjectCount is the counter item that object IDs are drawn from. ID
// holds the counter value stored in the "id" attribute.
type metaObjectCount struct {
	metaBase
	ID int64 `dynamo:"id"`
}
// MetaRecordObject links a record ID and a parquet schema to an S3 object.
// The S3 location fields come from the embedded models.S3Object. Its DynamoDB
// key is derived by HashKey ("record/<RecordID>") and RangeKey (the schema
// name).
type MetaRecordObject struct {
	metaBase
	models.S3Object
	RecordID string                   `dynamo:"record_id"`
	Schema   models.ParquetSchemaName `dynamo:"schema"`
}
// HashKey returns the DynamoDB hash key for x, which is "record/" followed by
// its RecordID.
func (x *MetaRecordObject) HashKey() interface{} {
	key := fmt.Sprintf("record/%s", x.RecordID)
	return key
}
// RangeKey returns the DynamoDB range key for x, which is its schema name as
// a plain string.
func (x *MetaRecordObject) RangeKey() interface{} {
	sk := string(x.Schema)
	return sk
}
// NewMetaDynamoDB returns a MetaRepository backed by the DynamoDB table
// tableName in the given AWS region.
//
// NOTE(review): session.New is deprecated in aws-sdk-go in favour of
// session.NewSession, which reports configuration errors. Switching would
// require an error return (or a panic) here.
func NewMetaDynamoDB(region, tableName string) MetaRepository {
	cfg := &aws.Config{Region: aws.String(region)}
	db := dynamo.New(session.New(), cfg)
	return &MetaDynamoDB{table: db.Table(tableName)}
}
// GetObjecID allocates an object ID by adding 1 to the "id" attribute of the
// counter item (pk "meta:indexer", sk "counter"). It returns the id read back
// through Update.Value; presumably this is the post-increment value (verify
// against the dynamo library's ReturnValues setting).
//
// s3path is part of the MetaRepository contract but is not used here.
func (x *MetaDynamoDB) GetObjecID(s3path string) (int64, error) {
	var counter metaObjectCount
	err := x.table.
		Update("pk", "meta:indexer").
		Range("sk", "counter").
		Add("id", int64(1)).
		Value(&counter)
	if err != nil {
		return 0, errors.Wrap(err, "Fail to update Object ID in DynamoDB")
	}
	return counter.ID, nil
}
// PutRecordObjects stores the S3 location of each record file in DynamoDB.
//
// Each record is modified in place before writing:
//   - PKey is set from HashKey and SKey from RangeKey.
//   - ExpiresAt is set to 30 days from now, in Unix seconds.
//
// All records are then sent in one batch write. An error is returned if the
// batch fails, or if it reports fewer items written than were submitted.
// An empty input writes nothing and returns nil.
func (x *MetaDynamoDB) PutRecordObjects(records []*MetaRecordObject) error {
	if len(records) == 0 {
		// Nothing to write; skip the batch request entirely.
		return nil
	}

	const retention = 30 * 24 * time.Hour
	expiresAt := time.Now().UTC().Add(retention).Unix()

	items := make([]interface{}, 0, len(records))
	for _, item := range records {
		item.PKey = item.HashKey().(string)
		item.SKey = item.RangeKey().(string)
		item.ExpiresAt = expiresAt
		items = append(items, item)
	}

	n, err := x.table.Batch("pk", "sk").Write().Put(items...).Run()
	if err != nil {
		return errors.Wrap(err, "Failed to put S3 object path")
	}
	// err is nil on this path. errors.Wrap(nil, ...) returns nil, which would
	// hide a partial write, so build a fresh error instead.
	if n != len(items) {
		return errors.Errorf("Failed to write all path set: wrote %d of %d items", n, len(items))
	}
	return nil
}
// GetRecordObjects batch-fetches the MetaRecordObject stored for each of
// recordIDs under the given schema. Keys are built with HashKey and RangeKey.
//
// dynamo.ErrNotFound is returned unwrapped so callers can compare against it
// directly. Any other failure is wrapped with context.
func (x *MetaDynamoDB) GetRecordObjects(recordIDs []string, schema models.ParquetSchemaName) ([]*MetaRecordObject, error) {
	var keys []dynamo.Keyed
	for i := range recordIDs {
		keys = append(keys, &MetaRecordObject{RecordID: recordIDs[i], Schema: schema})
	}

	var found []*MetaRecordObject
	err := x.table.Batch("pk", "sk").Get(keys...).All(&found)
	switch {
	case err == nil:
		return found, nil
	case err == dynamo.ErrNotFound:
		return nil, err
	default:
		return nil, errors.Wrap(err, "Failed to batch get S3 object path")
	}
}
// toPartitionKey builds the hash key under which a partition marker is
// stored, which is the partition name prefixed with "partition:".
func toPartitionKey(partition string) string {
	const prefix = "partition:"
	return prefix + partition
}
// HeadPartition reports whether a marker item exists for partitionKey. The
// marker is stored under pk toPartitionKey(partitionKey) and sk "@".
//
// A missing item yields (false, nil). Any other lookup failure yields false
// together with the wrapped error.
func (x *MetaDynamoDB) HeadPartition(partitionKey string) (bool, error) {
	pkey := toPartitionKey(partitionKey)

	var marker metaBase
	err := x.table.Get("pk", pkey).Range("sk", dynamo.Equal, "@").One(&marker)
	switch {
	case err == nil:
		return true, nil
	case err == dynamo.ErrNotFound:
		return false, nil
	default:
		return false, errors.Wrapf(err, "Fail to get partition key: %s", pkey)
	}
}
// PutPartition registers partitionKey by writing a marker item. The item has
// pk toPartitionKey(partitionKey), sk "@", and an expires_at set 365 days
// from now (Unix seconds). HeadPartition looks up this same item.
func (x *MetaDynamoDB) PutPartition(partitionKey string) error {
	const retention = 365 * 24 * time.Hour
	marker := metaBase{
		ExpiresAt: time.Now().UTC().Add(retention).Unix(),
		PKey:      toPartitionKey(partitionKey),
		SKey:      "@",
	}
	if err := x.table.Put(marker).Run(); err != nil {
		return errors.Wrapf(err, "Fail to put partition key: %v", marker)
	}
	return nil
}