Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer: add data channel worker #136

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions pkg/peer/datachannel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package peer

import (
"time"

"github.com/matrix-org/waterfall/pkg/peer/state"
"github.com/matrix-org/waterfall/pkg/worker"
"github.com/sirupsen/logrus"
)

func newDataChannelWorker(state *state.PeerState, logger *logrus.Entry) *worker.Worker[string] {
// Create the configuration for the data channel worker.
workerConfig := worker.Config[string]{
ChannelSize: 32,
Timeout: time.Hour,
OnTimeout: func() {},
OnTask: func(json string) {
ch := state.GetDataChannel()
if ch == nil {
logger.Warn("dropping the message, channel not available")
return
}

if err := ch.SendText(json); err != nil {
logger.WithError(err).Error("failed to send data channel message")
return
}
},
}

// Create the worker.
return worker.StartWorker(workerConfig)
}
51 changes: 29 additions & 22 deletions pkg/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package peer

import (
"errors"
"fmt"

"github.com/matrix-org/waterfall/pkg/channel"
"github.com/matrix-org/waterfall/pkg/peer/state"
"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/matrix-org/waterfall/pkg/worker"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
Expand All @@ -18,7 +18,6 @@ var (
ErrCantCreateAnswer = errors.New("can't create answer")
ErrCantSetLocalDescription = errors.New("can't set local description")
ErrCantCreateLocalDescription = errors.New("can't create local description")
ErrDataChannelNotAvailable = errors.New("data channel is not available")
ErrDataChannelNotReady = errors.New("data channel is not ready")
ErrCantSubscribeToTrack = errors.New("can't subscribe to track")
)
Expand All @@ -28,10 +27,11 @@ var (
// and informs the outside world about the things happening inside the peer by posting
// the messages to the channel.
type Peer[ID comparable] struct {
logger *logrus.Entry
peerConnection *webrtc.PeerConnection
sink *channel.SinkWithSender[ID, MessageContent]
state *state.PeerState
logger *logrus.Entry
peerConnection *webrtc.PeerConnection
sink *channel.SinkWithSender[ID, MessageContent]
state *state.PeerState
dataChannelWorker *worker.Worker[string]
}

// Instantiates a new peer with a given SDP offer and returns a peer and the SDP answer if everything is ok.
Expand All @@ -47,11 +47,18 @@ func NewPeer[ID comparable](
return nil, nil, ErrCantCreatePeerConnection
}

// The thread-safe peer state.
peerState := state.NewPeerState()

// The worker that is responsible for writing data channel messages.
dataChannelWorker := newDataChannelWorker(peerState, logger)

peer := &Peer[ID]{
logger: logger,
peerConnection: peerConnection,
sink: sink,
state: state.NewPeerState(),
logger: logger,
peerConnection: peerConnection,
sink: sink,
state: peerState,
dataChannelWorker: dataChannelWorker,
}

peerConnection.OnTrack(peer.onRtpTrackReceived)
Expand Down Expand Up @@ -80,6 +87,9 @@ func (p *Peer[ID]) Terminate() {
// We may want to remove this logic if/once we want to receive messages (confirmation of close or whatever)
// from the peer that is considered closed.
p.sink.Seal()

// Stop the worker for the data channel messages.
p.dataChannelWorker.Stop()
}

// Request a key frame from the peer connection.
Expand All @@ -99,21 +109,18 @@ func (p *Peer[ID]) RemoveTrack(sender *webrtc.RTPSender) error {
}

// Tries to send the given message to the remote counterpart of our peer.
// The error returned from this function means that the message has not been sent.
// Note that if no error is returned, it doesn't mean that the message has been
// successfully sent. It only means that the message has been "scheduled" (enqueued).
func (p *Peer[ID]) SendOverDataChannel(json string) error {
dataChannel := p.state.GetDataChannel()
if dataChannel == nil {
return ErrDataChannelNotAvailable
}

if dataChannel.ReadyState() != webrtc.DataChannelStateOpen {
return ErrDataChannelNotReady
}

if err := dataChannel.SendText(json); err != nil {
return fmt.Errorf("failed to send data over data channel: %w", err)
// Preliminary quick check, so that we can fail early if the channel is closed.
if ch := p.state.GetDataChannel(); ch != nil && ch.ReadyState() == webrtc.DataChannelStateOpen {
// Note that by this moment the channel could have closed, so the fact that
// we successfully enqueue the message doesn't mean that it will be sent or delivered.
return p.dataChannelWorker.Send(json)
}

return nil
return ErrDataChannelNotReady
}

// Processes the remote ICE candidates.
Expand Down