Skip to content

Commit

Permalink
Support bandwidth limits when selecting proxy to use.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Apr 25, 2024
1 parent 0d3d002 commit f58d985
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 70 deletions.
29 changes: 29 additions & 0 deletions api_proxy.go
Expand Up @@ -299,12 +299,41 @@ type PayloadProxyServerMessage struct {

// Type "event"

type EventProxyServerBandwidth struct {
// Incoming is the bandwidth utilization for publishers in percent.
Incoming *float64 `json:"incoming,omitempty"`
// Outgoing is the bandwidth utilization for subscribers in percent.
Outgoing *float64 `json:"outgoing,omitempty"`
}

func (b *EventProxyServerBandwidth) String() string {
if b.Incoming != nil && b.Outgoing != nil {
return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=%.3f%%", *b.Incoming, *b.Outgoing)
} else if b.Incoming != nil {
return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=unlimited", *b.Incoming)
} else if b.Outgoing != nil {
return fmt.Sprintf("bandwidth: incoming=unlimited, outgoing=%.3f%%", *b.Outgoing)
} else {
return "bandwidth: incoming=unlimited, outgoing=unlimited"
}
}

func (b EventProxyServerBandwidth) AllowIncoming() bool {
return b.Incoming == nil || *b.Incoming < 100
}

func (b EventProxyServerBandwidth) AllowOutgoing() bool {
return b.Outgoing == nil || *b.Outgoing < 100
}

type EventProxyServerMessage struct {
Type string `json:"type"`

ClientId string `json:"clientId,omitempty"`
Load int64 `json:"load,omitempty"`
Sid string `json:"sid,omitempty"`

Bandwidth *EventProxyServerBandwidth `json:"bandwidth,omitempty"`
}

// Information on a proxy in the etcd cluster.
Expand Down
2 changes: 2 additions & 0 deletions docker/README.md
Expand Up @@ -102,6 +102,8 @@ The running container can be configured through different environment variables:
- `EXTERNAL_HOSTNAME`: The external hostname for remote streams. Will try to autodetect if omitted.
- `TOKEN_ID`: Id of the token to use when connecting remote streams.
- `TOKEN_KEY`: Private key for the configured token id.
- `BANDWIDTH_INCOMING`: Optional incoming target bandwidth (in megabits per second).
- `BANDWIDTH_OUTGOING`: Optional outgoing target bandwidth (in megabits per second).
- `JANUS_URL`: Url to Janus server.
- `MAX_STREAM_BITRATE`: Optional maximum bitrate for audio/video streams.
- `MAX_SCREEN_BITRATE`: Optional maximum bitrate for screensharing streams.
Expand Down
5 changes: 5 additions & 0 deletions docker/proxy/entrypoint.sh
Expand Up @@ -52,6 +52,11 @@ if [ ! -f "$CONFIG" ]; then
fi
if [ -n "$TOKEN_KEY" ]; then
sed -i "s|#token_key =.*|token_key = $TOKEN_KEY|" "$CONFIG"
if [ -n "$BANDWIDTH_INCOMING" ]; then
sed -i "s|#incoming =.*|incoming = $BANDWIDTH_INCOMING|" "$CONFIG"
fi
if [ -n "$BANDWIDTH_OUTGOING" ]; then
sed -i "s|#outgoing =.*|outgoing = $BANDWIDTH_OUTGOING|" "$CONFIG"
fi

HAS_ETCD=
Expand Down
173 changes: 144 additions & 29 deletions mcu_proxy.go
Expand Up @@ -334,6 +334,7 @@ type mcuProxyConnection struct {
ip net.IP

load atomic.Int64
bandwidth atomic.Pointer[EventProxyServerBandwidth]
mu sync.Mutex
closer *Closer
closedDone *Closer
Expand Down Expand Up @@ -385,6 +386,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProx
}
conn.reconnectInterval.Store(int64(initialReconnectInterval))
conn.load.Store(loadNotConnected)
conn.bandwidth.Store(nil)
conn.country.Store("")
return conn, nil
}
Expand Down Expand Up @@ -488,6 +490,10 @@ func (c *mcuProxyConnection) Load() int64 {
return c.load.Load()
}

func (c *mcuProxyConnection) Bandwidth() *EventProxyServerBandwidth {
return c.bandwidth.Load()
}

func (c *mcuProxyConnection) Country() string {
return c.country.Load().(string)
}
Expand Down Expand Up @@ -532,7 +538,10 @@ func (c *mcuProxyConnection) readPump() {
}
}()
defer c.close()
defer c.load.Store(loadNotConnected)
defer func() {
c.load.Store(loadNotConnected)
c.bandwidth.Store(nil)
}()

c.mu.Lock()
conn := c.conn
Expand Down Expand Up @@ -996,9 +1005,10 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
return
case "update-load":
if proxyDebugMessages {
log.Printf("Load of %s now at %d", c, event.Load)
log.Printf("Load of %s now at %d (%s)", c, event.Load, event.Bandwidth)
}
c.load.Store(event.Load)
c.bandwidth.Store(event.Bandwidth)
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
return
case "shutdown-scheduled":
Expand Down Expand Up @@ -1730,27 +1740,27 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
delete(m.publishers, getStreamId(publisher.id, publisher.StreamType()))
}

