Changes to subscriber pattern #828
I'm going to mark this as request changes because this is only a first-pass review. There is something in the engine I don't understand yet, and I want to look at it more closely. Overall it's a nice job (no pun intended).
@@ -62,8 +62,6 @@ var (
)

const (
	// MsgBufferSize - The buffer for the message channel.
	MsgBufferSize = 20
So we're not using a buffered channel anymore? I guess if each one has its own channel it makes sense to be a blocking one item channel.
Yes. Also, we are reading from that channel and handing off to the subscribers asynchronously, so we are always ready to read the next message. If we wanted to slow throughput, we could very easily use a sync.WaitGroup in combination with a buffered channel.
I don't see any reason to artificially slow the processing of msgs.
@@ -213,43 +211,43 @@ func CreateApp() App {
	validateRegistryNames(app.registry)

	log.Debug("Initializing WorkEngine")
	app.engine = broker.NewWorkEngine(MsgBufferSize)
	stateSubscriber := broker.NewJobStateSubscriber(app.dao)
👍
pkg/broker/broker.go
Outdated
@@ -1059,13 +1059,6 @@ func (a AnsibleBroker) Unbind(
	} else {
		log.Warning("Broker configured to *NOT* launch and run APB unbind")
	}

	err = cleanupUnbind(&bindInstance, &serviceInstance, bindExtCreds, a.dao)
Why is this gone? It was part of the unbind_subscriber before, what is doing this now?
I see it was moved to cleanupAfterUnbind in the job_state_subscriber.
pkg/broker/types.go
Outdated
@@ -304,4 +304,12 @@ type SubscriberDAO interface {
	SetState(id string, state apb.JobState) (string, error)
	GetServiceInstance(id string) (*apb.ServiceInstance, error)
	DeleteServiceInstance(id string) error
	GetBindInstance(id string) (*apb.BindInstance, error)
	DeleteBindInstance(id string) error
	SetServiceInstance(id string, serviceInstance *apb.ServiceInstance) error
I'm not a fan of the SubscriberDAO. It feels weird to me. But that's for a different day.
As this is adding more and more methods, why don't we just make this the DAO, now that DAO is an interface itself?
We either have a mock for it or can easily create one.
Yep, no issue with changing that, as we already have the DAO interface.
@shawn-hurley How do you feel about me creating a follow-on issue to remove the SubscriberDAO and create the mocks etc.? I don't see an existing mock at the moment, and quite a few tests would need updating. I would like to get this landed and base the apb state work on master with these changes already available.
pkg/broker/types.go
Outdated
	SetServiceInstance(id string, serviceInstance *apb.ServiceInstance) error
}

// WorkSubscriber - Interface tha wraps the Subscribe method
typo: tha -> that
Also, it says that it wraps the Subscribe method; is that correct?
Nope, the comment needs changing 👍
pkg/broker/work_engine.go
Outdated
func NewWorkEngine(bufferSize int) *WorkEngine {
	return &WorkEngine{topics: make(map[WorkTopic]chan JobMsg), bufsz: bufferSize}
func NewWorkEngine() *WorkEngine {
	return &WorkEngine{jobs: make(map[string]chan JobMsg), subscribers: map[WorkTopic][]WorkSubscriber{}}
again, why is the channel a non buffered channel?
Because we currently hand off the messages immediately and asynchronously, we should always be able to read from and send to the channel, so in this case there is no benefit to a buffered channel.
Where we may benefit from a buffered channel is if we used it in conjunction with a sync.WaitGroup. Something like:
wg := sync.WaitGroup{}
for msg := range engine.jobs[token] {
	subs := engine.subscribers[topic]
	wg.Add(len(subs)) // one count per subscriber on this topic
	for _, sub := range subs {
		// All subscribers receive msg at the same time; each calls wg.Done() when it is finished.
		go sub.Notify(msg, &wg)
	}
	wg.Wait() // wait for all subscribers to be done
}
This design would allow each of the subscribers to receive the messages at the same time, but force us to wait until they are all complete before accepting the next message. In this case a buffer would be valuable, as it would allow the Job to continue sending messages even if our subscribers were running a little slow.
The real question is whether there is value in waiting for all the subscribers to finish before accepting the next message.
I am leaning towards yes, because state will be changed and modified, and there is no guarantee that the order in which a set of goroutines were created is also the order in which they are executed.
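A minimal, runnable sketch of the wait-per-message design discussed above. `JobMsg`, `WorkSubscriber`, `recorder`, `fanOut` and `runJob` are simplified stand-ins invented for illustration, not the broker's real types; the buffer size of 20 mirrors `MsgBufferSize` from the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// JobMsg and WorkSubscriber are simplified, hypothetical stand-ins
// for the broker types discussed in this thread.
type JobMsg struct{ State string }

type WorkSubscriber interface {
	Notify(msg JobMsg)
}

// recorder appends every state it is notified about, in arrival order.
type recorder struct {
	mu     sync.Mutex
	states []string
}

func (r *recorder) Notify(msg JobMsg) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.states = append(r.states, msg.State)
}

// fanOut hands each message to all subscribers concurrently, then waits
// for every subscriber to finish before reading the next message.
func fanOut(msgs <-chan JobMsg, subs []WorkSubscriber) {
	var wg sync.WaitGroup
	for msg := range msgs {
		wg.Add(len(subs))
		for _, sub := range subs {
			go func(s WorkSubscriber, m JobMsg) {
				defer wg.Done()
				s.Notify(m)
			}(sub, msg)
		}
		wg.Wait()
	}
}

// runJob pushes the given states through a buffered channel and returns
// what a single recorder subscriber saw.
func runJob(states []string) []string {
	// The buffer lets the job keep sending while subscribers catch up.
	msgs := make(chan JobMsg, 20)
	rec := &recorder{}
	done := make(chan struct{})
	go func() {
		defer close(done)
		fanOut(msgs, []WorkSubscriber{rec})
	}()
	for _, s := range states {
		msgs <- JobMsg{State: s}
	}
	close(msgs)
	<-done
	return rec.states
}

func main() {
	fmt.Println(runJob([]string{"not yet started", "in progress", "succeeded"}))
}
```

Because `fanOut` waits on the `WaitGroup` before reading the next message, each subscriber sees the states in the order the job sent them, which is the ordering concern raised in the comment.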
pkg/broker/work_engine.go
Outdated
@@ -50,10 +50,33 @@ func (engine *WorkEngine) StartNewAsyncJob(
	if token == "" {
		token = engine.Token()
	}
	go work.Run(token, engine.topic(topic))
	go engine.start(token, work, topic)
This feels really weird to me. Starting the engine for each job. Feels like the engine should always be running and waiting for new jobs to be added to it to be run.
This could be called startSubscriberListeningAndRun(). I would prefer to keep it shorter, maybe something like startJob?
pkg/mock/dao.go
Outdated
@@ -86,6 +86,49 @@ func (mp *SubscriberDAO) GetServiceInstance(id string) (*apb.ServiceInstance, er
	return retOb.(*apb.ServiceInstance), mp.Errs["GetServiceInstance"]
}

//DeleteBindInstance mock impl
space after the //
pkg/broker/work_engine.go
Outdated
for msg := range engine.jobs[token] {
	for _, sub := range engine.subscribers[topic] {
		//TODO edge case consider the fact that a subscriber may never exit and so we would leak go routines
		go sub.Notify(msg)
Regarding the comment, I do think that having a context timeout makes sense here.
I think a simple implementation would be to wrap this in a func() that waits for a configurable amount of time.
The notify interface could then have an error return that we would log if something does go wrong, and if it returns nil then everything is all good.
...
for _, sub := range engine.subscribers[topic] {
c := make(chan error, 1)
go waitForNotify(engine.duration, sub, msg)
....
func waitForNotify(d time.Duration, w WorkSubscriber, j JobMessage) {
select {
case err := w.Notify(j):
handle error or if nil return
case <-d:
}
}
Going to chew this one over 🤔 I am not sure you can do the function call this way in the select statement
I think you're right. I think you need to pass a channel to the waitForNotify function and add the timeout bits to a function that gets launched with the goroutine above.
Something like this?
https://play.golang.org/p/pRjyCXfZgSi
Updated to take your feedback into account, plus a look at adding in the wait group and using context: https://gist.github.com/maleck13/d2c903eac6064422966d64715118345e
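For readers without access to the linked playground or gist, here is a minimal sketch of the timeout idea under discussion. It is not the code from either link: `notifyWithTimeout`, `sleepySubscriber` and `errNotifyTimeout` are hypothetical names, and `JobMsg` and `WorkSubscriber` are simplified stand-ins. The subscriber call runs in its own goroutine and reports completion over a channel, so the `select` only ever receives from channels.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// JobMsg and WorkSubscriber are simplified, hypothetical stand-ins.
type JobMsg struct{ State string }

type WorkSubscriber interface {
	Notify(msg JobMsg)
}

// errNotifyTimeout is a sentinel error invented for this sketch.
var errNotifyTimeout = errors.New("subscriber did not finish before the timeout")

// notifyWithTimeout runs sub.Notify in its own goroutine and waits for
// either completion or the timeout, whichever happens first.
func notifyWithTimeout(sub WorkSubscriber, msg JobMsg, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so the goroutine can always send and exit, even if the
	// caller has already given up waiting.
	done := make(chan struct{}, 1)
	go func() {
		sub.Notify(msg)
		done <- struct{}{}
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return errNotifyTimeout
	}
}

// sleepySubscriber simulates a subscriber that takes a fixed time to finish.
type sleepySubscriber struct{ delay time.Duration }

func (s sleepySubscriber) Notify(JobMsg) { time.Sleep(s.delay) }

func main() {
	msg := JobMsg{State: "in progress"}
	// Fast subscriber: completes well inside the timeout.
	fmt.Println(notifyWithTimeout(sleepySubscriber{delay: time.Millisecond}, msg, time.Second))
	// Slow subscriber: the caller stops waiting after 20ms.
	fmt.Println(notifyWithTimeout(sleepySubscriber{delay: 500 * time.Millisecond}, msg, 20*time.Millisecond))
}
```

Note that the timeout only stops the caller from waiting. A subscriber that never returns would still leak its goroutine unless `Notify` itself accepted the context and honoured cancellation.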
I think this is looking really good, does a good job of reducing complexity!
f7be0e0 to 308ef44
Ok so I have updated based on feedback and discussions. It is still WIP, but again wanted to give opportunity for feedback. Main changes
The key concern for me here was to avoid a situation where writes could be executed in the wrong order. ping @shawn-hurley @jmrodri @eriknelson Will be adding a number of unit tests here and also looking at a metrics subscriber in the next day or so.
Some minor comment changes, variable name change (or question). Overall I like this updated version better.
	// defaultClusterURLPreFix - prefix for the ansible service broker.
	defaultClusterURLPreFix = "/ansible-service-broker"
	// MsgBufferSize - The buffer for the message channel.
	MsgBufferSize = 20
+1 this makes me feel better :)
pkg/app/app.go
Outdated
@@ -213,43 +213,43 @@ func CreateApp() App {
	validateRegistryNames(app.registry)

	log.Debug("Initializing WorkEngine")
	stateSubscriber := broker.NewJobStateSubscriber(app.dao)
👍 to the single stateSubscriber.
log.Debugf("JobStateSubscriber Notify : msg state %v ", msg.State)
id := msg.InstanceUUID
if isBinding(msg) {
	id = msg.BindingUUID
Can msg.BindingUUID ever be ""? I would probably change isBinding to also ensure BindingUUID is not empty. Otherwise, we would store an empty string as the key. Maybe there is no way this could happen, I'll see how the flow goes.
It seems that if it can ever be empty, it should fail before getting here; the same goes for InstanceUUID.
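One way to act on both comments is to validate the IDs when choosing the storage key and fail fast instead of storing state under an empty string. The sketch below is illustrative only: the `Method` field, the `isBinding` implementation and the `stateKey` helper are assumptions, since the thread does not show how the broker actually decides that a message is a binding message.

```go
package main

import (
	"errors"
	"fmt"
)

// JobMsg is a simplified stand-in; the Method field is an assumption
// made for this sketch.
type JobMsg struct {
	InstanceUUID string
	BindingUUID  string
	Method       string
}

// isBinding reports whether the message belongs to a bind or unbind job
// (hypothetical implementation).
func isBinding(msg JobMsg) bool {
	return msg.Method == "bind" || msg.Method == "unbind"
}

// stateKey picks the ID used to store job state and returns an error
// rather than an empty key.
func stateKey(msg JobMsg) (string, error) {
	if isBinding(msg) {
		if msg.BindingUUID == "" {
			return "", errors.New("binding job message has an empty BindingUUID")
		}
		return msg.BindingUUID, nil
	}
	if msg.InstanceUUID == "" {
		return "", errors.New("job message has an empty InstanceUUID")
	}
	return msg.InstanceUUID, nil
}

func main() {
	fmt.Println(stateKey(JobMsg{InstanceUUID: "i-1", Method: "provision"}))
	fmt.Println(stateKey(JobMsg{InstanceUUID: "i-1", BindingUUID: "b-1", Method: "bind"}))
	fmt.Println(stateKey(JobMsg{InstanceUUID: "i-1", Method: "bind"}))
}
```

Returning an error here keeps a bind message with a missing ID from silently falling back to the instance key.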
pkg/broker/work_engine.go
Outdated
	topics map[WorkTopic]chan JobMsg
	bufsz int
	subscribers map[WorkTopic][]WorkSubscriber
	jobs map[string]chan JobMsg
jobs is weird because these aren't the jobs. They're the channels used by the jobs for messages. The name implies the job structs are in this map but that is not the case. Calling jobs["token"] will not return me the job I created. Consider renaming it.
changed to jobChannels
@@ -50,10 +54,65 @@ func (engine *WorkEngine) StartNewAsyncJob(
	if token == "" {
		token = engine.Token()
	}
	go work.Run(token, engine.topic(topic))
	go engine.startJob(token, work, topic)
startJob is better and more clear.
👍 I'm not sure it should be a private fn though? I suppose it's fine since we're really the only ones using it but I thought it was useful to design it from a perspective of "this is a public lib". No requested changes, but spitballing.
pkg/broker/work_engine.go
Outdated
go func() {
	// listen for a new message for the job keyed to this token and hand off to the subscribers async. Wait for them all to be done before accepting
	// the next message
reformat this comment to be a couple shorter lines. The first one is really long.
pkg/broker/work_engine.go
Outdated
}()
// notify the subscriber
go waitForNotify(sub, msg, notifySignal)
//act on whichever happens first the subscriber's notify method completing or the timeout
space after //
pkg/broker/work_engine.go
Outdated
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) //TODO make configurable
// used to tell us when the subscribers notify method is completed
notifySignal := make(chan struct{})
//If our subscriber times out or returns normally we will always clean up
space after //
@@ -66,7 +125,7 @@ func (engine *WorkEngine) StartNewSyncJob(
		token = engine.Token()
	}

	work.Run(token, engine.topic(topic))
	engine.startJob(token, work, topic)
+1
pkg/broker/work_engine.go
Outdated
	return engine.topics
// GetActiveJobs - Get list of active jobs
func (engine *WorkEngine) GetActiveJobs() map[string]chan JobMsg {
	return engine.jobs
is this really active jobs? I would expect it to return an array of jobs.
changed to GetActiveJobChannels
308ef44 to 3213f38
3213f38 to 2f8adeb
@jmrodri I added back the clean up of unbind state when launch_apb_on_bind is false. I think this will fix the CI failure. At least it does locally when running make ci Maybe
bfcf48a to 6211d29
Added to DAO; can remove if need be.
@shawn-hurley @jmrodri @eriknelson this is no longer WIP, please review when ready.
@@ -79,6 +79,9 @@ type Dao interface {
	// DeleteBindInstance - Delete the binding instance for an id in the kvp API.
	DeleteBindInstance(string) error

	// DeleteBinding - Delete the binding instance and remove the assocation with the service instance.
	DeleteBinding(apb.BindInstance, apb.ServiceInstance) error
👍
@maleck13 If you could rebase this, the build should start passing.
…emoving concerns around race conditions
6211d29 to 13be109
No requests for change; I love how much code this removed, it's a great improvement. Thanks @maleck13
case <-ctx.Done():
	return
default:
	signal <- struct{}{}
Is this just a trigger? Usually I see bools used.
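On the question of `struct{}` versus `bool`: a channel of empty structs is a common Go idiom for a pure signal, since the value carries no information and occupies no memory, whereas a `bool` invites the reader to wonder what `false` would mean. A small sketch, with `runAndWait` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"unsafe"
)

// runAndWait starts work in a goroutine and blocks until the goroutine
// signals that it has finished. The channel carries no data; receiving
// from it is the whole message.
func runAndWait(work func()) {
	signal := make(chan struct{})
	go func() {
		work()
		signal <- struct{}{} // close(signal) would work as a trigger too
	}()
	<-signal
}

func main() {
	ran := false
	runAndWait(func() { ran = true })
	// The empty struct has size 0; a bool has size 1.
	fmt.Println(ran, unsafe.Sizeof(struct{}{}), unsafe.Sizeof(true)) // prints "true 0 1"
}
```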
wg.Add(1)
// ensure things don't get locked up.
// Each subscriber has up to the configured amount of time to complete its action
ctx, cancel := context.WithTimeout(context.Background(), engine.subscriberTimeout*time.Second)
This is so much better 👍
Describe what this PR does and why we need it:
Updates to how we handle the channels and subscribers based on the proposal outlined as part of #638
@jmrodri @eriknelson initial WIP work here for early feedback. Not complete yet but also not miles away. Would like any feedback you have to ensure we are still on the same page and I am not heading down any rabbit holes 😄