Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 160 additions & 79 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,64 +19,16 @@ const dataChannelBufferSize = 16384 // Lowest common denominator among browsers
type DataChannel struct {
mu sync.RWMutex

// Label represents a label that can be used to distinguish this
// DataChannel object from other DataChannel objects. Scripts are
// allowed to create multiple DataChannel objects with the same label.
Label string

// Ordered represents if the DataChannel is ordered, and false if
// out-of-order delivery is allowed.
Ordered bool

// MaxPacketLifeTime represents the length of the time window (msec) during
// which transmissions and retransmissions may occur in unreliable mode.
MaxPacketLifeTime *uint16

// MaxRetransmits represents the maximum number of retransmissions that are
// attempted in unreliable mode.
MaxRetransmits *uint16

// Protocol represents the name of the sub-protocol used with this
// DataChannel.
Protocol string

// Negotiated represents whether this DataChannel was negotiated by the
// application (true), or not (false).
Negotiated bool

// ID represents the ID for this DataChannel. The value is initially
// null, which is what will be returned if the ID was not provided at
// channel creation time, and the DTLS role of the SCTP transport has not
// yet been negotiated. Otherwise, it will return the ID that was either
// selected by the script or generated. After the ID is set to a non-null
// value, it will not change.
ID *uint16

// Priority represents the priority for this DataChannel. The priority is
// assigned at channel creation time.
Priority PriorityType

// ReadyState represents the state of the DataChannel object.
ReadyState DataChannelState

// BufferedAmount represents the number of bytes of application data
// (UTF-8 text and binary data) that have been queued using send(). Even
// though the data transmission can occur in parallel, the returned value
// MUST NOT be decreased before the current task yielded back to the event
// loop to prevent race conditions. The value does not include framing
// overhead incurred by the protocol, or buffering done by the operating
// system or network hardware. The value of BufferedAmount slot will only
// increase with each call to the send() method as long as the ReadyState is
// open; however, BufferedAmount does not reset to zero once the channel
// closes.
BufferedAmount uint64

// BufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
BufferedAmountLowThreshold uint64
label string
ordered bool
maxPacketLifeTime *uint16
maxRetransmits *uint16
protocol string
negotiated bool
id *uint16
priority PriorityType
readyState DataChannelState
bufferedAmountLowThreshold uint64

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -126,12 +78,12 @@ func (api *API) newDataChannel(params *DataChannelParameters) (*DataChannel, err
}

return &DataChannel{
Label: params.Label,
ID: &params.ID,
Ordered: params.Ordered,
MaxPacketLifeTime: params.MaxPacketLifeTime,
MaxRetransmits: params.MaxRetransmits,
ReadyState: DataChannelStateConnecting,
label: params.Label,
id: &params.ID,
ordered: params.Ordered,
maxPacketLifeTime: params.MaxPacketLifeTime,
maxRetransmits: params.MaxRetransmits,
readyState: DataChannelStateConnecting,
api: api,
}, nil
}
Expand All @@ -150,23 +102,23 @@ func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
var reliabilityParameteer uint32