func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = m.maxScreenBitrate
} else {
maxBitrate = m.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}

for _, conn := range connections {
if conn.IsShutdownScheduled() || conn.IsTemporary() {
if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() {
continue
}

subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
defer cancel()

var maxBitrate int
if streamType == StreamTypeScreen {
maxBitrate = m.maxScreenBitrate
} else {
maxBitrate = m.maxStreamBitrate
}
if bitrate <= 0 {
bitrate = maxBitrate
} else {
bitrate = min(bitrate, maxBitrate)
}
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
if err != nil {
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
Expand All @@ -1761,11 +1771,61 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
m.publishers[getStreamId(id, streamType)] = conn
m.mu.Unlock()
m.publisherWaiters.Wakeup()
return publisher, nil
return publisher
}

statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc()
return nil, fmt.Errorf("No MCU connection available")
return nil
}

func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
connections := m.getSortedConnections(initiator)
publisher := m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections, func(c *mcuProxyConnection) bool {
bw := c.Bandwidth()
return bw == nil || bw.AllowIncoming()
})
if publisher == nil {
// No proxy has available bandwidth, select one with the lowest currently used bandwidth.
connections2 := make([]*mcuProxyConnection, 0, len(connections))
for _, c := range connections {
if c.Bandwidth() != nil {
connections2 = append(connections2, c)
}
}
SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int {
var incoming_a *float64
if bw := a.Bandwidth(); bw != nil {
incoming_a = bw.Incoming
}

var incoming_b *float64
if bw := b.Bandwidth(); bw != nil {
incoming_b = bw.Incoming
}

if incoming_a == nil && incoming_b == nil {
return 0
} else if incoming_a == nil && incoming_b != nil {
return -1
} else if incoming_a != nil && incoming_b == nil {
return -1
} else if *incoming_a < *incoming_b {
return -1
} else if *incoming_a > *incoming_b {
return 1
}
return 0
})
publisher = m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections2, func(c *mcuProxyConnection) bool {
return true
})
}

if publisher == nil {
statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc()
return nil, fmt.Errorf("No MCU connection available")
}

return publisher, nil
}

func (m *mcuProxy) getPublisherConnection(publisher string, streamType StreamType) *mcuProxyConnection {
Expand Down Expand Up @@ -1812,6 +1872,30 @@ type proxyPublisherInfo struct {
err error
}

func (m *mcuProxy) createSubscriber(ctx context.Context, listener McuListener, id string, publisher string, streamType StreamType, publisherConn *mcuProxyConnection, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuSubscriber {
for _, conn := range connections {
if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() {
continue
}

var subscriber McuSubscriber
var err error
if conn == publisherConn {
subscriber, err = conn.newSubscriber(ctx, listener, id, publisher, streamType)
} else {
subscriber, err = conn.newRemoteSubscriber(ctx, listener, id, publisher, streamType, publisherConn)
}
if err != nil {
log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err)
continue
}

return subscriber
}

return nil
}

func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error) {
var publisherInfo *proxyPublisherInfo
if conn := m.getPublisherConnection(publisher, streamType); conn != nil {
Expand Down Expand Up @@ -1948,21 +2032,52 @@ func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publ
return nil, publisherInfo.err
}

if !publisherInfo.conn.IsSameCountry(initiator) {
bw := publisherInfo.conn.Bandwidth()
allowOutgoing := bw == nil || bw.AllowOutgoing()
if !allowOutgoing || !publisherInfo.conn.IsSameCountry(initiator) {
connections := m.getSortedConnections(initiator)
if len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) {
if !allowOutgoing || len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) {
// Connect to remote publisher through "closer" gateway.
for _, conn := range connections {
if conn.IsShutdownScheduled() || conn.IsTemporary() || conn == publisherInfo.conn {
continue
subscriber := m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections, func(c *mcuProxyConnection) bool {
bw := c.Bandwidth()
return bw == nil || bw.AllowOutgoing()
})
if subscriber == nil {
connections2 := make([]*mcuProxyConnection, 0, len(connections))
for _, c := range connections {
if c.Bandwidth() != nil {
connections2 = append(connections2, c)
}
}
SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int {
var outgoing_a *float64
if bw := a.Bandwidth(); bw != nil {
outgoing_a = bw.Outgoing
}

subscriber, err := conn.newRemoteSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn)
if err != nil {
log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err)
continue
}
var outgoing_b *float64
if bw := b.Bandwidth(); bw != nil {
outgoing_b = bw.Outgoing
}

if outgoing_a == nil && outgoing_b == nil {
return 0
} else if outgoing_a == nil && outgoing_b != nil {
return -1
} else if outgoing_a != nil && outgoing_b == nil {
return -1
} else if *outgoing_a < *outgoing_b {
return -1
} else if *outgoing_a > *outgoing_b {
return 1
}
return 0
})
subscriber = m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections2, func(c *mcuProxyConnection) bool {
return true
})
}
if subscriber != nil {
return subscriber, nil
}
}
Expand Down

0 comments on commit f58d985

Please sign in to comment.