Replies: 1 comment
-
problem with const func (c Connector) broadcast(ctx context.Context, conn Connection) error {
buf := make(chan Msg, 100)
defer close(buf)
go func() {
for msg := range buf {
c.distribute(ctx, msg, conn.Meta, conn.Receivers)
}
}()
receiverSet := make(map[PortAddr]struct{}, len(conn.Meta.ReceiverPortAddrs))
for _, receiverPortAddr := range conn.Meta.ReceiverPortAddrs {
receiverSet[receiverPortAddr] = struct{}{}
}
for {
select {
case <-ctx.Done():
return nil
case msg := <-conn.Sender:
msg = c.listener.Send(Event{
Type: MessageSentEvent,
MessageSent: &EventMessageSent{
SenderPortAddr: conn.Meta.SenderPortAddr,
ReceiverPortAddrs: receiverSet,
},
}, msg)
// instead of distributing msg directly, we use buffer, so sender can write faster than receivers read
buf <- msg
}
}
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Current implementation of the connector algorithm has some downsides that leads to unnecessary blocking. Here's how it works
For every N connections, where connection is a one sender to many receivers relation, spawn N goroutines. Each goroutine reads an message from sender and distributes across M receivers. It does so in a cyclic manner, trying to avoid unnecessary blocking - if one receiver is busy it immediately goes to the next one and do so in a loop until every receiver got the message. However, if sender is fast enough to deliver a new message before all receivers got the previous one, then we have to wait. In other words - the speed of distribution of a message across M receivers is the speed of the slowest receiver.
It's possible in theory for "fast" receivers not to wait for the slower and continue to receive messages. Here's the example:
Let
s1
sender sends message at the speed of1 msg/second
and let there be 2 receiversr1
andr2
-r1
receiver is as fast ass1
sender and is able to receive1 msg/second
butr2
receiver is slow and only can receive0.5 msg/second
(it needs 2 seconds to process a message).s1
sends am1
first message tor1
andr2
s1
sends secondm2
messager1
finishes processing ofm1
and could immediately receivem1
but instead waits forr2
that needs one more seconds1
sends third messagem3
r2
finishes processing ofm1
r1
andr2
immediately receivesm2
message but there's alreadym3
waitingHowever, keep in mind that order of messages must not be broken. Any optimization of the algorithm must keep the correctness of the current implementation.
Related to #250, #90, #243
Beta Was this translation helpful? Give feedback.
All reactions