-
Notifications
You must be signed in to change notification settings - Fork 111
/
api.go
117 lines (102 loc) · 2.88 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package gcs
import (
"context"
"errors"
"cloud.google.com/go/storage"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/gcputil"
"gocloud.dev/blob"
"gocloud.dev/blob/gcsblob"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/timestamppb"
)
// ListBuckets returns one page of bucket names for the project resolved from
// the connection's credentials.
//
// The page size is taken from req.PageSize, falling back to defaultPageSize
// when it is zero, and listing resumes from req.PageToken. The second return
// value is the token reported by the pager for the following page.
func (c *Connection) ListBuckets(ctx context.Context, req *runtimev1.GCSListBucketsRequest) ([]string, string, error) {
	creds, err := gcputil.Credentials(ctx, c.config.SecretJSON, c.config.AllowHostAccess)
	if err != nil {
		return nil, "", err
	}

	gcsClient, err := storage.NewClient(ctx, option.WithCredentials(creds))
	if err != nil {
		return nil, "", err
	}
	defer gcsClient.Close()

	project, err := gcputil.ProjectID(creds)
	if err != nil {
		return nil, "", err
	}

	limit := int(req.PageSize)
	if limit == 0 {
		limit = defaultPageSize
	}

	// Fetch exactly one page; the pager appends the results into page.
	page := []*storage.BucketAttrs{}
	nextToken, err := iterator.NewPager(gcsClient.Buckets(ctx, project), limit, req.PageToken).NextPage(&page)
	if err != nil {
		return nil, "", err
	}

	// Only the bucket names are exposed to the caller.
	names := make([]string, len(page))
	for i, attrs := range page {
		names[i] = attrs.Name
	}
	return names, nextToken, nil
}
// ListObjects returns one page of objects from the bucket named by req.Bucket.
//
// Listing is restricted by req.Prefix and req.Delimiter, and additionally by
// req.StartOffset and req.EndOffset, which are applied directly to the
// underlying *storage.Query. The page size is req.PageSize, or defaultPageSize
// when that is zero. An empty req.PageToken starts from the first page.
//
// It returns the converted objects and the token produced by ListPage for the
// next page (as a string). Any failure, including the driver not exposing a
// *storage.Query, is reported through the error return.
func (c *Connection) ListObjects(ctx context.Context, req *runtimev1.GCSListObjectsRequest) ([]*runtimev1.GCSObject, string, error) {
	client, err := c.createClient(ctx)
	if err != nil {
		return nil, "", err
	}

	bucket, err := gcsblob.OpenBucket(ctx, client, req.Bucket, nil)
	if err != nil {
		return nil, "", err
	}
	defer bucket.Close()

	pageSize := int(req.PageSize)
	if pageSize == 0 {
		pageSize = defaultPageSize
	}

	// An empty request token means "start at the beginning", which the blob
	// package expresses with its FirstPageToken sentinel.
	pageToken := blob.FirstPageToken
	if req.PageToken != "" {
		pageToken = []byte(req.PageToken)
	}

	objects, nextToken, err := bucket.ListPage(ctx, pageToken, pageSize, &blob.ListOptions{
		Prefix:    req.Prefix,
		Delimiter: req.Delimiter,
		BeforeList: func(as func(interface{}) bool) error {
			// Offsets are not part of blob.ListOptions, so set them on the
			// driver-specific query just before the list call is issued.
			var q *storage.Query
			if !as(&q) {
				// Report the failure through ListPage instead of panicking
				// and taking down the caller.
				return errors.New("gcs: list objects: unable to access underlying storage.Query")
			}
			q.StartOffset = req.StartOffset
			q.EndOffset = req.EndOffset
			return nil
		},
	})
	if err != nil {
		return nil, "", err
	}

	gcsObjects := make([]*runtimev1.GCSObject, len(objects))
	for i, object := range objects {
		gcsObjects[i] = &runtimev1.GCSObject{
			Name:       object.Key,
			ModifiedOn: timestamppb.New(object.ModTime),
			Size:       object.Size,
			IsDir:      object.IsDir,
		}
	}
	return gcsObjects, string(nextToken), nil
}
// GetCredentialsInfo reports the project ID associated with the connection's
// credentials and whether usable credentials were found.
//
// When credential lookup fails with gcputil.ErrNoCredentials the result is
// ("", false, nil); any other lookup error is returned as-is. If the project
// ID cannot be derived from the credentials, the boolean is false and the
// error from gcputil.ProjectID is returned.
func (c *Connection) GetCredentialsInfo(ctx context.Context) (string, bool, error) {
	credentials, err := gcputil.Credentials(ctx, c.config.SecretJSON, c.config.AllowHostAccess)
	switch {
	case err == nil:
		// Credentials resolved; continue to the project lookup below.
	case errors.Is(err, gcputil.ErrNoCredentials):
		// Missing credentials are an expected state, not a failure.
		return "", false, nil
	default:
		return "", false, err
	}

	id, err := gcputil.ProjectID(credentials)
	if err != nil {
		return id, false, err
	}
	return id, true, nil
}