Skip to content

Commit

Permalink
Add GCS support and tests (#1105)
Browse files Browse the repository at this point in the history
* Adding in GCS support

Adding in testing

Removing mockapi and using google-cloud-go-testing instead, removing unnecessary methods, cleaning up code

Changing back dockerfile name

Rebasing on master

Fixing import statement

Reverting kfstorage rename to storage, changing gcs import

Fixing import statement

Changing import in test

Combining tests into watcher_test, putting mocks into a testutils package

Removing unnecessary suite run, renaming testutils to mocks

Adding more test cases, accounting for lack of model name in passed in storageURI

Changing iterator retrieval logic

Rebasing and cleaning code

* Returning a warning if queried object doesn't exist in bucket, resolving test

* Rebasing, removing unused import, refactoring

* Adding missing parameter to NewWatcher call
  • Loading branch information
mszacillo committed Jan 15, 2021
1 parent 003f051 commit b8b3584
Show file tree
Hide file tree
Showing 9 changed files with 529 additions and 54 deletions.
18 changes: 17 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package main

import (
gstorage "cloud.google.com/go/storage"
"context"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
"github.com/kelseyhightower/envconfig"
"github.com/kubeflow/kfserving/pkg/agent"
"github.com/kubeflow/kfserving/pkg/agent/storage"
"github.com/kubeflow/kfserving/pkg/apis/serving/v1beta1"
"github.com/kubeflow/kfserving/pkg/batcher"
gcscredential "github.com/kubeflow/kfserving/pkg/credentials/gcs"
s3credential "github.com/kubeflow/kfserving/pkg/credentials/s3"
kfslogger "github.com/kubeflow/kfserving/pkg/logger"
"github.com/pkg/errors"
Expand Down Expand Up @@ -297,10 +300,23 @@ func startModelPuller(logger *zap.SugaredLogger) {
}
}

if _, ok := os.LookupEnv(gcscredential.GCSCredentialEnvKey); ok {
// GCS relies on environment variable GOOGLE_APPLICATION_CREDENTIALS to point to the service-account-key
// If set, it will be automatically be picked up by the client.
logger.Info("Initializing gcs client, using existing GOOGLE_APPLICATION_CREDENTIALS variable.")
ctx := context.Background()
client, err := gstorage.NewClient(ctx)
if err != nil {
panic(err)
}
downloader.Providers[storage.GCS] = &storage.GCSProvider{
Client: stiface.AdaptClient(client),
}
}

watcher := agent.NewWatcher(*configDir, *modelDir, logger)
logger.Info("Starting puller")
agent.StartPuller(downloader, watcher.ModelEvents, logger)
logger.Info("Starting watcher")
watcher.Start()
}

Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kubeflow/kfserving
go 1.13

require (
cloud.google.com/go/storage v1.11.0
github.com/astaxie/beego v1.12.1
github.com/aws/aws-sdk-go v1.31.12
github.com/cloudevents/sdk-go v1.2.0
Expand All @@ -15,7 +16,9 @@ require (
github.com/golang/protobuf v1.4.2
github.com/google/go-cmp v0.5.2
github.com/google/uuid v1.1.1
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8
github.com/json-iterator/go v1.1.10
github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.2
Expand All @@ -28,15 +31,14 @@ require (
go.uber.org/zap v1.15.0
golang.org/dl v0.0.0-20201105230244-7f2637f4aae3 // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73
google.golang.org/api v0.31.0
google.golang.org/grpc v1.31.1
google.golang.org/protobuf v1.25.0
istio.io/api v0.0.0-20200715212100-dbf5277541ef
istio.io/client-go v0.0.0-20201005161859-d8818315d678
istio.io/gogo-genproto v0.0.0-20191029161641-f7d19ec0141d // indirect
k8s.io/api v0.18.8
k8s.io/apiextensions-apiserver v0.18.8
k8s.io/apimachinery v0.18.8
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29
knative.dev/networking v0.0.0-20200922180040-a71b40c69b15
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZ
cloud.google.com/go v0.60.0/go.mod h1:yw2G51M9IfRboUH61Us8GqCeF1PzPblB823Mn2q2eAU=
cloud.google.com/go v0.61.0/go.mod h1:XukKJg4Y7QsUu0Hxg3qQKUWR4VuWivmyMK2+rUyxAqw=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.64.0/go.mod h1:xfORb36jGvE+6EexW71nMEtL025s3x6xvuYUKM4JLv4=
cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -49,6 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.9.0/go.mod h1:m+/etGaqZbylxaNT876QGXqEHp4PR2Rq5GMqICWb9bU=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.11.0 h1:bSLyzhbGjLMYxCratCDRSSH7+xRGpNApTBmowDUFGLk=
cloud.google.com/go/storage v1.11.0/go.mod h1:/PAbprKS+5msVYogBmczjWalDXnQ9mr64yEq9YnyPeo=
code.gitea.io/sdk/gitea v0.12.0/go.mod h1:z3uwDV/b9Ls47NGukYM9XhnHtqPh/J+t40lsUrR6JDY=
contrib.go.opencensus.io/exporter/aws v0.0.0-20181029163544-2befc13012d0/go.mod h1:uu1P0UCM/6RbsMrgPa98ll8ZcHM858i/AD06a9aLRCA=
contrib.go.opencensus.io/exporter/ocagent v0.4.12/go.mod h1:450APlNTSR6FrvC3CTRqYosuDstRB9un7SOx2k/9ckA=
Expand Down Expand Up @@ -690,6 +693,8 @@ github.com/googleapis/gnostic v0.3.1 h1:WeAefnSUHlBb0iJKwxFDZdbfGwkd7xRNuV+IpXMJ
github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU=
github.com/googleapis/gnostic v0.4.0 h1:BXDUo8p/DaxC+4FJY/SSx3gvnx9C1VdHNgaUkiEL5mk=
github.com/googleapis/gnostic v0.4.0/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 h1:tlyzajkF3030q6M8SvmJSemC9DTHL/xaMa18b65+JM4=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gookit/color v1.2.4/go.mod h1:AhIE+pS6D4Ql0SQWbBeXPHw7gY0/sjHoA4s/n1KB7xg=
github.com/gophercloud/gophercloud v0.0.0-20190126172459-c818fa66e4c8/go.mod h1:3WdhXV3rUYy9p6AUW8d94kr+HS62Y4VL9mBnFxsD8q4=
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
Expand Down Expand Up @@ -1475,6 +1480,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
Expand Down Expand Up @@ -1699,7 +1705,9 @@ golang.org/x/tools v0.0.0-20200725200936-102e7d357031/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200731060945-b5fad4ed8dd6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200817023811-d00afeaade8f/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200827163409-021d7c6f1ec3/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200828161849-5deb26317202/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3 h1:DywqrEscRX7O2phNjkT0L6lhHKGBoMLCNX+XcAe7t6s=
golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
Expand Down Expand Up @@ -1816,7 +1824,9 @@ google.golang.org/genproto v0.0.0-20200726014623-da3ae01ef02d/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200731012542-8145dea6a485/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200827165113-ac2560b5e952/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200831141814-d751682dd103/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d h1:92D1fum1bJLKSdr11OJ+54YeCMCGYIygTA7R/YZxH5M=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Downloader struct {
Logger *zap.SugaredLogger
}

var SupportedProtocols = []storage.Protocol{storage.S3}
var SupportedProtocols = []storage.Protocol{storage.S3, storage.GCS}

func (d *Downloader) DownloadModel(modelName string, modelSpec *v1alpha1.ModelSpec) error {
if modelSpec != nil {
Expand Down
144 changes: 144 additions & 0 deletions pkg/agent/mocks/gcsmock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package mocks

import (
"bytes"
gstorage "cloud.google.com/go/storage"
"context"
"fmt"
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
"google.golang.org/api/iterator"
"strings"
)

type mockGCSClient struct {
stiface.Client
buckets map[string]*mockBucket
}

type mockBucket struct {
attrs *gstorage.BucketAttrs
objects map[string]*gstorage.ObjectAttrs
}

func NewMockClient() stiface.Client {
return &mockGCSClient{buckets: map[string]*mockBucket{}}
}

func (c *mockGCSClient) Bucket(name string) stiface.BucketHandle {
return mockBucketHandle{c: c, name: name}
}

type mockBucketHandle struct {
stiface.BucketHandle
c *mockGCSClient
name string
}

func (b mockBucketHandle) Create(_ context.Context, _ string, attrs *gstorage.BucketAttrs) error {
if _, ok := b.c.buckets[b.name]; ok {
return fmt.Errorf("bucket %q already exists", b.name)
}
if attrs == nil {
attrs = &gstorage.BucketAttrs{}
}
attrs.Name = b.name
b.c.buckets[b.name] = &mockBucket{attrs: attrs, objects: map[string]*gstorage.ObjectAttrs{}}
return nil
}

func (b mockBucketHandle) Objects(ctx context.Context, query *gstorage.Query) stiface.ObjectIterator {
var items []*gstorage.ObjectAttrs
objs := b.c.buckets[b.name].objects
for key, element := range objs {
if strings.Contains(key, query.Prefix){
items = append(items, element)
}
}
return &mockObjectIterator{b: b, items: items}
}

type mockObjectIterator struct {
stiface.ObjectIterator
b mockBucketHandle
items []*gstorage.ObjectAttrs
}

func (i *mockObjectIterator) Next() (*gstorage.ObjectAttrs, error) {
if len(i.items) == 0 {
return nil, iterator.Done
}
item := i.items[0]
i.items = i.items[1:]
return item, nil
}

func (b mockBucketHandle) Object(name string) stiface.ObjectHandle {
return mockObjectHandle{c: b.c, bucketName: b.name, name: name}
}

type mockObjectHandle struct {
stiface.ObjectHandle
c *mockGCSClient
bucketName string
name string
}

func (o mockObjectHandle) Attrs(context.Context) (*gstorage.ObjectAttrs, error) {
bkt, ok := o.c.buckets[o.bucketName]
if !ok {
return nil, fmt.Errorf("bucket %q not found", o.bucketName)
}
contents, ok := bkt.objects[o.name]
if !ok {
return nil, gstorage.ErrObjectNotExist
}
return contents, nil
}

func (o mockObjectHandle) NewReader(context.Context) (stiface.Reader, error) {
bkt, ok := o.c.buckets[o.bucketName]
if !ok {
return nil, fmt.Errorf("bucket %q not found", o.bucketName)
}
contents, ok := bkt.objects[o.name]
if !ok {
return nil, fmt.Errorf("object %q not found in bucket %q", o.name, o.bucketName)
}
return mockReader{r: bytes.NewReader(contents.MD5)}, nil
}

func (o mockObjectHandle) NewWriter(context.Context) stiface.Writer {
attrs := &gstorage.ObjectAttrs{
Bucket: o.bucketName,
Name: o.name,
MD5: nil,
}
o.c.buckets[o.bucketName].objects[o.name] = attrs
return &mockWriter{o: o, obj: attrs}
}

type mockReader struct {
stiface.Reader
r *bytes.Reader
}

func (r mockReader) Read(buf []byte) (int, error) {
return r.r.Read(buf)
}

func (r mockReader) Close() error {
return nil
}

type mockWriter struct {
stiface.Writer
o mockObjectHandle
buf bytes.Buffer
obj *gstorage.ObjectAttrs
}

func (w *mockWriter) Write(data []byte) (int, error) {
int, err := w.buf.Write(data)
w.obj.MD5 = data
return int, err
}
45 changes: 45 additions & 0 deletions pkg/agent/mocks/s3mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mocks

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/golang/protobuf/proto"
)

type MockS3Client struct {
s3iface.S3API
}

func (m *MockS3Client) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
return &s3.ListObjectsOutput{
Contents: []*s3.Object{
{
Key: proto.String("model.pt"),
},
},
}, nil
}

type MockS3Downloader struct {
}

func (m *MockS3Downloader) DownloadWithIterator(aws.Context, s3manager.BatchDownloadIterator, ...func(*s3manager.Downloader)) error {
return nil
}

type MockS3FailDownloader struct {
Err error
}

func (m *MockS3FailDownloader) DownloadWithIterator(aws.Context, s3manager.BatchDownloadIterator, ...func(*s3manager.Downloader)) error {
var errs []s3manager.Error
errs = append(errs, s3manager.Error{
OrigErr: fmt.Errorf("failed to download"),
Bucket: aws.String("modelRepo"),
Key: aws.String("model1/model.pt"),
})
return s3manager.NewBatchError("BatchedDownloadIncomplete", "some objects have failed to download.", errs)
}

0 comments on commit b8b3584

Please sign in to comment.