
Puller streamlining/simplification #1057

Merged
merged 3 commits into kserve:master on Sep 19, 2020
Conversation

njhill
Contributor

@njhill njhill commented Aug 30, 2020

What this PR does / why we need it:

Follow-on changes to #989 based on remaining review suggestions.

  • Simplified configmap change diffing
  • Connect watcher and puller with event channel
  • Have puller track in-progress ops per model via op completion channel and tie lifecycle of per-model channel+goroutine pairs to this

@k8s-ci-robot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
To complete the pull request process, please assign animeshsingh
You can assign the PR to them by writing /assign @animeshsingh in a comment when ready.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@kubeflow-bot

This change is Reviewable

@k8s-ci-robot
Contributor

Hi @njhill. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@njhill
Contributor Author

njhill commented Aug 30, 2020

This overlaps a bit with #1055 but I had already finished the changes so opening for consideration.

cc @ifilonenko @yuzisun

@yuzisun
Member

yuzisun commented Aug 30, 2020

Thanks @njhill! This looks great! Yes, there is some overlap; I think we can reconcile on this PR and then I can rebase once yours is merged in. I also have some other refactoring regarding testing.

ChannelMap map[string]Channel
Downloader Downloader
channelMap map[string]ModelChannel
completions chan string
Member

I think it is useful to know the command (add or remove) on the completion channel (good for asserting in tests). In my PR I also included an error field to track download failures, but now I think there is a common pattern of having a separate error channel. WDYT?

Contributor Author

Right, I hadn't included proper handling of the error cases in this pass; I was thinking that could be done in a next iteration :) There's quite a lot to consider and I expect it will require some adjustments to the state management... maybe something like making the per-model goroutines more like reconciliation loops.

Having the completion events be/include the original op for now sounds like a good idea though!

Contributor

I would 100% agree on a separate error channel. I think that beyond testing, it would be important for the error status endpoint.
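A minimal sketch of the pattern discussed here: completions echo back the original ModelOp (handy for asserting in tests), while failures flow on a separate error channel. All names and signatures below are illustrative assumptions, not the PR's actual API; the `return` after the error send is one possible design choice (a failed op does not also count as a completion).

```go
package main

import "fmt"

type OpType string

const (
	Add    OpType = "Add"
	Remove OpType = "Remove"
)

type ModelOp struct {
	ModelName string
	Op        OpType
}

// runOp reports success on completions (carrying the full op) and
// failures on a separate errs channel, per the review discussion.
func runOp(op ModelOp, download func(string) error,
	completions chan<- ModelOp, errs chan<- error) {
	if err := download(op.ModelName); err != nil {
		errs <- fmt.Errorf("op %s on %s failed: %w", op.Op, op.ModelName, err)
		return
	}
	completions <- op // echo the original op back for assertions
}

func main() {
	completions := make(chan ModelOp, 1)
	errs := make(chan error, 1)
	runOp(ModelOp{ModelName: "foo", Op: Add},
		func(string) error { return nil }, completions, errs)
	done := <-completions
	fmt.Println(done.Op, done.ModelName) // Add foo
}
```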

channel.EventChannel <- event
}
for name, wrapper := range w.modelTracker {
if wrapper.stale {
Member

I converted the modelconfig to a map and use that to find the removed ones. I think this works too and is slightly more efficient, but the logic is a bit hard to follow.

Contributor Author

I was aiming to minimize iterations over the map/slices (i.e. one pass over each). Agreed that the way the stale flag is toggled could be a bit confusing to read... let me add a couple more comments, and if it's still not clear I could change it to use an intermediate map.

Comment on lines 41 to 50
func (p *Puller) processCommands(commands <-chan ModelOp) {
// channelMap accessed only by this goroutine
var finished bool
for {
select {
case modelOp, ok := <-commands:
if !ok {
finished = true
//commands = nil //TODO tbd
} else {
switch modelOp.Op {
case Add:
p.enqueueModelOp(modelOp.ModelName, modelOp.Spec)
case Remove:
p.enqueueModelOp(modelOp.ModelName, &remove)
}
}
case completed := <-p.completions:
p.modelOpComplete(completed, finished)
}
Member

I liked this a lot!

if modelChan.opsInFlight == 0 {
close(modelChan.modelOps)
delete(p.channelMap, modelName)
if closed && len(p.channelMap) == 0 {
Member

Does this mean we close the completion channel when all model events are processed? how do we create this channel again as I saw it is only created when starting the puller.

Contributor Author

This is just for clean teardown of the channels but I guess likely won't be needed since I assume the watcher will keep running until the container is going down anyhow.

The idea is that the ModelEvents channel would first be closed and then this logic would ensure the completions channel is closed as soon as the last in-progress/queued operation completes after that.

Contributor

I am still a little confused about why we need to close the model channel and completion channel when the in-flight model operation count is 0. Shouldn't we only close those channels when a model is removed?

Contributor Author

This is pretty much my first time writing Go so I'm not sure of certain best practices etc.; please don't hesitate to point out things that look odd/wrong in general :)

The idea here is to keep the goroutine and channel around only as long as it's needed to serialize multiple events that might be produced for a given model (most likely case model is removed while still in the process of being added). Once the last one of those finishes then they are cleaned up, and recreated on the next event. Otherwise if you had say 1000 models loaded, you would have the same number of channels and goroutines just sitting around doing nothing. But maybe that's not a big deal?

Contributor

I would consider not closing the channel after all in-flight events are done; I'd keep it open and only close it when the model is removed, as @yuzisun mentioned above. Bringing a channel down and back up whenever in-flights finish and a new op comes in doesn't seem most efficient. But then again, do any Go experts know the cost of doing that?
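The lifecycle being debated can be sketched roughly as below, mirroring the PR's single-manager-goroutine design (channelMap is touched only by `run`, so no locking is needed): a per-model channel and goroutine are created on the first op and torn down once `opsInFlight` reaches zero, then recreated on the next event. All names, buffer sizes, and the string-based op representation are illustrative assumptions, not the PR's exact code.

```go
package main

import "fmt"

type ModelChannel struct {
	modelOps    chan string
	opsInFlight int
}

type Puller struct {
	channelMap  map[string]*ModelChannel
	completions chan string
}

// run serializes all channelMap mutations in one goroutine; done is
// closed only after events is closed and the last op completes.
func (p *Puller) run(events <-chan [2]string, done chan<- struct{}) {
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				events = nil // disable this branch once the sender closes
				if len(p.channelMap) == 0 {
					close(done)
					return
				}
				continue
			}
			p.enqueue(ev[0], ev[1])
		case model := <-p.completions:
			mc := p.channelMap[model]
			mc.opsInFlight--
			if mc.opsInFlight == 0 { // last queued op done: tear down
				close(mc.modelOps)
				delete(p.channelMap, model)
				if events == nil && len(p.channelMap) == 0 {
					close(done)
					return
				}
			}
		}
	}
}

func (p *Puller) enqueue(model, op string) {
	mc, ok := p.channelMap[model]
	if !ok { // first op for this model: create channel + goroutine
		mc = &ModelChannel{modelOps: make(chan string, 8)}
		p.channelMap[model] = mc
		go func() {
			for range mc.modelOps {
				p.completions <- model // op "processed"
			}
		}()
	}
	mc.opsInFlight++
	mc.modelOps <- op
}

func main() {
	p := &Puller{channelMap: map[string]*ModelChannel{}, completions: make(chan string, 8)}
	events := make(chan [2]string, 4)
	done := make(chan struct{})
	events <- [2]string{"foo", "Add"}
	events <- [2]string{"foo", "Remove"}
	close(events)
	go p.run(events, done)
	<-done
	fmt.Println("all ops complete; channelMap empty:", len(p.channelMap) == 0)
}
```

Because `opsInFlight` is only read and written by `run`, the close can never race with a send, which is what makes the teardown safe in this sketch.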

existing.Spec, existing.stale = &spec, false
// Changed - replace
w.modelRemoved(name)
w.modelAdded(name, &spec)
Member

Should this be just Added/Reload ? Remove then Add triggers unload and load which would cause some downtime of the model on the model server side?

Contributor Author

My thinking was that if TrainedModels are immutable (as I think was agreed), this case is unlikely to occur since it would imply the TrainedModel CR itself had been deleted and another with the same name subsequently created. In that case continuity would not be expected anyhow.

Contributor

As I said above, I think that we should handle updates to the TrainedModel, e.g. if I typed the wrong storageUri by accident, or want to bump the memory spec.

@ifilonenko
Contributor

/ok-to-test

}
}
}

func (w *Watcher) modelAdded(name string, spec *v1beta1.ModelSpec) {
Contributor

Oh, this is sweet.

if err = watcher.Add(w.configDir); err != nil {
log.Fatal(err)
}
// Add a first create event to the channel to force initial sync
Contributor

Is the intention of this initial event to replace syncer.Start() in main.go?

Contributor Author

Not exactly, the syncer logic still runs first but moved into NewWatcher function above since I thought it kind of made sense to be part of the watcher. It just populates the initial state of the modelTracker map.

Injecting the event here just forces an initial sync of the map with the configmap. It's done after the fsnotify watch is started to avoid missing any changes; otherwise, in theory at least, we could sync the configmap, then it changes, then the watch starts... and we'd have missed that change in between.

Oh hmm though I just realized that this might not work if the Events chan is not buffered, I guess it may hang here if so. But that should be fixable by just moving this to right after the goroutine below is started.

Contributor

I'm confused by this events add... why not just wait for the event to be created?

Contributor Author

I didn't think there would necessarily be such an event. The watched files (symlink) will be there from the outset and so I don't think will necessarily produce any create/modification fsnotify events.

Contributor

Based on my tests there was always an event. In your runs, have you seen differing behavior?

Contributor Author

I don't think I tested with fsnotify, but I had experimented with a different inotify-based lib where from what I recall there wasn't an event to reflect the initial state. That's cool though, if it's not needed then this line can be removed. I'll try to also test when I get a chance

Contributor

yeah before a lgtm, I will run this in a docker container and do a test. lmk when you have tested so I can do a follow-up as well

type OpType string

const (
Add OpType = "Add"
Contributor

Does Add include Update? That was the initial design.
I remember you mentioned assuming that TrainedModel objects should be treated as immutable.
But I disagreed. Example:

TrainedModel 
name: foo
resource: 1g
storage: s3://here/1

If I modify resource, I want to send an update command while not re-downloading. If I modify storage to s3://there/1 and then change it back to s3://here/1, I don't want to re-download s3://here/1. For example, what if I mis-type something? I don't want to delete the model; I'd rather only delete upon TrainedModel deletion.

Contributor Author

TrainedModel objects should be treated as immutable.

I thought there was some agreement on this on the last call we had. I feel fairly strongly that it's needed, it is analogous to why pods are immutable. If you change a pod spec the pod does not reconfigure itself, a new pod with a new unique name replaces it, orchestrated via replica sets. This goes hand-in-hand with the proposal for InferenceRouters to be the vehicle for versioning and stable inferencing endpoints.

So it's important to close on this one way or the other I think since it has significant design implications.

cc @yuzisun @ellistarn @yuzliu

Contributor

I might have missed that, I am willing to be convinced otherwise :) I just think that the creation of a new model, of the same name would be common if users update their TrainedModel. Example: users update InferenceService when trying to patch their version, they would probably want to assume the same behavior for the TrainedModels.

Contributor Author

Right, I agree that appears kind of inconsistent with being able to directly change the storageUri of a single-model InferenceService. But it ultimately comes down to the fact that there isn't any indirection of the model names so there's no way that this could be implemented non-disruptively in the general case. Changing the storageUri like that would entail a small outage, possibly indefinite if there's some issue with the new URI/model data, which is a non-starter i.m.o.

func StartPuller(downloader Downloader, commands <-chan ModelOp) {
puller := Puller{
channelMap: make(map[string]ModelChannel),
completions: make(chan string, 128),
Contributor

Why 128, is there a reason for this buffer-size? Also, can we move that to a constant

Contributor Author

Kind of arbitrary, I just wanted sending to never block. It could be made much smaller now that there's a dedicated goroutine for managing the channel map.
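The review suggestion here amounts to something like the sketch below: name the buffer size instead of a bare 128. The constant name and the smaller value are illustrative assumptions, not the PR's actual code.

```go
package main

import "fmt"

// completionsBufferSize names the completions channel capacity; it can be
// small once a dedicated goroutine is always draining the channel.
const completionsBufferSize = 8

func newCompletions() chan string {
	return make(chan string, completionsBufferSize)
}

func main() {
	c := newCompletions()
	fmt.Println(cap(c)) // 8
}
```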

}
}
} else {
//TODO log warning, shouldn't happen
Contributor

Can we add this, any reason to leave as TODO?

}

func (p *Puller) modelProcessor(modelName string, events chan EventWrapper) {
func (p *Puller) modelProcessor(modelName string, ops <-chan *v1.ModelSpec) {
Contributor

The idea of making the channel able to both receive and send events was so that I could handle retries (by writing back to the same channel). Is there a strategy to handle retries? I guess writing back to the same channel is a bad design, maybe?

if err := p.RemoveModel(event.ModelName); err != nil {
log.Println("worker failed on", event, "because: ", err)
log.Println("Should unload", modelName)
if err := storage.RemoveDir(filepath.Join(p.Downloader.ModelDir, modelName)); err != nil {
Contributor

Should we keep this logic in a method that will probably be expanded upon, or is deleting the dir going to be the full extent of deleting a model from storage?

Contributor Author

Probably yes, maybe even as a function on the Downloader for symmetry. I can't envisage what else would be done for this apart from deleting the model's dir, but it will probably be important to ensure that the "success" file is deleted first before the rest of it.

Follow-on changes to kserve#989 based on remaining review suggestions.

- Simplified configmap change diffing
- Connect watcher and puller with event channel
- Have puller track in-progress ops per model via op completion channel and tie lifecycle of per-model channel+goroutine pairs to this
The complete ModelOp struct is now passed all the way back and forth.
@yuzisun
Member

yuzisun commented Sep 3, 2020

/retest

@yuzisun
Member

yuzisun commented Sep 10, 2020

/retest

@k8s-ci-robot
Contributor

@njhill: The following test failed, say /retest to rerun all failed tests:

Test name Commit Details Rerun command
kubeflow-kfserving-presubmit 2091fed link /test kubeflow-kfserving-presubmit

Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

if ok {
p.enqueueModelOp(&modelOp)
} else {
commands = nil
Member

@yuzisun yuzisun Sep 11, 2020

I think we can save the closed-channel state as a bool variable here? Setting a channel to nil is not good practice, as sending an event to a nil channel blocks forever, while sending an event to a closed channel results in a panic.

Contributor Author

@yuzisun the use here was deliberate. I had the behaviour of nil channels in mind, in particular as a way to disable the corresponding branch of the select after the channel is closed by the sender. See for example https://medium.com/justforfunc/why-are-there-nil-channels-in-go-9877cc0b2308.

The link you gave describes the properties of nil channels but I don't see anywhere that says it's bad practice to use them, I assume they were designed that way for a reason.

Happy to change it if you would still like me to or alternatively I could add a comment to make things a bit more explicit?
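The nil-channel idiom under discussion can be demonstrated with a small sketch (names are illustrative): once the sender closes `commands`, setting it to nil disables that select branch, so the loop stops spinning on zero-values from the closed channel while the completions branch keeps working. Note the hazard is on the send side, as noted above; on the receive side used here, nil simply blocks, which is exactly what disables the branch.

```go
package main

import "fmt"

// drain reads n items total from a commands channel (which may be closed
// mid-stream) and a completions channel, using the nil-channel idiom to
// disable the commands branch once it is closed.
func drain(commands <-chan string, completions <-chan string, n int) []string {
	var got []string
	for len(got) < n {
		select {
		case cmd, ok := <-commands:
			if !ok {
				commands = nil // receives on nil block forever,
				continue       // effectively disabling this branch
			}
			got = append(got, "cmd:"+cmd)
		case c := <-completions:
			got = append(got, "done:"+c)
		}
	}
	return got
}

func main() {
	commands := make(chan string, 1)
	completions := make(chan string, 1)
	commands <- "add"
	close(commands)
	completions <- "foo"
	fmt.Println(drain(commands, completions, 2))
}
```

Without the nil assignment, the closed-channel branch would keep firing with `ok == false` and the loop would busy-spin; with it, only live channels can be selected.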

@yuzisun
Member

yuzisun commented Sep 11, 2020

@njhill I made a comment on setting the channel to nil see https://dave.cheney.net/2014/03/19/channel-axioms, I think we can merge this after that is addressed.

@yuzisun yuzisun merged commit 773b10f into kserve:master Sep 19, 2020