
[POC] Multi-Model Puller #989

Merged 21 commits on Aug 27, 2020

Conversation

@ifilonenko (Contributor) commented Aug 1, 2020

What this PR does / why we need it:
This PR is needed as part of the multi-model story. The agent is responsible for pulling models into a model directory that the model server will use to host them.

Fixes #1040

Design:

We assume the ConfigMap looks like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: models-config
data:
  models.json: |
    [
      {
        "modelName": "model1",
        "modelSpec": {
          "storageUri": "s3://example-bucket/path/to/model1",
          "framework": "sklearn",
          "memory": "1G"
        }
      },
      {
        "modelName": "model2",
        "modelSpec": {
          "storageUri": "s3://example-bucket/path/to/model2",
          "framework": "sklearn",
          "memory": "1G"
        }
      }
    ]

As such, model_name1.json and model_name2.json will each be files in the directory into which models-config is mounted. This puller will watch (or pull from) this directory should any models be added or removed from the ConfigMap, and send the appropriate request to the load / unload methods.
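For illustration, a minimal fsnotify-based sketch of this watch-and-dispatch flow could look like the following; loadModel/unloadModel are hypothetical placeholders rather than this PR's actual API:

import (
	"log"
	"path/filepath"
	"strings"

	"github.com/fsnotify/fsnotify"
)

func watchConfigDir(configDir string) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()
	if err := watcher.Add(configDir); err != nil {
		return err
	}
	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return nil
			}
			// Derive the model name from the file name, e.g. model1.json -> model1.
			name := strings.TrimSuffix(filepath.Base(event.Name), filepath.Ext(event.Name))
			switch {
			case event.Op&(fsnotify.Write|fsnotify.Create) != 0:
				log.Println("load requested for", name)
				// loadModel(name) would send the load request to the model server.
			case event.Op&fsnotify.Remove != 0:
				log.Println("unload requested for", name)
				// unloadModel(name) would send the unload request.
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return nil
			}
			log.Println("watch error:", err)
		}
	}
}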

Special notes for your reviewer:
Tested by running:

apiVersion: v1
kind: Pod
metadata:
  name: puller
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
  - name: job
    image: <my_image>
    imagePullPolicy: Always
    env:
    - name: AWS_SECRET_KEY
      value: <my_key>
    - name: AWS_ACCESS_KEY
      value: <my_other_key>
    resources:
      limits:
        cpu: "1"
        memory: 1Gi
      requests:
        cpu: "1"
        memory: 1Gi
    command:
    - /agent
    args:
    - -config-dir
    - /mnt/configs
    - -model-dir
    - /tmp/models
    - -num-workers
    - "1"
    - -s3-endpoint
    - "<my_endpoint>"
    volumeMounts:
    - name: config-volume
      mountPath: /mnt/configs
    - name: model-volume
      mountPath: /tmp/models
  volumes:
    - name: config-volume
      configMap:
        name: models-config
    - name: model-volume
      emptyDir: {}
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: models-config
data:

and then modifying the data by adding model1 and then model2, which produced the following logs:

2020/08/17 22:55:03 Entering watch
====
2020/08/19 17:55:10 Name: model1 Spec: {s3://platform-test/flowers/ sklearn {{1 9} {<nil>} 1G DecimalSI}}
2020/08/19 17:55:10 Name: model2 Spec: {s3://platform-test/model.conf sklearn {{1 9} {<nil>} 1G DecimalSI}}
2020/08/19 17:55:10 Need to add model model1
2020/08/19 17:55:10 Sending event {0xc0003141e0 Load true}
2020/08/19 17:55:10 worker 1 for model1 is initialized
2020/08/19 17:55:10 worker 1 model1 started  job {0xc0003141e0 Load true}
2020/08/19 17:55:10 Should download s3://platform-test/flowers/
2020/08/19 17:55:10 Processing: s3://platform-test/flowers/ = 2551120587
2020/08/19 17:55:10 Downloading:  s3://platform-test/flowers/
2020/08/19 17:55:10 Need to add model model2
2020/08/19 17:55:10 Sending event {0xc0003142a0 Load true}
2020/08/19 17:55:10 worker 1 for model2 is initialized
2020/08/19 17:55:10 worker 1 model2 started  job {0xc0003142a0 Load true}
2020/08/19 17:55:10 Should download s3://platform-test/model.conf
2020/08/19 17:55:10 Processing: s3://platform-test/model.conf = 3669683301
2020/08/19 17:55:10 Downloading:  s3://platform-test/model.conf
2020/08/19 17:55:10 Now doing a request on {0xc0003142a0 Load true}
2020/08/19 17:55:16 Now doing a request on {0xc0003141e0 Load true}
  • manual tests

Release note:

The initial skeleton of the side-car agent

@kubeflow-bot

This change is Reviewable

@k8s-ci-robot (Contributor)

Hi @ifilonenko. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@ifilonenko (Contributor, Author) commented Aug 1, 2020

@yuzisun @ellistarn @yuzliu @cliveseldon @rakelkar @njhill for review

func main() {
	flag.Parse()
	puller.OnConfigChange(func(e puller.EventWrapper) {
		log.Println("Send a request to:", e.LoadState)
@ifilonenko (Contributor, Author) Aug 1, 2020

This section will do a load / unload call via a local HTTP request to 0.0.0.0:xxxx.

EventWrapper has information pertaining to (a rough sketch follows):

  1. Which model
  2. Whether to load or unload
  3. What information is needed, should it be a load
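A rough sketch of what such a wrapper could carry; the field and type names here are assumptions for illustration, not necessarily the PR's exact definitions:

type State string

const (
	ShouldLoad   State = "Load"
	ShouldUnload State = "Unload"
)

// ModelSpec mirrors the per-model entry in the ConfigMap.
type ModelSpec struct {
	StorageURI string
	Framework  string
	Memory     string
}

// EventWrapper describes a single load/unload request for one model.
type EventWrapper struct {
	ModelName string     // which model the event refers to
	LoadState State      // whether to load or unload
	ModelSpec *ModelSpec // the spec needed to perform a load; nil for unloads
}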

@ellistarn (Contributor)

We assume the ConfigMap (which will be specified in a separate PR) looks like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: models-config
  namespace: <user-namespace>
data:
  model_name1.json: |
  - storageUri: xxx
  - framework: xxx
  - memory: xxx

For framework, is this string going to relate at all to the image in the Predictor spec, i.e. "tensorflow:v1.12", "patched_tensorflow:v1.12", or will it simply be a well-known enum of supported MMS frameworks?

log.Println("Send a request to:", e.LoadState)
log.Println("for model", e.ModelName)
})
puller.WatchConfig(*modelDir)
Member

@ellistarn We'd like to not depend on k8s for model puller, likewise for model server.

Contributor

Let me know when there is a verdict on this.

Contributor

I tend to agree with @yuzisun on this

pkg/puller/watcher.go (outdated review thread, resolved)
puller.Dockerfile (outdated review thread, resolved)
@@ -0,0 +1,21 @@
# Build the inference-puller binary
FROM golang:1.13.0 as builder
Contributor

We should move to 1.14 asap

Contributor Author

No worries there. Should I do that in this PR, or is that for another time?

@ifilonenko (Contributor, Author) commented Aug 2, 2020

For framework, is this string going to relate at all to the image in the Predictor spec, i.e. "tensorflow:v1.12", "patched_tensorflow:v1.12", or will it simply be a well-known enum of supported MMS frameworks?

I was thinking of doing well-known enums, personally. But the nuances of that decision will be in a separate PR, so we can move that discussion there. This PR will rely on that one.

[EDIT]

The PR can be found here: #992
Let's move the conversation there, @ellistarn.

ShouldLoad State = "Load"
ShouldUnload State = "Unload"

writeOrCreateMask = fsnotify.Write | fsnotify.Create
@yuzliu (Contributor) Aug 3, 2020

What does the writeOrCreateMask mask mean? Should a ConfigMap update only result in two states: "Load" or "Unload"?

Contributor

nvm, I saw how it is used later 👍

cmd/puller/main.go (outdated review thread, resolved)
	// 2 - if the model file was removed as a result of deletion or renaming
	if p.onConfigChange != nil {
		ext := filepath.Ext(event.Name)
		isEdit := event.Op&writeOrCreateMask != 0
Contributor

Using a bitwise AND makes me nervous here, let's make sure we have good tests for it :)
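To make the mask behaviour explicit and testable, a small table-driven test along these lines might help (a sketch only, not code from this PR):

import (
	"testing"

	"github.com/fsnotify/fsnotify"
)

func TestEventMasks(t *testing.T) {
	const writeOrCreateMask = fsnotify.Write | fsnotify.Create
	cases := []struct {
		name     string
		op       fsnotify.Op
		isEdit   bool
		isRemove bool
	}{
		{"create", fsnotify.Create, true, false},
		{"write", fsnotify.Write, true, false},
		{"remove", fsnotify.Remove, false, true},
		{"rename", fsnotify.Rename, false, false},
		{"chmod", fsnotify.Chmod, false, false},
	}
	for _, c := range cases {
		isEdit := c.op&writeOrCreateMask != 0
		isRemove := c.op&fsnotify.Remove != 0
		if isEdit != c.isEdit || isRemove != c.isRemove {
			t.Errorf("%s: got isEdit=%v isRemove=%v, want %v/%v", c.name, isEdit, isRemove, c.isEdit, c.isRemove)
		}
	}
}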

		isValidFile := ext == p.fileExtension
		if isValidFile && (isEdit || isRemove) {
			fileName := strings.TrimSuffix(filepath.Base(event.Name), ext)
			if isRemove {
Contributor

👍 I like that we have a file for each model in the configMap. We don't need to do a diff on every ConfigMap change which makes the logic cleaner and simpler.

Contributor

We can add version information in the filename, e.g. once we are ready to add VersionPolicy in the TrainedModel CRD.

Contributor Author

Do you think that can be contained in the content of the JSON instead of the filename? Although we are getting ahead of ourselves, that will be for a later PR :)

Contributor

Yeah we can discuss the details once we add version policy.

)

var (
	modelDir = flag.String("model_dir", "/tmp", "directory for multi-model config files")
Member

Can we default to /mnt/models ?

Contributor Author

These are config files; I thought we would default to /mnt/models for model files, but configs would go into a different location. Let me know which you prefer.

Contributor

Maybe change the flag name to config-dir since model_dir implies that's where the models go (to me at least), and how about /mnt/config as the default?

log.Println("Send a request to:", e.LoadState)
log.Println("for model", e.ModelName)
})
puller.WatchConfig(*modelDir)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ellistarn We'd like to not depend on k8s for model puller, likewise for model server.

	err := try.Do(func(attempt int) (bool, error) {
		err := download(event.ModelDef.StorageUri)
		if err != nil {
			time.Sleep(1 * time.Second) // wait a second
Contributor

Why do we wait for a second here?

Contributor Author

Just a dummy thing for retry purposes, I don’t mind if we retry immediately. It’s a WIP
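If the retry stays, one alternative to a fixed one-second sleep is a bounded retry loop with a growing backoff; this is a sketch with a hypothetical downloadModel placeholder, not the PR's try.Do-based code:

import (
	"fmt"
	"time"
)

// downloadModel is a placeholder for the agent's actual download logic.
func downloadModel(storageURI string) error { return nil }

// downloadWithRetry retries a failed download a bounded number of times,
// waiting a little longer after each failed attempt.
func downloadWithRetry(storageURI string, maxRetries int) error {
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err = downloadModel(storageURI); err == nil {
			return nil
		}
		time.Sleep(time.Duration(attempt*500) * time.Millisecond)
	}
	return fmt.Errorf("download of %s failed after %d attempts: %w", storageURI, maxRetries, err)
}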


var (
	modelDir   = flag.String("model_dir", "/tmp", "directory for multi-model config files")
	numWorkers = flag.Int("num_workers", 1, "number of workers for parallel downloads")
Member

The convention for arguments is num-workers.

func InitiatePullers(numWorkers int, numRetries int) {
	p.NumWorkers = numWorkers
	p.NumRetries = numRetries
	p.Channel = make(chan EventWrapper)
Member

We need a buffer size here, otherwise it is going to block the sender if the receiver is still consuming an event.

Contributor Author

Should the buffer be the number of workers?
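For illustration, a buffered event channel feeding a small worker pool might look like this; the buffer sizing and names are assumptions, not the PR's implementation:

import "log"

type Event struct {
	ModelName string
	Load      bool
}

func startWorkers(numWorkers, bufferSize int) chan<- Event {
	// Buffered so senders are not blocked while every worker is busy.
	events := make(chan Event, bufferSize)
	for i := 0; i < numWorkers; i++ {
		go func(id int) {
			for ev := range events {
				log.Printf("worker %d handling %s (load=%v)", id, ev.ModelName, ev.Load)
				// download / load / unload work would happen here
			}
		}(i)
	}
	return events
}

Sizing the buffer to the worker count only absorbs short bursts; keeping per-model ordering needs more than a single shared channel, which comes up later in the thread.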

@njhill (Contributor) left a comment

@ifilonenko this is a great start, thanks! I think there's a fair bit more to be considered though to make this robust, and to do so while keeping it as efficient/reactive as possible may be nontrivial.

For example:

  • Careful coordination may be needed w.r.t. management of per-model state to avoid race conditions.
    • What happens if a model is removed while it is downloading? or added while a prior model with same name is still in the process of unloading?
    • How will changes to existing model definitions be handled in a controlled manner? In some model servers this may entail unloading the existing model followed by loading the new one. As an aside this is partly why I feel that it could be best for TrainedModels to be immutable - maybe for another discussion...
  • What happens if the puller container dies/restarts and in the meantime models are removed from the configmap? There will be no watcher events for these deletions and so they would remain orphaned with files left on the shared volume and/or loaded in the model server. So an audit of the model data dir would be needed on startup to diff with the current configmap. In support of this I think a special file will need to be written alongside the model data to reflect download completion (along with source URI) and subsequently successful loading into the model server.
  • What happens if the model server dies/restarts? Depending on its control mechanism, some way of querying its current config might be needed to reconcile state otherwise lost models won't be reinstated. TF-Serving (and maybe others) sidestep this problem via use of a declarative manifest
  • Related to that last point, we may want to consider both declarative and command-based cases w.r.t. interfacing with different model servers... load/unload events aren't very useful in the TF Serving case since it expects the full manifest to be amended and determines the deltas/actions itself.

What would make most sense imo is a level-triggered reconciliation loop analogous to a Kube controller, which compares/converges desired/actual states, where the desired state is the configmap mount dir and the actual state includes the model data shared volume contents, the model server's own reported state, and any in-progress file download and/or deletion jobs (a rough sketch follows this comment).

Finally what's the current thinking w.r.t. reporting status back? Ideally the TrainedModel CR Status should reflect whether the model is loading / loaded / failed download / failed model-server load.

BTW I'd be happy to help with this and apologies if some/all was already planned in some way...
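A level-triggered reconcile loop of the kind described above could be sketched roughly like this, with all names hypothetical and the desired/actual sources left as callbacks:

import "time"

type ModelSpec struct{ StorageURI string }

type Reconciler struct {
	Desired func() map[string]ModelSpec // e.g. parsed from the configmap mount dir
	Actual  func() map[string]ModelSpec // e.g. audit files on the shared volume plus model server state
	Load    func(name string, spec ModelSpec) error
	Unload  func(name string) error
}

// ReconcileOnce converges the actual state towards the desired state.
func (r *Reconciler) ReconcileOnce() {
	desired, actual := r.Desired(), r.Actual()
	for name, spec := range desired {
		if cur, ok := actual[name]; !ok || cur.StorageURI != spec.StorageURI {
			_ = r.Load(name, spec) // new or changed model
		}
	}
	for name := range actual {
		if _, ok := desired[name]; !ok {
			_ = r.Unload(name) // model no longer desired
		}
	}
}

// Run reconciles periodically and whenever a trigger (e.g. an fsnotify event) fires.
func (r *Reconciler) Run(interval time.Duration, trigger <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-trigger:
		}
		r.ReconcileOnce()
	}
}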

var w *Watcher

type Watcher struct {
	modelDir string
Contributor

Similar to the other comment, maybe call it modelConfigDir for clarity?

type Watcher struct {
	modelDir       string
	onConfigChange func(EventWrapper)
	fileExtension  string
Contributor

Do file extensions really need to be handled? How about keeping it simple and just assuming that the designated directory is only for models and that model name == filename?

func OnConfigChange(run func(in EventWrapper)) {
	log.Println("Applying onConfigChange")
	w.onConfigChange = run
}
Contributor

nit: could we just have this as a second arg to the WatchConfig func?

	isEdit := event.Op&writeOrCreateMask != 0
	isRemove := event.Op&fsnotify.Remove != 0
	isValidFile := ext == w.fileExtension
	if isValidFile && (isEdit || isRemove) {
Contributor

I don't think this is going to work as intended; in particular, it won't detect changes to existing files, since Kubernetes creates them as symlinks pointing to files within a hidden sibling dir called ..data, which is itself symlinked to a versioned dir (also hidden) containing a consistent revision of the configmap.

If keys are added to / removed from the configmap then you will get events for the corresponding symlinks being created, which is what you observed when testing, but they won't produce their own events when key contents change.

My suggestion would be to (see the sketch after this list):

  • Keep a filename/key -> ModelDefinition map in memory of the current state
  • Watch only for fsnotify.Create events corresponding to the ..data symlink
  • On that event, resolve the ..data symlink to its (versioned) target dir and read its contents; then produce any load/unload events based on diffing this with the map in memory (with a retry loop for IO errors in case the configmap is updated concurrently)
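A rough sketch of that suggestion, assuming the standard ConfigMap volume layout where each key is a symlink into the ..data directory; parsing and event emission are left to a hypothetical onChange callback:

import (
	"os"
	"path/filepath"
	"strings"

	"github.com/fsnotify/fsnotify"
)

func watchConfigMapMount(configDir string, current map[string]string, onChange func(name, contents string, removed bool)) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()
	if err := watcher.Add(configDir); err != nil {
		return err
	}
	for event := range watcher.Events {
		// Kubernetes atomically swaps the ..data symlink on every configmap update.
		if filepath.Base(event.Name) != "..data" || event.Op&fsnotify.Create == 0 {
			continue
		}
		entries, err := os.ReadDir(filepath.Join(configDir, "..data"))
		if err != nil {
			continue // a retry loop would go here in case of a concurrent update
		}
		seen := map[string]bool{}
		for _, e := range entries {
			name := strings.TrimSuffix(e.Name(), filepath.Ext(e.Name()))
			data, err := os.ReadFile(filepath.Join(configDir, "..data", e.Name()))
			if err != nil {
				continue
			}
			seen[name] = true
			if current[name] != string(data) {
				current[name] = string(data)
				onChange(name, string(data), false) // added or changed -> load
			}
		}
		for name := range current {
			if !seen[name] {
				delete(current, name)
				onChange(name, "", true) // removed -> unload
			}
		}
	}
	return nil
}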

@yuzisun (Member) commented Aug 5, 2020

Great comments! Thanks @njhill!

  • What happens if a model is removed while it is downloading? or added while a prior model with same name is still in the process of unloading?

All the events for the same model should be processed by the same worker. We can create a set of channels and assign a worker to each channel, so events for the same model are consistently hashed to the same channel and processed by the same worker.
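For illustration, routing by hashing the model name onto a fixed set of channels might look like this (names are assumptions, not the PR's code):

import "hash/fnv"

type Event struct{ ModelName string }

func newWorkerChannels(numWorkers, buffer int) []chan Event {
	chans := make([]chan Event, numWorkers)
	for i := range chans {
		chans[i] = make(chan Event, buffer)
	}
	return chans
}

// dispatch always sends events for a given model to the same channel, so a
// single worker processes that model's events in order.
func dispatch(chans []chan Event, ev Event) {
	h := fnv.New32a()
	h.Write([]byte(ev.ModelName))
	chans[h.Sum32()%uint32(len(chans))] <- ev
}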

  • How will changes to existing model definitions be handled in a controlled manner? In some model servers this may entail unloading the existing model followed by loading the new one. As an aside this is partly why I feel that it could be best for TrainedModels to be immutable - maybe for another discussion...

I think this depends on how KFServing manages the model version, definitely another big discussion.

  • What happens if the puller container dies/restarts and in the meantime models are removed from the configmap? There will be no watcher events for these deletions and so they would remain orphaned with files left on the shared volume and/or loaded in the model server. So an audit of the model data dir would be needed on startup to diff with the current configmap. In support of this I think a special file will need to be written alongside the model data to reflect download completion (along with source URI) and subsequently successfully loading into the model server.

Once the model puller restarts, it should reconcile the model dir with the current configmap; yes, I agree an audit trail file is needed to facilitate this reconciliation process. Each successful download should write a file with the storage URI there; if the file is not present, or the storage URI in the file differs from the one in the configmap, we should trigger a new download.
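A sketch of that audit-file idea, with hypothetical file names and helpers rather than an agreed design:

import (
	"encoding/json"
	"os"
	"path/filepath"
)

type downloadRecord struct {
	StorageURI string `json:"storageUri"`
}

// markDownloaded writes a small marker next to the model data recording the source URI.
func markDownloaded(modelDir, name, storageURI string) error {
	data, err := json.Marshal(downloadRecord{StorageURI: storageURI})
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(modelDir, name+".download.json"), data, 0644)
}

// needsDownload reports true when no marker exists or the recorded URI differs
// from the one currently in the configmap.
func needsDownload(modelDir, name, storageURI string) bool {
	data, err := os.ReadFile(filepath.Join(modelDir, name+".download.json"))
	if err != nil {
		return true
	}
	var rec downloadRecord
	if json.Unmarshal(data, &rec) != nil {
		return true
	}
	return rec.StorageURI != storageURI
}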

  • What happens if the model server dies/restarts? Depending on its control mechanism, some way of querying its current config might be needed to reconcile state otherwise lost models won't be reinstated. TF-Serving (and maybe others) sidestep this problem via use of a declarative manifest

TF Serving can recover from the model configuration during restarts. For other model servers, I think the model puller needs some way to detect the model server status and resync all the models; on the other hand, the trained model controller also resyncs the models periodically, which then triggers load/unload calls to the model server.

  • Related to that last point, we may want to consider both declarative and command-based cases w.r.t. interfacing with different model servers... load/unload events aren't very useful in the TF Serving case since it expects the full manifest to be amended and determines the deltas/actions itself.

What would make most sense imo is a level-triggered reconciliation loop analogous to a Kube controller, which compares/converges desired/actual states. Where desired state is the configmap mount dir, and actual state includes the model data shared volume contents, model server's own reported state, and any in-progress file download and/or deletion jobs.

Finally what's the current thinking w.r.t. reporting status back? Ideally the TrainedModel CR Status should reflect whether the model is loading / loaded / failed download / failed model-server load.

Correct, we have planned this work to send model status probes from the trained model controller so the status can be reflected.

BTW I'd be happy to help with this and apologies if some/all was already planned in some way...

@njhill (Contributor) commented Aug 5, 2020

Thanks @yuzisun

All the events for the same model should be processed by the same worker, we can create a set of channels and assign a worker for each channel, so events for same models are consistent hashed to the same channel and processed by the same worker.

I'm not sure about the hashing approach. It would be functional, but it risks delaying work, sometimes resulting in longer load times than necessary. There are use cases where the rate of model churn is quite high and which are sensitive to the total time it takes for a model to become available. For TF Serving there is also the question of how best to trigger the config reloads.

I think this depends on how KFServing manages the model version, definitely another big discussion.

Agree, please include me because I have some thoughts :)

Correct, we have planned this work to send model status probes from trained model controller so status can be reflected.

Great, but curious: from the trainedmodel controller to whom exactly? Imo this should come from the puller somehow rather than directly from the model server (whether that is push or pull), since there could be other kinds of failures related to downloading, for example, and it will be keeping track of the model server's state anyhow, as mentioned in the model-server restart recovery point.

To me this seems analogous to how Pod CRs are manipulated in the context of scheduling to Nodes. Various actors contribute to the Pod's Status and update it directly IIRC (specifically the Conditions). In particular it's the kubelet that monitors and updates the running status of the Pod, and here I would suggest Puller/Model is kind of equivalent to Kubelet/Pod. But that would probably mean coupling the puller to Kube which I also agree seems to be a nice thing to avoid if possible.

@yuzliu yuzliu mentioned this pull request Aug 9, 2020
@yuzliu (Contributor) commented Aug 9, 2020

We assume the ConfigMap (which will be specified in a separate PR) looks like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: models-config
  namespace: <user-namespace>
data:
  model_name1.json: |
  - storageUri: xxx
  - framework: xxx
  - memory: xxx

For framework, is this string going to relate at all to the image in the Predictor spec, i.e. "tensorflow:v1.12", "patched_tensorflow:v1.12", or will it simply be a well-known enum of supported MMS frameworks?

@ellistarn, @yuzisun and I are considering removing version information from the ModelSpec's framework enum and from this model configuration file. The reason is that we can validate the framework against different model servers, but validating whether X model server supports Y framework's Z version is pretty difficult. And if we allow users to put an ML framework version in the TrainedModel CR, they will expect KFServing to validate framework versions against model server versions properly.

@yuzisun (Member) commented Aug 27, 2020

/retest

@ellistarn (Contributor) left a comment

This is a great launching point. Thanks for putting in so much effort in refinements so far.
/lgtm


# Copy in the go src
WORKDIR /go/src/github.com/kubeflow/kfserving
COPY pkg/ pkg/
Contributor

I learned a cool trick: if you first do

COPY go.mod go.mod
COPY go.sum go.sum
RUN go mod download

Docker will correctly layer your image so that you don't need to re-download dependencies on every build unless your go.mod or go.sum change.

Contributor Author

^ that is neat, will need to re-organize for that. good call, because the build took forevahh

Member

Can you elaborate? Move it before the pkg COPY?

@yuzisun (Member) commented Aug 27, 2020

/approve

@k8s-ci-robot (Contributor)

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: yuzisun

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot merged commit 17b6c04 into kserve:master Aug 27, 2020
njhill added a commit to njhill/kfserving that referenced this pull request Aug 30, 2020
Follow-on changes to kserve#989 based on remaining review suggestions.

- Simplified configmap change diffing
- Connect watcher and puller with event channel
- Have puller track in-progress ops per model via op completion channel and tie lifecycle of per-model channel+goroutine pairs to this
njhill added a commit to njhill/kfserving that referenced this pull request Sep 2, 2020
Follow-on changes to kserve#989 based on remaining review suggestions.

- Simplified configmap change diffing
- Connect watcher and puller with event channel
- Have puller track in-progress ops per model via op completion channel and tie lifecycle of per-model channel+goroutine pairs to this
yuzisun pushed a commit that referenced this pull request Sep 19, 2020
* Puller streamlining/simplification

Follow-on changes to #989 based on remaining review suggestions.

- Simplified configmap change diffing
- Connect watcher and puller with event channel
- Have puller track in-progress ops per model via op completion channel and tie lifecycle of per-model channel+goroutine pairs to this

* Minor change: fully decouple puller from watcher

* Address some of the review comments

The complete ModelOp struct is now passed all the way back and forth.
Successfully merging this pull request may close these issues:

Create InferenceService Model puller sidecar