-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
remove stream if AutoStream enabled when no more subscribers #109
remove stream if AutoStream enabled when no more subscribers #109
Conversation
You should handle this in your own code. I don't think this is the job of the library. |
Hmm. Actually, I'm not sure; maybe removing streams if autostream is enabled should be a job of the library. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @frederikhors, thanks for submitting this PR.
I would request a few changes however. If you need any more guidance to make them, please let me know
http_test.go
Outdated
|
||
events := make(chan *Event) | ||
|
||
var cErr error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may be better to create a channel for this error, like:
cErr := make(chan error)
go func() {
cErr <- c.SubscribeChan("test", events)
}()
require.Nil(t, <-cErr)
This is thread safe and avoids having to sleep
You should probably use something like the wait()
function though to stop the test from blocking forever however.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, but it doesn't work. Can you help me here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this resolved, @purehyperbole?
|
That is most likely related to the fact that |
Can we somehow make it synchronous with some channels? |
yes, I think adding You can add: This to removed: make(chan struct{}, 1), This to str.subscribers[i].removed <- struct{}{}
close(str.subscribers[i].removed) and then in <-s.removed |
Added. The test is green now. But I have a doubt. We have three channels now: quit chan *Subscriber
connection chan *Event
removed chan struct{} Can't we reuse one of them ( |
I think in this case, hijacking the other two channels is going to complicate things. |
I haven't reviewed the changes, but I'm also managing my own channels and also using multiple channels per connection... will this conflict? |
(I'm not using Autostream) |
I think not. But What do you mean? Can you post an example? |
It's very complex... that's what concerns me. If these additional channels are created for each additional connection (even if Autostream is not enabled), then I will face a combinatorial explosion. |
Get this branch and test it. But this code is only for this package. maybe the problem is in your own code? |
This is true.
Frederik, are you running with data race detector on (https://golang.org/doc/articles/race_detector)? Just run them as a matter of course in this kind of work. Also, the race detector will only kick in if there's actually a data race; in other words, you should exercise with some load. |
lol I don't have a problem in my code. It's battle tested. I'm asking if this patch will be creating new Golang channels even if Autostream is disabled. |
I think yes. At least one channel because of the new detection. There is no alternative. Read: #109 (comment) |
That's what I thought. :( I think this overcomplicates the library then, and would be better a part of a wrapper library (or a separate server package) that manages your connections for you. |
I don't think so. Effective detection is part of this library. |
I respectfully disagree. I chose this library over the alternatives both because I didn't find any data races in it and because it is a lower-level library to build servers with, not a server in itself that takes responsibility for managing all the streams. |
@frederikhors thanks for updating the PR, i will review the changes again. @codewinch If it alleviates some of your concerns, the overhead from this in terms of memory is around extra 100 bytes. If you have 100k subscribers, that's only an extra 10MB of memory used. Definitely not free, but far from terrible. You can verify this as follows: package main
import (
"fmt"
"runtime"
)
func usage() {
runtime.GC()
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
func main() {
usage()
x := make([]chan struct{}, 100000)
usage()
for i := 0; i < 100000; i++ {
x[i] = make(chan struct{}, 1)
}
usage()
} As the channels are mostly dormant before only being used once, there is a negligible amount of scheduler overhead to handling it this way. If you forsee of any specific concerns then please let me know so we can try to address them. Also, it's worth pointing out that why the library is race free, the tests as implemented are not (removeSubscriber causes a race). I disabled the race detector from the CI tests many years ago for that reason. It's probably worth expending the effort to correct those tests, so i'll make that a priority to do soon. |
Hi Tom, Thanks! This does help alleviate some concerns. I'm not concerned about RAM too much, especially since we have lots of RAM and it'll probably be reclaimed if inactive; I'm only really concerned about CPU, especially for millions of channels (we backend with another Go-based MQ system.) I guess we could always internally re-benchmark to see if these changes will cause any significant changes, but it'd be great if these management layers were separated into a separate higher-level library or server (the way socket servers in Node wrap around lower-level socket.io libraries, to enable separation of concerns and reduction of surface area for races and other bugs.) |
@codewinch Thanks for providing a bit more info about your usecase. Your concerns are definitely valid, as there will be CPU overhead (from my understanding only from GC having to scan the heap, but benchmarking is probably going to be the only way to know for sure) from those unused channels, provided you're dealing with a big enough number of subscribers. Channels that aren't being read from/written to should have no more overhead than any other value. We should be able to gate this functionality behind a conditional, so if you're not using Generally, I agree that streams should be completely separate from the core SSE functionality and if I were to start from scratch, this library would handle only the core SSE protocol (I actually split this out a long time ago with broadcast). With that said, it seems that at least a decent number of users rely in some way on the stream functionality, so it's not something we can remove from this library without disrupting some of our users (which is something I'd generally like to avoid if necessary). |
@purehyperbole maybe we can work on a new breaking version for this. |
Excellent! Yes, I agree completely with this approach. If the user is not using autostream, they don't get the extra functionality and have to implement it all themselves, while if they are, then they get to take advantage of these features with the associated tradeoff. Seems like a great way to go and I'm totally on board with it! |
"A library to handle broadcasting messages to a number of user channels" Oh, that is excellent -- don't know how I missed that. Thank you! Would love some docs on how to use it and how it interops with this lib. |
@frederikhors If it's not too much trouble, could you please kindly make the following changes to this PR? We should pass through the func newStream(bufsize int, replay, isAutoStream bool) *Stream {
return &Stream{
AutoReplay: replay,
subscribers: make([]*Subscriber, 0),
isAutoStream: isAutoStream,
register: make(chan *Subscriber),
deregister: make(chan *Subscriber),
event: make(chan *Event, bufsize),
quit: make(chan bool),
Eventlog: make(EventLog, 0),
}
} For // addSubscriber will create a new subscriber on a stream
func (str *Stream) addSubscriber(eventid int) *Subscriber {
atomic.AddInt32(&str.subscriberCount, 1)
sub := &Subscriber{
eventid: eventid,
quit: str.deregister,
connection: make(chan *Event, 64),
}
if str.isAutoStream {
sub.removed = make(chan struct{}, 1)
}
str.register <- sub
return sub
}
if str.subscribers[i].removed != nil {
str.subscribers[i].removed <- struct{}{}
} The subscriber closed method can be implemented as if s.removed != nil {
<-s.removed
} Understandably, if you don't have time to make these changes, I can merge this and implement those changes before releasing. @codewinch are you happy with the above changes? |
Excellent! I believe that resolves my concerns completely. Thanks to both of you! |
@purehyperbole done! |
@frederikhors Thanks for your hard work on this PR, looks good to merge! @codewinch Thank you for your feedback and guidance! I'll merge this now and create a new release 🎉 |
Thanks to both of you! |
@purehyperbole, @codewinch I think this is needed to cleanup a bit especially if there are many streams for short periods, something like
/events?stream=<eventIDEachTimeDifferent>
.But the new test sometimes is green sometimes not. What do you think?
It always turns green if I add this line:
after the line:
in
http.go
.But I don't like
time.Sleep()
in code.Can we better "wait" for
sub.close()
to finish?