Use Go in a more idiomatic way #126
Conversation
There was no point in requesting the key frame by sending a message to the conference: the only thing the conference did was call a single function on the peer, so it was just wasting CPU time. Instead, we can call this function directly and write to the peer connection. The function is thread-safe, since all mutable state is protected by a mutex and the peer connection has a mutex inside.
This solves the problem of managing the queue size on the receiver.
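As a rough illustration of what such a direct, thread-safe call can look like with Pion (the `Peer` type and method below are simplified stand-ins, not the project's exact code):

```go
package peer

import (
	"sync"

	"github.com/pion/rtcp"
	"github.com/pion/webrtc/v3"
)

// Peer is a sketch only: the real type holds more state. The point is that
// the conference can call RequestKeyFrame directly instead of messaging itself.
type Peer struct {
	mu             sync.Mutex
	peerConnection *webrtc.PeerConnection // has its own internal locking
}

// RequestKeyFrame asks the remote side for a key frame by writing a PLI
// straight to the peer connection. Safe to call from any goroutine.
func (p *Peer) RequestKeyFrame(ssrc webrtc.SSRC) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	return p.peerConnection.WriteRTCP([]rtcp.Packet{
		&rtcp.PictureLossIndication{MediaSSRC: uint32(ssrc)},
	})
}
```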
It's OK if the Matrix SDK blocks until we're ready to accept new messages. Use unbuffered channels for that. We can't quite use this for multi-conference setups, as a single conference would affect the others, but currently we don't have that problem.
The `common` package is an antipattern.
This is a temporary measure until we can work around the bugs in Pion that lead to deadlocks.
I think this looks good, but this is a huge PR with a lot of unrelated changes, making it very hard to review. Moving things out of common, in particular, feels like it doesn't belong here.
Yeah, sorry. I tried to separate the changes by commit, so it's easier to review the PR by checking each individual commit, as each commit does one single thing (I rebased them before creating the PR so that it's easier to follow the logic and the changes). I.e. if you review it commit by commit, I think it should be more manageable, as they are mostly small.
cmd/sfu/main.go
Outdated
// Start matrix client sync. This function will block until the sync fails.
matrixClient.RunSyncing(func(e *event.Event) {
	routerChannel <- e
	matrixEvents <- e
})
matrixClient.RunSyncing runs endlessly until a panic happens. Is that right?
We could avoid this if we change the signature like this:
matrixEvents, errc := s.matrixClient.RunSyncing()
routing.RunRouter(matrixClient, connectionFactory, matrixEvents, config.Conference)
if err := <-errc; err != nil {
	log(err)
}
Ah, I see. Do I get it right that the advantage is that we don't panic from inside a package, but do it in main instead?
We could decide that based on the kind of error that occurred. I think we should avoid panicking if it's not needed.
Yeah, makes sense, though we only have one type of error. But please confirm that my understanding is correct: the reason we're doing this is that it would allow us to avoid panicking inside a package? If so, I agree, but couldn't we solve it more simply by just returning this error instead of panicking? I.e. I can slightly change `RunSyncing()` so that it does not panic once syncing has stopped, but instead just returns the error back to the caller.
`RunSyncing()` acts as the main loop. `errc` is a kind of done channel that can interrupt the loop gracefully. Additionally, `errc` contains the reason for the termination: with `nil` it was an intentional abort, and with an error it was an abort because of an issue.
Right. But we could simply return an error instead of a panic, which would have the same effect, right?
Anyway, I pushed a couple of new changes, so that:
- We don't panic anymore from inside the signaling package; we return an error instead (to be consistent with the API of the Matrix SDK there; it seems like a separate `errc` is not required in that particular case, but it could be added on top of it if we ever need it?).
- I don't store the `done` inside the conference state anymore (as you pointed out, we only use it from the `processMessages()` loop to close it once the loop is over), so I just pass it as a local variable. That way no other function can [by accident] close a channel that it's not supposed to close 👍
PTAL 🙂
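To illustrate the first point, here is a minimal sketch of the error-returning shape, with hypothetical types standing in for the real signaling package:

```go
package signaling

import (
	"errors"
	"fmt"
)

// Hypothetical sketch only: the real types differ. The point is that the
// package reports the failure to the caller instead of panicking.

var ErrSyncStopped = errors.New("matrix syncing stopped")

type MatrixClient struct {
	sync func() error // stand-in for the underlying Matrix SDK sync loop
}

// RunSyncing blocks until syncing stops and returns the reason to the caller.
// main can then treat a nil error as an intentional shutdown and anything
// else as a failure worth logging (or exiting over).
func (c *MatrixClient) RunSyncing() error {
	if err := c.sync(); err != nil {
		return fmt.Errorf("%w: %v", ErrSyncStopped, err)
	}
	return nil
}
```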
	return ErrSinkSealed
case s.messageSink <- messageWithSender:
	return nil
}
You do not know which of the two cases is executed at runtime. With `default` you make sure that the sealed case is checked first.
select {
case <-s.sealed:
	return ErrSinkSealed
default:
	s.messageSink <- messageWithSender
	return nil
}
But is it not different semantically? I.e. what if the sender gets to the `s.messageSink <- messageWithSender` part and blocks there, waiting for the reader to get ready? We must ensure that once the reader is not ready to accept new messages (e.g. it is not interested in them anymore), we don't continue waiting on the channel. Essentially that's what I tried to describe; semantically I wanted this:
select {
case <-done:
	return OkRecipientDoesNotExpectNewMessages
case s.messageSink <- messageWithSender:
	return nil
}
Though your concern is valid, I tried to catch this case by checking the atomic variable at the beginning of the function (though this does not guarantee that those who called `Send()` just before `Close()` won't send a value).
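A self-contained sketch of that send path, with assumed field and method names (e.g. `Seal` standing in for the `Close()` mentioned above):

```go
package sink

import (
	"errors"
	"sync/atomic"
)

// ErrSinkSealed mirrors the error used in the snippet above.
var ErrSinkSealed = errors.New("the message sink is sealed")

// MessageSink is a sketch with assumed field names, not the repo's exact type.
type MessageSink[M any] struct {
	messageSink chan M
	sealed      chan struct{} // closed exactly once by Seal
	sealedFlag  atomic.Bool
}

// Send refuses new messages once the sink is sealed. The atomic check at the
// top cheaply rejects most late senders; the select covers the race where the
// sink gets sealed while we are blocked waiting for the receiver.
func (s *MessageSink[M]) Send(message M) error {
	if s.sealedFlag.Load() {
		return ErrSinkSealed
	}

	select {
	case <-s.sealed:
		return ErrSinkSealed
	case s.messageSink <- message:
		return nil
	}
}

// Seal marks the sink as closed for new messages. It does not close
// messageSink itself, so a concurrent Send never writes to a closed channel.
func (s *MessageSink[M]) Seal() {
	if s.sealedFlag.CompareAndSwap(false, true) {
		close(s.sealed)
	}
}
```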
OK, got it. Better to send a message than to block.
pkg/conference/start.go
Outdated
}

participantID := participant.ID{UserID: userID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID}
if err := conference.onNewParticipant(participantID, inviteEvent); err != nil {
	return nil, err
	return nil, nil
}

// Start conference "main loop".
go conference.processMessages()
Maybe if processMessages has its own done channel that it closes, then we can wait here for processMessages to be done and clean up if necessary.
go func() {
	defer close(done) // close conference after processMessages is done
	pmDone := conference.processMessages()
	<-pmDone
	// ... do some clean ups, currently not needed
}()
I thought of it only because the conference `done` is distributed to the sinks. And it would have the advantage that we don't have to bind `done` to the conference structure. What do you think?
I think your point about storing `done` inside the conference is valid. This could be solved if I pass `done` to `processMessages` (so that we won't need to store it in the conference) and rename it to `signalDone` instead of `done`, so that it does not give the impression that it's something we need to listen to.
The rest I think is identical (we start a goroutine for `processMessages` that then cleans things up), though we don't need to wait on `done` inside `StartConference()`, since that `done` channel is returned to the upstream jobs so that they know when the conference is closed.
I.e. we return the `done`, so that the caller which sends messages to the conference knows when to stop producing new values (when to stop sending values to the conference).
That way, when the sender sends a Matrix message to the conference, the sender's code looks like this:
select {
case conferenceSink <- message:
	// message sent to the conference, cool
case <-conferenceDone:
	// conference closed, ok, so we don't send messages to it anymore
	close(conferenceSink)
}
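Putting both comments together, the overall shape could look roughly like this (all names are illustrative stand-ins, not the repo's exact API):

```go
package conference

// Illustrative types only.
type PeerMessage struct {
	Sender  string
	Content string
}

type Conference struct {
	peerMessages chan PeerMessage
}

// StartConference passes the done channel into processMessages instead of
// storing it on the conference, and returns it so that the upstream router
// knows when the conference has ended and must stop producing messages.
func StartConference(peerMessages chan PeerMessage) <-chan struct{} {
	conference := &Conference{peerMessages: peerMessages}

	signalDone := make(chan struct{})
	go conference.processMessages(signalDone)

	return signalDone
}

// processMessages is the only owner of signalDone: nothing else can close it
// by accident, because the channel never leaves this call chain.
func (c *Conference) processMessages(signalDone chan struct{}) {
	defer close(signalDone)

	for message := range c.peerMessages {
		c.handleMessage(message)
	}
}

func (c *Conference) handleMessage(PeerMessage) {
	// React to the message; omitted in this sketch.
}
```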
I agree. At this time we are not running into a panic because the conference message channel will never be closed. With the cleanup we could first close the sinks and then close the message channel from the conference.
At this time we are not running into a panic because the conference message channel will never be closed.
Sorry, I did not quite get that. The channel between the router and a conference gets closed by the sender (the router) once the sender spots that the Conference closed the `done` channel. The `done` channel is closed by the conference when the conference ends, to inform the upstream stage (the router) that no more new messages are expected. When the router spots that, it closes the message channel that it used to send messages to the conference and removes the conference from the list.
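For illustration, the router side of that contract could look roughly like this (the types are assumptions):

```go
package router

// Sketch of the contract described above: the conference closes its done
// channel, and only the router (the sender) ever closes the message channel,
// so nobody sends on a closed channel.

type Message struct {
	ConferenceID string
	Payload      string
}

type conferenceHandle struct {
	sink chan Message    // owned by the router; only the router closes it
	done <-chan struct{} // closed by the conference when it ends
}

type Router struct {
	conferences map[string]conferenceHandle
}

func (r *Router) dispatch(msg Message) {
	conf, ok := r.conferences[msg.ConferenceID]
	if !ok {
		return // no such conference (anymore)
	}

	select {
	case conf.sink <- msg:
		// Delivered.
	case <-conf.done:
		// The conference ended: close our end and drop it from the list.
		close(conf.sink)
		delete(r.conferences, msg.ConferenceID)
	}
}
```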
I meant that this select construct ("select to send to conference") could lead to a panic if the `conference.peerMessages` channel were closed, because every `Sink` writes to this channel.
Here in `NewSink` the channel `conference.peerMessages` is handed over. I couldn't find a place where we close `conference.peerMessages`.
Should we decide to close `conference.peerMessages`, then here would be a good place, after all sinks and `processMessages` have been closed.
Ah, I see, you mean a different place. Yes, this is a single leftover that is still not refactored (I mentioned it in the description), i.e. ideally that part should be rewritten as a fan-in, where we have a single sender for each peer and then a `merge` function. Though we would need to use `Select` from the `reflect` package, since the number of peers changes over time.
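For reference, a dynamic fan-in over a changing set of peer channels could be sketched with `reflect.Select` like this (simplified, not the planned implementation):

```go
package fanin

import "reflect"

// merge forwards values from a dynamic set of input channels into a single
// output channel. reflect.Select is needed because the number of inputs is
// only known at runtime and changes as peers join and leave.
func merge[M any](inputs []<-chan M) <-chan M {
	out := make(chan M)

	go func() {
		defer close(out)

		cases := make([]reflect.SelectCase, len(inputs))
		for i, ch := range inputs {
			cases[i] = reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(ch),
			}
		}

		for len(cases) > 0 {
			chosen, value, ok := reflect.Select(cases)
			if !ok {
				// This input was closed: drop its case and keep going.
				cases = append(cases[:chosen], cases[chosen+1:]...)
				continue
			}
			out <- value.Interface().(M)
		}
	}()

	return out
}
```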
👍
So it better reflects the semantics (that the function does not block).
This is a result of working on #120, which did not quite work as expected due to some bugs and issues with Pion. So currently it's more a preparation to properly implement #120 and to get ready for #117.
This PR changes the following things:
- Gets rid of the `common` package, as it seems to be an antipattern that is used to overcome Go limitations and ends up containing everything that does not belong to other packages.

Note that there is one significant change in behavior: currently, the router processes the messages from the Matrix SDK and sends them to the conference via an unbuffered channel. This means that if one conference freezes, it will affect all other conferences! It's not a problem for now, since we primarily use a single conference and the conferences should never freeze (so it helps to check that our implementation is sound), but ideally we probably need to get the buffer back for the conference, otherwise we won't be able to make them all independent.
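To make that trade-off concrete, here is a toy sketch (illustrative values only): with a buffer, a slow or frozen consumer only blocks the producer once the buffer fills up, whereas an unbuffered channel blocks it immediately.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A buffered channel: the producer (the router) is only blocked once the
	// buffer is full, so one slow or frozen consumer (a conference) does not
	// immediately stall everyone else. The size 8 is an arbitrary example.
	events := make(chan int, 8)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for e := range events {
			time.Sleep(50 * time.Millisecond) // a slow conference
			fmt.Println("handled", e)
		}
	}()

	for i := 0; i < 8; i++ {
		events <- i // does not block while the buffer has room
	}
	close(events)
	<-done
}
```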