switch {
case d.MaxPacketLifeTime == nil && d.MaxRetransmits == nil:
if d.Ordered {
case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
if d.ordered {
channelType = datachannel.ChannelTypeReliable
} else {
channelType = datachannel.ChannelTypeReliableUnordered
}

case d.MaxRetransmits != nil:
reliabilityParameteer = uint32(*d.MaxRetransmits)
if d.Ordered {
case d.maxRetransmits != nil:
reliabilityParameteer = uint32(*d.maxRetransmits)
if d.ordered {
channelType = datachannel.ChannelTypePartialReliableRexmit
} else {
channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
}
default:
reliabilityParameteer = uint32(*d.MaxPacketLifeTime)
if d.Ordered {
reliabilityParameteer = uint32(*d.maxPacketLifeTime)
if d.ordered {
channelType = datachannel.ChannelTypePartialReliableTimed
} else {
channelType = datachannel.ChannelTypePartialReliableTimedUnordered
Expand All @@ -177,16 +129,16 @@ func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
ChannelType: channelType,
Priority: datachannel.ChannelPriorityNormal, // TODO: Wiring
ReliabilityParameter: reliabilityParameteer,
Label: d.Label,
Label: d.label,
}

dc, err := datachannel.Dial(d.sctpTransport.association, *d.ID, cfg)
dc, err := datachannel.Dial(d.sctpTransport.association, *d.id, cfg)
if err != nil {
d.mu.Unlock()
return err
}

d.ReadyState = DataChannelStateOpen
d.readyState = DataChannelStateOpen
d.mu.Unlock()

d.handleOpen(dc)
Expand Down Expand Up @@ -311,7 +263,7 @@ func (d *DataChannel) readLoop() {
}
if err != nil {
d.mu.Lock()
d.ReadyState = DataChannelStateClosed
d.readyState = DataChannelStateClosed
d.mu.Unlock()
if err != io.EOF {
// TODO: Throw OnError
Expand Down Expand Up @@ -359,7 +311,7 @@ func (d *DataChannel) SendText(s string) error {
func (d *DataChannel) ensureOpen() error {
d.mu.RLock()
defer d.mu.RUnlock()
if d.ReadyState != DataChannelStateOpen {
if d.readyState != DataChannelStateOpen {
return &rtcerr.InvalidStateError{Err: ErrDataChannelNotOpen}
}
return nil
Expand Down Expand Up @@ -394,12 +346,141 @@ func (d *DataChannel) Close() error {
d.mu.Lock()
defer d.mu.Unlock()

if d.ReadyState == DataChannelStateClosing ||
d.ReadyState == DataChannelStateClosed {
if d.readyState == DataChannelStateClosing ||
d.readyState == DataChannelStateClosed {
return nil
}

d.ReadyState = DataChannelStateClosing
d.readyState = DataChannelStateClosing

return d.dataChannel.Close()
}

// Label represents a label that can be used to distinguish this
// DataChannel object from other DataChannel objects. Scripts are
// allowed to create multiple DataChannel objects with the same label.
func (d *DataChannel) Label() string {
d.mu.RLock()
defer d.mu.RUnlock()

return d.label
}

// Ordered represents if the DataChannel is ordered, and false if
// out-of-order delivery is allowed.
func (d *DataChannel) Ordered() bool {
d.mu.RLock()
defer d.mu.RUnlock()

return d.ordered
}

// MaxPacketLifeTime represents the length of the time window (msec) during
// which transmissions and retransmissions may occur in unreliable mode.
func (d *DataChannel) MaxPacketLifeTime() *uint16 {
d.mu.RLock()
defer d.mu.RUnlock()

return d.maxPacketLifeTime
}

// MaxRetransmits represents the maximum number of retransmissions that are
// attempted in unreliable mode.
func (d *DataChannel) MaxRetransmits() *uint16 {
d.mu.RLock()
defer d.mu.RUnlock()

return d.maxRetransmits
}

// Protocol represents the name of the sub-protocol used with this
// DataChannel.
func (d *DataChannel) Protocol() string {
d.mu.RLock()
defer d.mu.RUnlock()

return d.protocol
}

// Negotiated represents whether this DataChannel was negotiated by the
// application (true), or not (false).
func (d *DataChannel) Negotiated() bool {
d.mu.RLock()
defer d.mu.RUnlock()

return d.negotiated
}

// ID represents the ID for this DataChannel. The value is initially
// null, which is what will be returned if the ID was not provided at
// channel creation time, and the DTLS role of the SCTP transport has not
// yet been negotiated. Otherwise, it will return the ID that was either
// selected by the script or generated. After the ID is set to a non-null
// value, it will not change.
func (d *DataChannel) ID() *uint16 {
d.mu.RLock()
defer d.mu.RUnlock()

return d.id
}

// Priority represents the priority for this DataChannel. The priority is
// assigned at channel creation time.
func (d *DataChannel) Priority() PriorityType {
d.mu.RLock()
defer d.mu.RUnlock()

return d.priority
}

// ReadyState represents the state of the DataChannel object.
func (d *DataChannel) ReadyState() DataChannelState {
d.mu.RLock()
defer d.mu.RUnlock()

return d.readyState
}

// BufferedAmount represents the number of bytes of application data
// (UTF-8 text and binary data) that have been queued using send(). Even
// though the data transmission can occur in parallel, the returned value
// MUST NOT be decreased before the current task yielded back to the event
// loop to prevent race conditions. The value does not include framing
// overhead incurred by the protocol, or buffering done by the operating
// system or network hardware. The value of BufferedAmount slot will only
// increase with each call to the send() method as long as the ReadyState is
// open; however, BufferedAmount does not reset to zero once the channel
// closes.
func (d *DataChannel) BufferedAmount() uint64 {
d.mu.RLock()
defer d.mu.RUnlock()

// TODO: wire to SCTP (pions/sctp#11)
return 0
}

// BufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
d.mu.RLock()
defer d.mu.RUnlock()

// TODO: wire to SCTP (pions/sctp#11)
return d.bufferedAmountLowThreshold
}

// SetBufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
d.mu.Lock()
defer d.mu.Unlock()

// TODO: wire to SCTP (pions/sctp#11)
d.bufferedAmountLowThreshold = th
}
48 changes: 34 additions & 14 deletions datachannel_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDataChannel_Send(t *testing.T) {
t.Fatalf("Failed to create a PC pair for testing")
}

assert.True(t, dc.Ordered, "Ordered should be set to true")
assert.True(t, dc.ordered, "Ordered should be set to true")

dc.OnOpen(func() {
e := dc.SendText("Ping")
Expand All @@ -77,7 +77,7 @@ func TestDataChannel_Send(t *testing.T) {
})

answerPC.OnDataChannel(func(d *DataChannel) {
assert.True(t, d.Ordered, "Ordered should be set to true")
assert.True(t, d.ordered, "Ordered should be set to true")

d.OnMessage(func(msg DataChannelMessage) {
e := d.Send([]byte("Pong"))
Expand Down Expand Up @@ -201,16 +201,16 @@ func TestDataChannelParamters(t *testing.T) {
offerPC, answerPC, dc, done := setUpReliabilityParamTest(t, options)

// Check if parameters are correctly set
assert.True(t, dc.Ordered, "Ordered should be set to true")
if assert.NotNil(t, dc.MaxPacketLifeTime, "should not be nil") {
assert.Equal(t, maxPacketLifeTime, *dc.MaxPacketLifeTime, "should match")
assert.True(t, dc.Ordered(), "Ordered should be set to true")
if assert.NotNil(t, dc.MaxPacketLifeTime(), "should not be nil") {
assert.Equal(t, maxPacketLifeTime, *dc.MaxPacketLifeTime(), "should match")
}

answerPC.OnDataChannel(func(d *DataChannel) {
// Check if parameters are correctly set
assert.True(t, d.Ordered, "Ordered should be set to true")
if assert.NotNil(t, d.MaxPacketLifeTime, "should not be nil") {
assert.Equal(t, maxPacketLifeTime, *d.MaxPacketLifeTime, "should match")
assert.True(t, d.ordered, "Ordered should be set to true")
if assert.NotNil(t, d.maxPacketLifeTime, "should not be nil") {
assert.Equal(t, maxPacketLifeTime, *d.maxPacketLifeTime, "should match")
}
done <- true
})
Expand All @@ -229,20 +229,40 @@ func TestDataChannelParamters(t *testing.T) {
offerPC, answerPC, dc, done := setUpReliabilityParamTest(t, options)

// Check if parameters are correctly set
assert.False(t, dc.Ordered, "Ordered should be set to false")
if assert.NotNil(t, dc.MaxRetransmits, "should not be nil") {
assert.Equal(t, maxRetransmits, *dc.MaxRetransmits, "should match")
assert.False(t, dc.Ordered(), "Ordered should be set to false")
if assert.NotNil(t, dc.MaxRetransmits(), "should not be nil") {
assert.Equal(t, maxRetransmits, *dc.MaxRetransmits(), "should match")
}

answerPC.OnDataChannel(func(d *DataChannel) {
// Check if parameters are correctly set
assert.False(t, d.Ordered, "Ordered should be set to false")
if assert.NotNil(t, d.MaxRetransmits, "should not be nil") {
assert.Equal(t, maxRetransmits, *d.MaxRetransmits, "should match")
assert.False(t, d.ordered, "Ordered should be set to false")
if assert.NotNil(t, d.maxRetransmits, "should not be nil") {
assert.Equal(t, maxRetransmits, *d.maxRetransmits, "should match")
}
done <- true
})

closeReliabilityParamTest(t, offerPC, answerPC, done)
})

t.Run("All other property methods", func(t *testing.T) {
id := uint16(123)
dc := &DataChannel{}
dc.id = &id
dc.label = "mylabel"
dc.protocol = "myprotocol"
dc.negotiated = true
dc.priority = PriorityTypeMedium

assert.Equal(t, dc.id, dc.ID(), "should match")
assert.Equal(t, dc.label, dc.Label(), "should match")
assert.Equal(t, dc.protocol, dc.Protocol(), "should match")
assert.Equal(t, dc.negotiated, dc.Negotiated(), "should match")
assert.Equal(t, dc.priority, dc.Priority(), "should match")
assert.Equal(t, dc.readyState, dc.ReadyState(), "should match")
assert.Equal(t, uint64(0), dc.BufferedAmount(), "should match")
dc.SetBufferedAmountLowThreshold(1500)
assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "should match")
})
}
Loading