Add GCS support and tests #1105
Conversation
Hi @mszacillo. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. ℹ️ Googlers: Go here for more info.
@googlebot I fixed it.
/ok-to-test
@mszacillo This is great! Can you help rebase the master and sign the Google CLA?
@yuzisun Yes! I can take care of rebasing the branch - for the CLA I'm just waiting to hear back for approval from legal. Should be able to get that done in the next couple days. |
Force-pushed from 3bd97ca to 68022fa.
Has this been tested outside of unit tests, i.e. with an e2e example of a pod with the appropriate labels pulling down from GCS? Can the results of that test be put in the PR description, please?
Good point, I can do some manual verifications and post the results on the PR description.
Force-pushed from cf177b1 to ddc2ffb.
@@ -65,3 +65,31 @@ type ModelSpec struct {
	// +optional
	Memory resource.Quantity `json:"memory,omitempty"`
}

type TrainedModelYaml struct {
Why do we need the yaml version?
+1
cmd/agent/main.go
Outdated
Downloader: s3manager.NewDownloaderWithClient(sessionClient, func(d *s3manager.Downloader) {
}),
downloader.Providers[kfstorage.GCS] = &kfstorage.GCSProvider{
	Client: mockapi.AdaptClient(client),
Should the real client be used here?
The real client is defined by storage.NewClient(ctx) on line 54. This line instead adapts the client to fit the mock client interface, which is used to allow unit testing. At first I set the client type for the GCS provider as storage.Client, but this caused type errors when I mocked the client for unit tests, so I used a common interface shared by the mocked version and the real client.
I might be missing a better way to do this, so I'll look into it further.
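For reference, the adapter approach described above can be sketched with plain interfaces. This is a minimal illustration, not the PR's actual types: the downloader depends on a narrow interface, the adapted real client satisfies it in production, and tests substitute an in-memory fake.

```go
package main

import (
	"errors"
	"fmt"
)

// ObjectReader is the narrow interface the downloader depends on.
// In the PR this role is played by the adapted GCS client.
type ObjectReader interface {
	ReadObject(bucket, name string) ([]byte, error)
}

// fakeClient is a test double backed by an in-memory map.
type fakeClient struct {
	objects map[string][]byte
}

func (f *fakeClient) ReadObject(bucket, name string) ([]byte, error) {
	data, ok := f.objects[bucket+"/"+name]
	if !ok {
		return nil, errors.New("object not found")
	}
	return data, nil
}

// DownloadModel works against the interface, so it accepts either
// the adapted real client or the fake used in unit tests.
func DownloadModel(c ObjectReader, bucket, name string) (string, error) {
	data, err := c.ReadObject(bucket, name)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	c := &fakeClient{objects: map[string][]byte{"models/model.pt": []byte("weights")}}
	out, err := DownloadModel(c, "models", "model.pt")
	fmt.Println(out, err)
}
```

The same shape is what google-cloud-go-testing's stiface package provides for the real storage.Client, which avoids hand-rolling the interfaces.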
pkg/agent/downloader.go
Outdated
@@ -77,14 +80,41 @@ func (d *Downloader) download(modelName string, storageUri string) error {
	return nil
}

func (d *Downloader) GetProvider() (string, error) {
	storageUri := ""
	matches, _ := filepath.Glob(d.ModelDir + "/*.yaml")
Not sure I understand this; the storage uri should be passed on the ModelSpec and does not need to be parsed from the yaml.
My understanding was that we'd configure the storage provider before any of the models are pulled, but I think I was overcomplicating things. We can just add each supported provider to the Provider map, and then upon an add event the correct provider will be resolved from the ModelSpec, like you said. I'll go ahead and remove this logic.
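The provider-map approach agreed on above can be sketched roughly as follows. Names here are illustrative rather than the PR's final identifiers: each protocol maps to a provider, and the backend is picked from the storage uri prefix carried on the ModelSpec.

```go
package main

import (
	"fmt"
	"strings"
)

// Protocol identifies a storage scheme by its uri prefix.
type Protocol string

const (
	GCS Protocol = "gs://"
	S3  Protocol = "s3://"
)

// Provider downloads a model from one storage backend.
type Provider interface {
	DownloadModel(storageUri string) error
}

// noopProvider stands in for the real S3/GCS providers.
type noopProvider struct{ name string }

func (p noopProvider) DownloadModel(storageUri string) error {
	fmt.Printf("%s downloading %s\n", p.name, storageUri)
	return nil
}

// ProviderFor resolves the provider from the uri prefix, so each
// add event picks its backend from the model's storageUri.
func ProviderFor(providers map[Protocol]Provider, storageUri string) (Provider, error) {
	for proto, p := range providers {
		if strings.HasPrefix(storageUri, string(proto)) {
			return p, nil
		}
	}
	return nil, fmt.Errorf("unsupported storage uri: %s", storageUri)
}

func main() {
	providers := map[Protocol]Provider{
		GCS: noopProvider{"gcs"},
		S3:  noopProvider{"s3"},
	}
	if p, err := ProviderFor(providers, "gs://bucket/model"); err == nil {
		p.DownloadModel("gs://bucket/model")
	}
}
```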
+1 to removing this logic
A few nits, but thanks for writing up a much needed storage protocol!
cmd/agent/main.go
Outdated
	panic(err)
}

if strings.Contains(storageUri, string(kfstorage.S3)) {
This is not the appropriate place to do this. This main method is setting up the watcher and initializing all the available providers (by checking os.LookupEnv() to see if the agent is configured properly). In essence, this just means you need to do:

if endpoint, ok := os.LookupEnv(bcscredential.GCSEndpointUrl); ok {

or whatever is needed to set up the gcs client. The logic of whether the download should happen from gcs or s3 is configured elsewhere.
pkg/agent/downloader.go
Outdated
@@ -77,14 +80,41 @@ func (d *Downloader) download(modelName string, storageUri string) error {
	return nil
}

func (d *Downloader) GetProvider() (string, error) {
	storageUri := ""
	matches, _ := filepath.Glob(d.ModelDir + "/*.yaml")
+1 to removing this logic
pkg/agent/kfstorage/gcs.go
Outdated
)

type GCSProvider struct {
	Client mockapi.Client
mockapi? this should be the physical client
pkg/agent/kfstorage/gcs.go
Outdated
)
}
defer rc.Close()
data, err := ioutil.ReadAll(rc)
Is there a way to send this directly to disk, i.e. a zero-copy transfer or some more efficient way to write to disk directly? Or is there a BatchDownloader that takes a FileWriter? And what is rc here?
@@ -65,3 +65,31 @@ type ModelSpec struct {
	// +optional
	Memory resource.Quantity `json:"memory,omitempty"`
}

type TrainedModelYaml struct {
+1
Force-pushed from bdaa00d to fdfa9d7.
@@ -332,4 +297,39 @@ var _ = Describe("Watcher", func() {
		})
	})
})

Describe("Use GCS Downloader", func() {
Nice test! Could you add a test for a failed download?
@mszacillo This looks great! Can you help resolve the conflicts and we should be good to merge!
@mszacillo I think there must be a bug in the gcs downloader. I tested using
Could you please try
And verify that models are downloaded from gcs?
@yuzliu I added some debug logs to double check that the file is getting downloaded successfully - seems like the download works, the file gets written to successfully, and that the file exists after the download:
This is however with only the puller pod deployed by itself. Next I'll try creating a trainedmodel.
Force-pushed from d8427f4 to 4ffadbb.
func (g *GCSObjectDownloader) Download(client stiface.Client, it stiface.ObjectIterator) error {
	var errs []error
	// flag to help determine if query prefix returned an empty iterator
	var foundObject = false
I added a flag to determine whether the returned iterator is empty (meaning that the storage uri prefix used to search for objects matched nothing). If the flag is still false after iteration, no objects were found and we log a warning for the user. This was done because the object iterator does not have a way to check its size. That said, I'm not entirely happy with this solution; any suggestions?
Force-pushed from d42b53e to 4183643.
/retest
@ifilonenko: The
Use In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/retest
Adding in testing
Removing mockapi and using google-cloud-go-testing instead, removing unnecessary methods, cleaning up code
Changing back dockerfile name
Rebasing on master
Fixing import statement
Reverting kfstorage rename to storage, changing gcs import
Fixing import statement
Changing import in test
Combining tests into watcher_test, putting mocks into a testutils package
Removing unnecessary suite run, renaming testutils to mocks
Adding more test cases, accounting for lack of model name in passed in storageURI
Changing iterator retrieval logic
Rebasing and cleaning code
Force-pushed from 4183643 to 085c459.
/retest
@mszacillo Thanks for your awesome contribution! /lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: mszacillo, yuzisun. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
* Adding in GCS support
* Adding in testing
* Removing mockapi and using google-cloud-go-testing instead, removing unnecessary methods, cleaning up code
* Changing back dockerfile name
* Rebasing on master
* Fixing import statement
* Reverting kfstorage rename to storage, changing gcs import
* Fixing import statement
* Changing import in test
* Combining tests into watcher_test, putting mocks into a testutils package
* Removing unnecessary suite run, renaming testutils to mocks
* Adding more test cases, accounting for lack of model name in passed in storageURI
* Changing iterator retrieval logic
* Rebasing and cleaning code
* Returning a warning if queried object doesn't exist in bucket, resolving test
* Rebasing, removing unused import, refactoring
* Adding missing parameter to NewWatcher call
What this PR does / why we need it:
This PR adds GCS Protocol support for downloading models.
Fixes #1048
Overview:
I've added GCS as an accepted protocol to provider.go, along with the necessary model download logic, which resides in gcs.go. For unit testing, I needed a way to mock the gcs client. The best resource I could find was googleapis/google-cloud-go-testing, but when I tried adding this library as a dependency, a conflict prevented the project from pulling it. Because of this, I added some interface classes that adapt the gcs client to a general type, which can be used for mocking. Test coverage for the gcs class is currently at 75%, but this can be increased with a couple more tests.
Note: There were conflicts when importing cloud.google.com/go/storage alongside the storage package within agent. Due to this, I renamed the storage package to kfstorage (but I'm happy to change this name to something more fitting if needed). I also noticed there are a bunch of conflicts with #1055, so I can resolve those after that PR merges. :)
Testing
For testing, I deployed the puller pod manually and checked the logs to make sure that the models were being successfully downloaded from my GCS bucket. I did this using the following yaml:
After the puller pod comes up, I can see the following logs, which show that the download was successful; however, I will also run a full e2e test with the kfserving setup. Below you can see a failure when trying to connect to localhost:8080; this is because I wasn't port-forwarding the ingress gateway.
Release note: