diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d8e18a4867..4120ec2b2c 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -1,22 +1,14 @@ package main import ( - gstorage "cloud.google.com/go/storage" "context" "flag" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/googleapis/google-cloud-go-testing/storage/stiface" "github.com/kelseyhightower/envconfig" "github.com/kubeflow/kfserving/pkg/agent" "github.com/kubeflow/kfserving/pkg/agent/storage" "github.com/kubeflow/kfserving/pkg/apis/serving/v1beta1" "github.com/kubeflow/kfserving/pkg/batcher" - gcscredential "github.com/kubeflow/kfserving/pkg/credentials/gcs" - s3credential "github.com/kubeflow/kfserving/pkg/credentials/s3" kfslogger "github.com/kubeflow/kfserving/pkg/logger" "github.com/pkg/errors" "go.uber.org/zap" @@ -34,7 +26,6 @@ import ( "net/url" "os" "strconv" - "strings" "time" ) @@ -276,44 +267,6 @@ func startModelPuller(logger *zap.SugaredLogger) { Logger: logger, } - if endpoint, ok := os.LookupEnv(s3credential.AWSEndpointUrl); ok { - region, _ := os.LookupEnv(s3credential.AWSRegion) - useVirtualBucketString, ok := os.LookupEnv(s3credential.S3UseVirtualBucket) - useVirtualBucket := true - if ok && strings.ToLower(useVirtualBucketString) == "false" { - useVirtualBucket = false - } - sess, err := session.NewSession(&aws.Config{ - Endpoint: aws.String(endpoint), - Region: aws.String(region), - S3ForcePathStyle: aws.Bool(!useVirtualBucket)}, - ) - logger.Infof("Initializing s3 client with endpoint %s, region %s", endpoint, region) - if err != nil { - panic(err) - } - sessionClient := s3.New(sess) - downloader.Providers[storage.S3] = &storage.S3Provider{ - Client: sessionClient, - Downloader: s3manager.NewDownloaderWithClient(sessionClient, func(d *s3manager.Downloader) { - }), - } - } - - if _, ok := os.LookupEnv(gcscredential.GCSCredentialEnvKey); ok { - // GCS relies on environment variable GOOGLE_APPLICATION_CREDENTIALS to point to the service-account-key - // If set, it will be automatically be picked up by the client. - logger.Info("Initializing gcs client, using existing GOOGLE_APPLICATION_CREDENTIALS variable.") - ctx := context.Background() - client, err := gstorage.NewClient(ctx) - if err != nil { - panic(err) - } - downloader.Providers[storage.GCS] = &storage.GCSProvider{ - Client: stiface.AdaptClient(client), - } - } - watcher := agent.NewWatcher(*configDir, *modelDir, logger) logger.Info("Starting puller") agent.StartPuller(downloader, watcher.ModelEvents, logger) diff --git a/docs/MULTIMODELSERVING_GUIDE.md b/docs/MULTIMODELSERVING_GUIDE.md new file mode 100644 index 0000000000..0f39de031a --- /dev/null +++ b/docs/MULTIMODELSERVING_GUIDE.md @@ -0,0 +1,28 @@ +# Multi-Model Serving +## Introduction + +### Problem + +With machine learning approaches becoming more widely adopted in organizations, there is a trend to deploy many models. More models aims to provide personalized experience which often need to train a lot of models. Additionally, many models help to isolate each user’s data and train models separately for data privacy. +When KFServing was originally designed, it followed the one model and one server paradigm which presents a challenge for the Kubernetes cluster when users want to deploy many models. +For example, Kubernetes sets a default limit of 110 pods per node. A 100 nodes cluster can host at most 11,000 pods, which is often not enough. +Additionally, there is no easy way to request a fraction of GPU in Kubernetes infrastructure, it makes sense to load multiple models in one model server to share GPU resources. KFServing's multi-model serving is a solution that allows for loading multiple models into a server while still keeping the out of the box serverless features. + +### Benefits +- Allow multiple models to share the same GPU +- Increase the total number of models that can be deployed in a cluster +- Reduced model deployment resource overhead + - An InferenceService needs some CPU and overhead for each replica + - Loading multiple models in one inferenceService is more resource efficient + - Allow deploying hundreds of thousands of models with ease and monitoring deployed trained models at scale + +### Design +![Multi-model Diagram](./diagrams/mms-design.png) + +### Integration with model servers +Multi-model serving will work with any model server that implements KFServing V2 protocol. More specifically, if the model server implements the load and unload endpoint then it can use KFServing's TrainedModel. +Currently, the only supported model servers are Triton, SKLearn, and XGBoost. Click on [Triton](https://github.com/kubeflow/kfserving/tree/master/docs/samples/v1beta1/triton/multimodel) or [SKLearn](https://github.com/kubeflow/kfserving/tree/master/docs/samples/v1beta1/sklearn/multimodel) to see examples on how to run multi-model serving! + + + +For a more in depth details checkout this [document](https://docs.google.com/document/d/11qETyR--oOIquQke-DCaLsZY75vT1hRu21PesSUDy7o). \ No newline at end of file diff --git a/docs/diagrams/mms-design.png b/docs/diagrams/mms-design.png new file mode 100644 index 0000000000..55d2195018 Binary files /dev/null and b/docs/diagrams/mms-design.png differ diff --git a/docs/samples/v1beta1/sklearn/multimodel/README.md b/docs/samples/v1beta1/sklearn/multimodel/README.md new file mode 100644 index 0000000000..619ff82836 --- /dev/null +++ b/docs/samples/v1beta1/sklearn/multimodel/README.md @@ -0,0 +1,159 @@ +#Multi-Model Serving with Sklearn + +## Overview + +The general overview of multi-model serving: +1. Deploy InferenceService with the framework specified +2. Deploy TrainedModel(s) with the storageUri, framework, and memory +3. A config map will be created and will contain details about each trained model +4. Model Agent loads model from the model config +5. An endpoint is set up and is ready to serve model(s) +6. Deleting a model leads to removing model from config map which causes the model agent to unload the model +7. Deleting the InferenceService causes the TrainedModel(s) to be deleted + + +## Example +Firstly, you should have kfserving installed. Check [this](https://github.com/kubeflow/kfserving#install-kfserving) out if you have not installed kfserving. + +The content below is in the file `inferenceservice.yaml`. + +```yaml +apiVersion: "serving.kubeflow.org/v1beta1" +kind: "InferenceService" +metadata: + name: "sklearn-iris-example" +spec: + predictor: + minReplicas: 1 + sklearn: + protocolVersion: v1 + name: "sklearn-iris-predictor" + resources: + limits: + cpu: 100m + memory: 256Mi + requests: + cpu: 100m + memory: 256Mi +``` +Run the command `kubectl apply -f inferenceservice.yaml` to create the inference service. Check if the service is properly deployed by running `kubectl get inferenceservice`. The output should be similar to the below. +```yaml +NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE +sklearn-iris-example http://sklearn-iris-example.default.example.com True 100 sklearn-iris-example-predictor-default-kgtql 22s +``` + +Next, the other file the trained models `trainedmodels.yaml` is shown below. +```yaml +apiVersion: "serving.kubeflow.org/v1alpha1" +kind: "TrainedModel" +metadata: + name: "model1-sklearn" +spec: + inferenceService: "sklearn-iris-example" + model: + storageUri: "gs://kfserving-samples/models/sklearn/iris" + framework: "sklearn" + memory: "256Mi" +--- +apiVersion: "serving.kubeflow.org/v1alpha1" +kind: "TrainedModel" +metadata: + name: "model2-sklearn" +spec: + inferenceService: "sklearn-iris-example" + model: + storageUri: "gs://kfserving-samples/models/sklearn/iris" + framework: "sklearn" + memory: "256Mi" +``` +Run the command `kubectl apply -f trainedmodels.yaml` to create the trained models. Run `kubectl get trainedmodel` to view the resource. + +Run `kubectl get po` to get the name of the predictor pod. The name should be similar to sklearn-iris-example-predictor-default-xxxxx-deployment-xxxxx. + +Run `kubectl logs -c agent` to check if the models are properly loaded. You should get the same output as below. Wait a few minutes and try again if you do not see "Downloading model". +```yaml +{"level":"info","ts":"2021-01-20T16:24:00.421Z","caller":"agent/puller.go:129","msg":"Downloading model from gs://kfserving-samples/models/sklearn/iris"} +{"level":"info","ts":"2021-01-20T16:24:00.421Z","caller":"agent/downloader.go:47","msg":"Downloading gs://kfserving-samples/models/sklearn/iris to model dir /mnt/models"} +{"level":"info","ts":"2021-01-20T16:24:00.424Z","caller":"agent/puller.go:121","msg":"Worker is started for model1-sklearn"} +{"level":"info","ts":"2021-01-20T16:24:00.424Z","caller":"agent/puller.go:129","msg":"Downloading model from gs://kfserving-samples/models/sklearn/iris"} +{"level":"info","ts":"2021-01-20T16:24:00.424Z","caller":"agent/downloader.go:47","msg":"Downloading gs://kfserving-samples/models/sklearn/iris to model dir /mnt/models"} +{"level":"info","ts":"2021-01-20T16:24:09.255Z","caller":"agent/puller.go:146","msg":"Successfully loaded model model2-sklearn"} +{"level":"info","ts":"2021-01-20T16:24:09.256Z","caller":"agent/puller.go:114","msg":"completion event for model model2-sklearn, in flight ops 0"} +{"level":"info","ts":"2021-01-20T16:24:09.260Z","caller":"agent/puller.go:146","msg":"Successfully loaded model model1-sklearn"} +{"level":"info","ts":"2021-01-20T16:24:09.260Z","caller":"agent/puller.go:114","msg":"completion event for model model1-sklearn, in flight ops 0"} +``` + +Run the command `kubectl get cm modelconfig-sklearn-iris-example-0 -oyaml` to get the configmap. The output should be similar to the below. +```yaml +apiVersion: v1 +data: + models.json: '[{"modelName":"model1-sklearn","modelSpec":{"storageUri":"gs://kfserving-samples/models/sklearn/iris","framework":"sklearn","memory":"256Mi"}},{"modelName":"model2-sklearn","modelSpec":{"storageUri":"gs://kfserving-samples/models/sklearn/iris","framework":"sklearn","memory":"256Mi"}}]' +kind: ConfigMap +metadata: + creationTimestamp: "2021-01-20T16:22:52Z" + name: modelconfig-sklearn-iris-example-0 + namespace: default + ownerReferences: + - apiVersion: serving.kubeflow.org/v1beta1 + blockOwnerDeletion: true + controller: true + kind: InferenceService + name: sklearn-iris-example + uid: f91d8414-0bfa-4182-af25-5d0c1a7eff4e + resourceVersion: "1958556" + selfLink: /api/v1/namespaces/default/configmaps/modelconfig-sklearn-iris-example-0 + uid: 79e68f80-e31a-419b-994b-14a6159d8cc2 +``` + +The models will be ready to serve once they are successfully loaded. + +Check to see which case applies to you. + +If the EXTERNAL-IP value is set, your environment has an external load balancer that you can use for the ingress gateway. Set them by running: +````bash +export INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}') +export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}') +export SERVICE_HOSTNAME=$(kubectl get inferenceservice sklearn-iris-example -n default -o jsonpath='{.status.url}' | cut -d "/" -f 3) +```` + +If the EXTERNAL-IP is none, and you can access the gateway using the service's node port: +```bash +# GKE +export INGRESS_HOST=worker-node-address +# Minikube +export INGRESS_HOST=$(minikube ip)å +# Other environment(On Prem) +export INGRESS_HOST=$(kubectl get po -l istio=ingressgateway -n istio-system -o jsonpath='{.items[0].status.hostIP}') +export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}') +``` + +For KIND/Port Fowarding: +- Run `kubectl port-forward -n istio-system svc/istio-ingressgateway 8080:80` +- In a different window, run: + ```bash + export INGRESS_HOST=localhost + export INGRESS_PORT=8080 + export SERVICE_HOSTNAME=$(kubectl get inferenceservice sklearn-iris-example -n default -o jsonpath='{.status.url}' | cut -d "/" -f 3) + ``` + + +After setting up the above: +- Go to the root directory of `kfserving` +- Query the two models: + - Curl from ingress gateway: + ```bash + curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/model1-sklearn:predict -d @./docs/samples/v1alpha2/sklearn/iris-input.json + curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/model2-sklearn:predict -d @./docs/samples/v1alpha2/sklearn/iris-input.json + ``` + - Curl from local cluster gateway + ``` + curl -v http://sklearn-iris-example.default/v1/models/model1-sklearn:predict -d @./docs/samples/v1alpha2/sklearn/iris-input.json + curl -v http://sklearn-iris-example.default/v1/models/model2-sklearn:predict -d @./docs/samples/v1alpha2/sklearn/iris-input.json + ``` + +The outputs should be +```yaml +{"predictions": [1, 1]}* +``` + +To remove the resources, run the command `kubectl delete inferenceservice sklearn-iris-example`. This will delete the inference service and result in the trained models being deleted. \ No newline at end of file diff --git a/docs/samples/v1beta1/sklearn/multimodel/inferenceservice.yaml b/docs/samples/v1beta1/sklearn/multimodel/inferenceservice.yaml new file mode 100644 index 0000000000..0d9d939507 --- /dev/null +++ b/docs/samples/v1beta1/sklearn/multimodel/inferenceservice.yaml @@ -0,0 +1,17 @@ +apiVersion: "serving.kubeflow.org/v1beta1" +kind: "InferenceService" +metadata: + name: "sklearn-iris-example" +spec: + predictor: + minReplicas: 1 + sklearn: + protocolVersion: v1 + name: "sklearn-iris-predictor" + resources: + limits: + cpu: 100m + memory: 256Mi + requests: + cpu: 100m + memory: 256Mi diff --git a/docs/samples/v1beta1/sklearn/multimodel/trainedmodels.yaml b/docs/samples/v1beta1/sklearn/multimodel/trainedmodels.yaml new file mode 100644 index 0000000000..7c7b2598b4 --- /dev/null +++ b/docs/samples/v1beta1/sklearn/multimodel/trainedmodels.yaml @@ -0,0 +1,21 @@ +apiVersion: "serving.kubeflow.org/v1alpha1" +kind: "TrainedModel" +metadata: + name: "model1-sklearn" +spec: + inferenceService: "sklearn-iris-example" + model: + storageUri: "gs://kfserving-samples/models/sklearn/iris" + framework: "sklearn" + memory: "256Mi" +--- +apiVersion: "serving.kubeflow.org/v1alpha1" +kind: "TrainedModel" +metadata: + name: "model2-sklearn" +spec: + inferenceService: "sklearn-iris-example" + model: + storageUri: "gs://kfserving-samples/models/sklearn/iris" + framework: "sklearn" + memory: "256Mi" \ No newline at end of file diff --git a/go.mod b/go.mod index 9ad10c636e..326217cfe7 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/google/uuid v1.1.1 github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 github.com/json-iterator/go v1.1.10 - github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a // indirect github.com/kelseyhightower/envconfig v1.4.0 + github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a // indirect github.com/onsi/ginkgo v1.14.0 github.com/onsi/gomega v1.10.2 github.com/pkg/errors v0.9.1 @@ -39,6 +39,7 @@ require ( istio.io/gogo-genproto v0.0.0-20191029161641-f7d19ec0141d // indirect k8s.io/api v0.18.8 k8s.io/apimachinery v0.18.8 + k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29 knative.dev/networking v0.0.0-20200922180040-a71b40c69b15 diff --git a/pkg/agent/downloader.go b/pkg/agent/downloader.go index ed52e4d3f3..464a00eb42 100644 --- a/pkg/agent/downloader.go +++ b/pkg/agent/downloader.go @@ -67,9 +67,9 @@ func (d *Downloader) download(modelName string, storageUri string) error { if err != nil { return errors.Wrapf(err, "unsupported protocol") } - provider, ok := d.Providers[protocol] - if !ok { - return errors.Wrapf(err, "protocol manager for %s is not initialized", protocol) + provider, err := storage.GetProvider(d.Providers, protocol) + if err != nil { + return errors.Wrapf(err, "unable to create or get provider for protocol %s", protocol) } if err := provider.DownloadModel(d.ModelDir, modelName, storageUri); err != nil { return errors.Wrapf(err, "failed to download model") diff --git a/pkg/agent/storage/gcs.go b/pkg/agent/storage/gcs.go index bb74679c14..df342a093f 100644 --- a/pkg/agent/storage/gcs.go +++ b/pkg/agent/storage/gcs.go @@ -26,13 +26,13 @@ func (p *GCSProvider) DownloadModel(modelDir string, modelName string, storageUr prefix = tokens[1] } ctx := context.Background() - gcsObjectDownloader := &GCSObjectDownloader { - Context: ctx, + gcsObjectDownloader := &GCSObjectDownloader{ + Context: ctx, StorageUri: storageUri, - ModelDir: modelDir, - ModelName: modelName, - Bucket: tokens[0], - Item: prefix, + ModelDir: modelDir, + ModelName: modelName, + Bucket: tokens[0], + Item: prefix, } it, err := gcsObjectDownloader.GetObjectIterator(p.Client) if err != nil { @@ -71,7 +71,8 @@ func (g *GCSObjectDownloader) Download(client stiface.Client, it stiface.ObjectI return fmt.Errorf("an error occurred while iterating: %v", err) } foundObject = true - fileName := filepath.Join(g.ModelDir, g.ModelName, attrs.Name) + objectValue := strings.TrimPrefix(attrs.Name, g.Item) + fileName := filepath.Join(g.ModelDir, g.ModelName, objectValue) if FileExists(fileName) { log.Info("Deleting", fileName) if err := os.Remove(fileName); err != nil { diff --git a/pkg/agent/storage/utils.go b/pkg/agent/storage/utils.go index 06dc0c4fa4..31e1cef0e7 100644 --- a/pkg/agent/storage/utils.go +++ b/pkg/agent/storage/utils.go @@ -17,9 +17,20 @@ limitations under the License. package storage import ( + gstorage "cloud.google.com/go/storage" + "context" "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/googleapis/google-cloud-go-testing/storage/stiface" + gcscredential "github.com/kubeflow/kfserving/pkg/credentials/gcs" + s3credential "github.com/kubeflow/kfserving/pkg/credentials/s3" + "google.golang.org/api/option" "os" "path/filepath" + "strings" ) func FileExists(filename string) bool { @@ -56,3 +67,58 @@ func RemoveDir(dir string) error { } return nil } + +func GetProvider(providers map[Protocol]Provider, protocol Protocol) (Provider, error) { + if provider, ok := providers[protocol]; ok { + return provider, nil + } + + switch protocol { + case GCS: + var gcsClient *gstorage.Client + var err error + + ctx := context.Background() + if _, ok := os.LookupEnv(gcscredential.GCSCredentialEnvKey); ok { + // GCS relies on environment variable GOOGLE_APPLICATION_CREDENTIALS to point to the service-account-key + // If set, it will be automatically be picked up by the client. + gcsClient, err = gstorage.NewClient(ctx) + } else { + gcsClient, err = gstorage.NewClient(ctx, option.WithoutAuthentication()) + } + + if err != nil { + return nil, err + } + + providers[GCS] = &GCSProvider{ + Client: stiface.AdaptClient(gcsClient), + } + case S3: + if endpoint, ok := os.LookupEnv(s3credential.AWSEndpointUrl); ok { + region, _ := os.LookupEnv(s3credential.AWSRegion) + useVirtualBucketString, ok := os.LookupEnv(s3credential.S3UseVirtualBucket) + useVirtualBucket := true + if ok && strings.ToLower(useVirtualBucketString) == "false" { + useVirtualBucket = false + } + sess, err := session.NewSession(&aws.Config{ + Endpoint: aws.String(endpoint), + Region: aws.String(region), + S3ForcePathStyle: aws.Bool(!useVirtualBucket)}, + ) + if err != nil { + return nil, err + } + sessionClient := s3.New(sess) + providers[S3] = &S3Provider{ + Client: sessionClient, + Downloader: s3manager.NewDownloaderWithClient(sessionClient, func(d *s3manager.Downloader) { + }), + } + } + + } + + return providers[protocol], nil +} diff --git a/pkg/agent/watcher_test.go b/pkg/agent/watcher_test.go index 1a1d6c7699..d378962721 100644 --- a/pkg/agent/watcher_test.go +++ b/pkg/agent/watcher_test.go @@ -321,7 +321,7 @@ var _ = Describe("Watcher", func() { logger.Printf("Creating mock GCS Client") ctx := context.Background() client := mocks.NewMockClient() - cl := storage.GCSProvider { + cl := storage.GCSProvider{ Client: client, } @@ -340,7 +340,7 @@ var _ = Describe("Watcher", func() { err := cl.DownloadModel(modelDir, modelName, modelStorageURI) Expect(err).To(BeNil()) - testFile := filepath.Join(modelDir, "model1/testModel1") + testFile := filepath.Join(modelDir, "model1") dat, err := ioutil.ReadFile(testFile) Expect(err).To(BeNil()) Expect(string(dat)).To(Equal(modelContents)) @@ -354,7 +354,7 @@ var _ = Describe("Watcher", func() { logger.Printf("Creating mock GCS Client") ctx := context.Background() client := mocks.NewMockClient() - cl := storage.GCSProvider { + cl := storage.GCSProvider{ Client: client, } @@ -381,7 +381,7 @@ var _ = Describe("Watcher", func() { logger.Printf("Creating mock GCS Client") ctx := context.Background() client := mocks.NewMockClient() - cl := storage.GCSProvider { + cl := storage.GCSProvider{ Client: client, } @@ -432,7 +432,7 @@ var _ = Describe("Watcher", func() { // Creating GCS mock client and populating buckets ctx := context.Background() client := mocks.NewMockClient() - cl := storage.GCSProvider { + cl := storage.GCSProvider{ Client: client, } bkt := client.Bucket("testBucket") diff --git a/pkg/apis/serving/v1alpha1/trained_model.go b/pkg/apis/serving/v1alpha1/trained_model.go index 1feaa44389..0ce9794233 100644 --- a/pkg/apis/serving/v1alpha1/trained_model.go +++ b/pkg/apis/serving/v1alpha1/trained_model.go @@ -46,7 +46,7 @@ type TrainedModelList struct { Items []TrainedModel `json:"items"` } -// TrainedModelSpec defines the trained model spec +// TrainedModelSpec defines the TrainedModel spec // +k8s:openapi-gen=true type TrainedModelSpec struct { // parent inference service to deploy to @@ -57,7 +57,7 @@ type TrainedModelSpec struct { Model ModelSpec `json:"model"` } -// ModelSpec describes a trained model +// ModelSpec describes a TrainedModel // +k8s:openapi-gen=true type ModelSpec struct { // Storage URI for the model repository diff --git a/pkg/apis/serving/v1beta1/openapi_generated.go b/pkg/apis/serving/v1beta1/openapi_generated.go index 072c4b057c..4f4486fe00 100644 --- a/pkg/apis/serving/v1beta1/openapi_generated.go +++ b/pkg/apis/serving/v1beta1/openapi_generated.go @@ -77,7 +77,7 @@ func schema_pkg_apis_serving_v1alpha1_ModelSpec(ref common.ReferenceCallback) co return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "ModelSpec describes a trained model", + Description: "ModelSpec describes a TrainedModel", Type: []string{"object"}, Properties: map[string]spec.Schema{ "storageUri": { @@ -209,7 +209,7 @@ func schema_pkg_apis_serving_v1alpha1_TrainedModelSpec(ref common.ReferenceCallb return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "TrainedModelSpec defines the trained model spec", + Description: "TrainedModelSpec defines the TrainedModel spec", Type: []string{"object"}, Properties: map[string]spec.Schema{ "inferenceService": { diff --git a/pkg/apis/serving/v1beta1/swagger.json b/pkg/apis/serving/v1beta1/swagger.json index 66d1d5a4a8..ba40639766 100644 --- a/pkg/apis/serving/v1beta1/swagger.json +++ b/pkg/apis/serving/v1beta1/swagger.json @@ -8,7 +8,7 @@ "paths": {}, "definitions": { "v1alpha1.ModelSpec": { - "description": "ModelSpec describes a trained model", + "description": "ModelSpec describes a TrainedModel", "type": "object", "required": [ "storageUri", @@ -80,7 +80,7 @@ } }, "v1alpha1.TrainedModelSpec": { - "description": "TrainedModelSpec defines the trained model spec", + "description": "TrainedModelSpec defines the TrainedModel spec", "type": "object", "required": [ "inferenceService", diff --git a/python/kfserving/kfserving/__init__.py b/python/kfserving/kfserving/__init__.py index eb3ecdb0f7..a8bce03ef2 100644 --- a/python/kfserving/kfserving/__init__.py +++ b/python/kfserving/kfserving/__init__.py @@ -33,6 +33,12 @@ from kfserving.exceptions import ApiKeyError from kfserving.exceptions import ApiException +# import v1alpha1 models into kfserving packages +from kfserving.models.v1alpha1_model_spec import V1alpha1ModelSpec +from kfserving.models.v1alpha1_trained_model import V1alpha1TrainedModel +from kfserving.models.v1alpha1_trained_model_list import V1alpha1TrainedModelList +from kfserving.models.v1alpha1_trained_model_spec import V1alpha1TrainedModelSpec + # import v1alpha2 models into kfserving package from kfserving.models.knative_addressable import KnativeAddressable from kfserving.models.knative_condition import KnativeCondition @@ -82,7 +88,6 @@ from kfserving.models.v1beta1_inference_services_config import V1beta1InferenceServicesConfig from kfserving.models.v1beta1_ingress_config import V1beta1IngressConfig from kfserving.models.v1beta1_logger_spec import V1beta1LoggerSpec -from kfserving.models.v1beta1_model_spec import V1beta1ModelSpec from kfserving.models.v1beta1_onnx_runtime_spec import V1beta1ONNXRuntimeSpec from kfserving.models.v1beta1_pod_spec import V1beta1PodSpec from kfserving.models.v1beta1_predictor_config import V1beta1PredictorConfig @@ -98,6 +103,4 @@ from kfserving.models.v1beta1_transformers_config import V1beta1TransformersConfig from kfserving.models.v1beta1_triton_spec import V1beta1TritonSpec from kfserving.models.v1beta1_xg_boost_spec import V1beta1XGBoostSpec -from kfserving.models.v1alpha1_trained_model import V1alpha1TrainedModel -from kfserving.models.v1alpha1_trained_model_list import V1alpha1TrainedModelList from kfserving.models.v1beta1_light_gbm_spec import V1beta1LightGBMSpec diff --git a/python/kfserving/kfserving/api/kf_serving_client.py b/python/kfserving/kfserving/api/kf_serving_client.py index ec306fffaa..af286d8faa 100644 --- a/python/kfserving/kfserving/api/kf_serving_client.py +++ b/python/kfserving/kfserving/api/kf_serving_client.py @@ -13,8 +13,10 @@ # limitations under the License. import time +import requests import logging from kubernetes import client, config +from urllib.parse import urlparse from ..constants import constants from ..utils import utils @@ -358,3 +360,61 @@ def wait_isvc_ready(self, name, namespace=None, # pylint:disable=too-many-argum current_isvc = self.get(name, namespace=namespace, version=version) raise RuntimeError("Timeout to start the InferenceService {}. \ The InferenceService is as following: {}".format(name, current_isvc)) + + def create_trained_model(self, trainedmodel, namespace): + """ + Create a trained model + :param trainedmodel: trainedmodel object + :param namespace: defaults to current or default namespace + :return: + """ + version = trainedmodel.api_version.split("/")[1] + + try: + outputs = self.api_instance.create_namespaced_custom_object( + constants.KFSERVING_GROUP, + version, + namespace, + constants.KFSERVING_PLURAL_TRAINEDMODEL, + trainedmodel) + except client.rest.ApiException as e: + raise RuntimeError( + "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ + %s\n" % e) + + def wait_model_ready(self, service_name, model_name, isvc_namespace=None, # pylint:disable=too-many-arguments + isvc_version=constants.KFSERVING_V1BETA1_VERSION, + cluster_ip=None, + timeout_seconds=600, + polling_interval=10): + """ + Waiting for model to be ready to service, print out trained model if timeout. + :param service_name: inference service name + :param model_name: trained model name + :param isvc_namespace: defaults to current or default namespace of inference service + :param isvc_version: api group version of inference service + :param cluster_ip: ip of the kuberenetes cluster + :param timeout_seconds: timeout seconds for waiting, default to 600s. + Print out the InferenceService if timeout. + :param polling_interval: The time interval to poll status + :return: + """ + isvc = self.get( + service_name, + namespace=isvc_namespace, + version=isvc_version, + ) + + host = urlparse(isvc["status"]["url"]).netloc + headers = {"Host": host} + + for _ in range(round(timeout_seconds/polling_interval)): + time.sleep(polling_interval) + # Check model health API + url = f"http://{cluster_ip}/v1/models/{model_name}" + response = requests.get(url, headers=headers).status_code + if response == 200: + return + + raise RuntimeError(f"InferenceService ({service_name}) has not loaded the \ + model ({model_name}) before the timeout.") \ No newline at end of file diff --git a/python/kfserving/kfserving/constants/constants.py b/python/kfserving/kfserving/constants/constants.py index cc04deb6a3..d2e235fe22 100644 --- a/python/kfserving/kfserving/constants/constants.py +++ b/python/kfserving/kfserving/constants/constants.py @@ -18,12 +18,16 @@ KFSERVING_GROUP = 'serving.kubeflow.org' KFSERVING_KIND = 'InferenceService' KFSERVING_PLURAL = 'inferenceservices' +KFSERVING_KIND_TRAINEDMODEL = 'TrainedModel' +KFSERVING_PLURAL_TRAINEDMODEL = 'trainedmodels' KFSERVING_VERSION = os.environ.get('KFSERVING_VERSION', 'v1alpha2') KFSERVING_V1BETA1_VERSION = 'v1beta1' KFSERVING_V1ALPHA2_VERSION = 'v1alpha2' +KFSERVING_V1ALPHA1_VERSION = "v1alpha1" KFSERVING_API_VERSION = KFSERVING_GROUP + '/' + KFSERVING_VERSION KFSERVING_V1BETA1 = KFSERVING_GROUP + '/' + KFSERVING_V1BETA1_VERSION KFSERVING_V1ALPHA2 = KFSERVING_GROUP + '/' + KFSERVING_V1ALPHA2_VERSION +KFSERVING_V1ALPHA1 = KFSERVING_GROUP + '/' + KFSERVING_V1ALPHA1_VERSION KFSERVING_LOGLEVEL = os.environ.get('KFSERVING_LOGLEVEL', 'INFO').upper() diff --git a/test/e2e/common/utils.py b/test/e2e/common/utils.py index 648313abbe..5b90485823 100644 --- a/test/e2e/common/utils.py +++ b/test/e2e/common/utils.py @@ -28,7 +28,8 @@ KFSERVING_TEST_NAMESPACE = "kfserving-ci-e2e-test" -def predict(service_name, input_json, protocol_version="v1", version=constants.KFSERVING_V1BETA1_VERSION): +def predict(service_name, input_json, protocol_version="v1", + version=constants.KFSERVING_V1BETA1_VERSION, model_name=None): isvc = KFServing.get( service_name, namespace=KFSERVING_TEST_NAMESPACE, @@ -40,9 +41,12 @@ def predict(service_name, input_json, protocol_version="v1", version=constants.K host = urlparse(isvc["status"]["url"]).netloc headers = {"Host": host} - url = f"http://{cluster_ip}/v1/models/{service_name}:predict" + if model_name is None: + model_name = service_name + + url = f"http://{cluster_ip}/v1/models/{model_name}:predict" if protocol_version == "v2": - url = f"http://{cluster_ip}/v2/models/{service_name}/infer" + url = f"http://{cluster_ip}/v2/models/{model_name}/infer" with open(input_json) as json_file: data = json.load(json_file) diff --git a/test/e2e/predictor/test_multi_model_serving.py b/test/e2e/predictor/test_multi_model_serving.py new file mode 100644 index 0000000000..1a74d17372 --- /dev/null +++ b/test/e2e/predictor/test_multi_model_serving.py @@ -0,0 +1,215 @@ +# Copyright 2021 kubeflow.org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from kubernetes import client +from kfserving import ( + constants, + KFServingClient, + V1beta1PredictorSpec, + V1alpha1TrainedModel, + V1alpha2EndpointSpec, + V1beta1InferenceService, + V1beta1InferenceServiceSpec, + V1alpha1ModelSpec, + V1alpha1TrainedModelSpec, + V1beta1SKLearnSpec, + V1beta1XGBoostSpec, + V1alpha2TritonSpec, +) + +from ..common.utils import predict, get_cluster_ip +from ..common.utils import KFSERVING_TEST_NAMESPACE + +KFServing = KFServingClient(config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) + +def test_mms_sklearn_kfserving(): + # Define an inference service + predictor = V1beta1PredictorSpec( + min_replicas=1, + sklearn=V1beta1SKLearnSpec( + protocol_version="v1", + resources=client.V1ResourceRequirements( + requests={"cpu": "100m", "memory": "256Mi"}, + limits={"cpu": "100m", "memory": "256Mi"}, + ) + ) + ) + + service_name = "isvc-sklearn-mms" + isvc = V1beta1InferenceService( + api_version=constants.KFSERVING_V1BETA1, + kind=constants.KFSERVING_KIND, + metadata=client.V1ObjectMeta( + name=service_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1beta1InferenceServiceSpec(predictor=predictor) + ) + + # Define trained models + model1_spec = V1alpha1ModelSpec( + storage_uri=f"gs://kfserving-samples/models/sklearn/iris", + memory='256Mi', + framework="sklearn" + ) + + model2_spec = V1alpha1ModelSpec( + storage_uri=f"gs://kfserving-samples/models/sklearn/iris", + memory='256Mi', + framework="sklearn" + ) + + model1_name = "model1-sklearn" + model2_name = "model2-sklearn" + model1 = V1alpha1TrainedModel( + api_version=constants.KFSERVING_V1ALPHA1, + kind=constants.KFSERVING_KIND_TRAINEDMODEL, + metadata=client.V1ObjectMeta( + name=model1_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1alpha1TrainedModelSpec( + inference_service=service_name, + model=model1_spec + ) + ) + + model2 = V1alpha1TrainedModel( + api_version=constants.KFSERVING_V1ALPHA1, + kind=constants.KFSERVING_KIND_TRAINEDMODEL, + metadata=client.V1ObjectMeta( + name=model2_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1alpha1TrainedModelSpec( + inference_service=service_name, + model=model2_spec + ) + ) + + # Create an instance of inference service with isvc + KFServing.create(isvc) + KFServing.wait_isvc_ready(service_name, namespace=KFSERVING_TEST_NAMESPACE) + + # Create instances of trained models using model1 and model2 + KFServing.create_trained_model(model1, KFSERVING_TEST_NAMESPACE) + KFServing.create_trained_model(model2, KFSERVING_TEST_NAMESPACE) + + cluster_ip = get_cluster_ip() + + KFServing.wait_model_ready(service_name, model1_name, isvc_namespace=KFSERVING_TEST_NAMESPACE, + isvc_version=constants.KFSERVING_V1BETA1_VERSION, cluster_ip=cluster_ip) + KFServing.wait_model_ready(service_name, model2_name, isvc_namespace=KFSERVING_TEST_NAMESPACE, + isvc_version=constants.KFSERVING_V1BETA1_VERSION, cluster_ip=cluster_ip) + + # Call predict on the two models + res_model1 = predict(service_name, "./data/iris_input.json", model_name=model1_name) + res_model2 = predict(service_name, "./data/iris_input.json", model_name=model2_name) + + assert res_model1["predictions"] == [1,1] + assert res_model2["predictions"] == [1,1] + + # Clean up inference service + KFServing.delete(service_name, KFSERVING_TEST_NAMESPACE) + +def test_mms_xgboost_kfserving(): + # Define an inference service + predictor = V1beta1PredictorSpec( + min_replicas=1, + xgboost=V1beta1XGBoostSpec( + protocol_version="v1", + resources=client.V1ResourceRequirements( + requests={"cpu": "100m", "memory": "256Mi"}, + limits={"cpu": "100m", "memory": "256Mi"}, + ) + ) + ) + + service_name = "isvc-xgboost-mms" + isvc = V1beta1InferenceService( + api_version=constants.KFSERVING_V1BETA1, + kind=constants.KFSERVING_KIND, + metadata=client.V1ObjectMeta( + name=service_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1beta1InferenceServiceSpec(predictor=predictor) + ) + + # Define trained models + model1_spec = V1alpha1ModelSpec( + storage_uri="gs://kfserving-samples/models/xgboost/iris", + memory='256Mi', + framework="xgboost" + ) + + model2_spec = V1alpha1ModelSpec( + storage_uri="gs://kfserving-samples/models/xgboost/iris", + memory='256Mi', + framework="xgboost" + ) + + model1_name = "model1-xgboost" + model2_name = "model2-xgboost" + model1 = V1alpha1TrainedModel( + api_version=constants.KFSERVING_V1ALPHA1, + kind=constants.KFSERVING_KIND_TRAINEDMODEL, + metadata=client.V1ObjectMeta( + name=model1_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1alpha1TrainedModelSpec( + inference_service=service_name, + model=model1_spec + ) + ) + + model2 = V1alpha1TrainedModel( + api_version=constants.KFSERVING_V1ALPHA1, + kind=constants.KFSERVING_KIND_TRAINEDMODEL, + metadata=client.V1ObjectMeta( + name=model2_name, + namespace=KFSERVING_TEST_NAMESPACE + ), + spec=V1alpha1TrainedModelSpec( + inference_service=service_name, + model=model2_spec + ) + ) + + # Create an instance of inference service with isvc + KFServing.create(isvc) + KFServing.wait_isvc_ready(service_name, namespace=KFSERVING_TEST_NAMESPACE) + + # Create instances of trained models using model1 and model2 + KFServing.create_trained_model(model1, KFSERVING_TEST_NAMESPACE) + KFServing.create_trained_model(model2, KFSERVING_TEST_NAMESPACE) + + cluster_ip = get_cluster_ip() + + KFServing.wait_model_ready(service_name, model1_name, isvc_namespace=KFSERVING_TEST_NAMESPACE, + isvc_version=constants.KFSERVING_V1BETA1_VERSION, cluster_ip=cluster_ip) + KFServing.wait_model_ready(service_name, model2_name, isvc_namespace=KFSERVING_TEST_NAMESPACE, + isvc_version=constants.KFSERVING_V1BETA1_VERSION, cluster_ip=cluster_ip) + + # Call predict on the two models + res_model1 = predict(service_name, "./data/iris_input.json", model_name=model1_name) + res_model2 = predict(service_name, "./data/iris_input.json", model_name=model2_name) + + assert res_model1["predictions"] == [1,1] + assert res_model2["predictions"] == [1,1] + + # Clean up inference service + KFServing.delete(service_name, KFSERVING_TEST_NAMESPACE)