Skip to content

Commit

Permalink
Merge 9ec7f94 into 36a71eb
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated committed Mar 2, 2019
2 parents 36a71eb + 9ec7f94 commit 905e04a
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Check out the **[contributing wiki](https://github.com/pions/webrtc/wiki/Contrib
* [Yutaka Takeda](https://github.com/enobufs) *Bridge, simulates loss and out of order packets*
* [Sean DuBois](https://github.com/Sean-Der) - *Testing Infra*
* [Woodrow Douglass](https://github.com/wdouglass) *Fix test flakiness*
* [Luke Curley](https://github.com/kixelated) *Batched packet support*

### License
MIT License - see [LICENSE](LICENSE) for full text
33 changes: 33 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package transport

import (
"net"
)

// Conn is an extension of net.Conn with batched support
type Conn interface {
net.Conn

// ReadBatch takes a slice of Messages, reading them in bulk if supported.
// Returns the number of messages read, which may be less than the full slice.
// Each message will have Buffer and Size populated.
ReadBatch(ms []Message) (n int, err error)

// WriteBatch takes a slice of Messages, writing them in bulk if supported.
// Returns the number of messages written, which may be less than the full slice.
// Each message will have Size populated with the number of bytes written.
WriteBatch(ms []Message) (n int, err error)
}

// NewConn creates a Conn given a net.Conn
// This is a wrapper, added stubs for batched support when needed.
func NewConn(conn net.Conn) (c Conn) {
switch ct := conn.(type) {
case Conn:
// Don't wrap if it already implements the interface.
return ct
default:
// Otherwise use a wrapper that translates the batch calls into single reads/writes
return newMessageSingler(conn)
}
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/pions/transport

go 1.12

require golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 h1:fY7Dsw114eJN4boqzVSbpVHO6rTdhq6/GnXeu+PKnzU=
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
7 changes: 7 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package transport

// Message is a buffer and size sent/received by a connection.
type Message struct {
Buffer []byte
Size int
}
43 changes: 43 additions & 0 deletions message_singler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package transport

import (
"net"
)

type messageSingler struct {
net.Conn
}

func newMessageSingler(conn net.Conn) (ms *messageSingler) {
return &messageSingler{conn}
}

func (msr *messageSingler) ReadBatch(ms []Message) (n int, err error) {
if len(ms) == 0 {
return 0, nil
}

m := &ms[0]

m.Size, err = msr.Read(m.Buffer)
if err != nil {
return 0, err
}

return 1, nil
}

func (msr *messageSingler) WriteBatch(ms []Message) (n int, err error) {
if len(ms) == 0 {
return 0, nil
}

m := &ms[0]

m.Size, err = msr.Write(m.Buffer)
if err != nil {
return 0, err
}

return 1, nil
}
11 changes: 11 additions & 0 deletions packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package transport

import (
"net"
)

// Packet is a message also containing an address.
type Packet struct {
Addr net.Addr
Message
}
58 changes: 58 additions & 0 deletions packet_batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transport

import (
"net"

"golang.org/x/net/ipv4"
)

type packetBatcher struct {
net.PacketConn

ipv4 *ipv4.PacketConn
}

func newPacketBatcher(conn net.PacketConn) (pb *packetBatcher) {
return &packetBatcher{
PacketConn: conn,
ipv4: ipv4.NewPacketConn(conn),
}
}

func (pb *packetBatcher) ReadBatch(ps []Packet) (n int, err error) {
messages := make([]ipv4.Message, len(ps))

for i, p := range ps {
messages[i] = ipv4.Message{
Addr: p.Addr,
Buffers: [][]byte{p.Buffer},
}
}

n, err = pb.ipv4.ReadBatch(messages, 0)

for i := 0; i < n; i++ {
ps[i].Size = messages[i].N
}

return n, err
}

func (pb *packetBatcher) WriteBatch(ps []Packet) (n int, err error) {
messages := make([]ipv4.Message, len(ps))

for i, p := range ps {
messages[i] = ipv4.Message{
Addr: p.Addr,
Buffers: [][]byte{p.Buffer},
}
}

n, err = pb.ipv4.WriteBatch(messages, 0)

for i := 0; i < n; i++ {
ps[i].Size = messages[i].N
}

return n, err
}
30 changes: 30 additions & 0 deletions packet_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package transport

import (
"net"
)

// PacketConn is an extension of net.PacketConn with batched support
type PacketConn interface {
net.PacketConn

ReadBatch(ps []Packet) (n int, err error)
WriteBatch(ps []Packet) (n int, err error)
}

// NewPacketConn creates a PacketConn given a net.PacketConn
// This is a wrapper, added batched support when a net.IPConn or net.UDPConn is provided.
func NewPacketConn(conn net.PacketConn) (pc PacketConn) {
switch ct := conn.(type) {
case PacketConn:
// Don't wrap if it already implements the interface.
return ct
case *net.IPConn, *net.UDPConn:
// Wrap the connection with a batcher that uses sendmmsg.
// We have to check for raw IPConn and UDPConn otherwise it will error.
return newPacketBatcher(conn)
default:
// Otherwise use a wrapper that translates the batch calls into single reads/writes
return newPacketSingler(conn)
}
}
46 changes: 46 additions & 0 deletions packet_singler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package transport

import (
"net"
)

type packetSingler struct {
net.PacketConn
}

func newPacketSingler(conn net.PacketConn) (ps *packetSingler) {
return &packetSingler{conn}
}

func (psr *packetSingler) ReadBatch(ps []Packet) (n int, err error) {
if len(ps) == 0 {
return 0, nil
}

p := &ps[0]

size, addr, err := psr.ReadFrom(p.Buffer)
if err != nil {
return 0, err
}

p.Addr = addr
p.Size = size

return 1, nil
}

func (psr *packetSingler) WriteBatch(ps []Packet) (n int, err error) {
if len(ps) == 0 {
return 0, nil
}

p := &ps[0]

p.Size, err = psr.WriteTo(p.Buffer, p.Addr)
if err != nil {
return 0, err
}

return 1, nil
}

0 comments on commit 905e04a

Please sign in to comment.