Skip to content
This repository has been archived by the owner on Dec 2, 2023. It is now read-only.

Commit

Permalink
Update udp
Browse files Browse the repository at this point in the history
Code from dtls
  • Loading branch information
adwpc committed Jun 4, 2020
1 parent 1f5fea7 commit e77bae4
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 277 deletions.
4 changes: 2 additions & 2 deletions configs/biz.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ addrs = ["127.0.0.1:2389"]
#listen ip port
host = "0.0.0.0"
port = "8443"
cert= "configs/cert.pem"
key= "configs/key.pem"
# cert= "configs/cert.pem"
# key= "configs/key.pem"

[nats]
url = "nats://127.0.0.1:4223"
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/rtpengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

var (
listener *udp.Listener
listener net.Listener
kcpListener *kcp.Listener
stop bool
)
Expand Down
145 changes: 94 additions & 51 deletions pkg/rtc/rtpengine/udp/conn.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
// Package udp provides a connection-oriented listener over a UDP PacketConn
// Package udp provides a connection-oriented Listener over a UDP PacketConn
package udp

import (
"context"
"errors"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/ion/pkg/rtc/rtpengine/deadline"
"github.com/pion/transport/deadline"
"github.com/pion/transport/packetio"
)

const receiveMTU = 8192
const receiveMTU = 1500
const defaultListenBacklog = 10000

var errClosedListener = errors.New("udp: listener closed")
var errClosedListener = errors.New("udp: Listener closed")
var errListenQueueExceeded = errors.New("udp: listen queue exceeded")

// Listener augments a connection-oriented Listener over a UDP PacketConn
type Listener struct {
pConn *net.UDPConn

accepting atomic.Value // bool
acceptCh chan *Conn
doneCh chan struct{}
doneOnce sync.Once
accepting atomic.Value // bool
acceptCh chan *Conn
doneCh chan struct{}
doneOnce sync.Once
acceptFilter func([]byte) bool

connLock sync.Mutex
conns map[string]*Conn
Expand All @@ -34,8 +37,8 @@ type Listener struct {
errClose atomic.Value // error
}

// Accept waits for and returns the next connection to the listener.
func (l *Listener) Accept() (*Conn, error) {
// Accept waits for and returns the next connection to the Listener.
func (l *Listener) Accept() (net.Conn, error) {
select {
case c := <-l.acceptCh:
l.connWG.Add(1)
Expand All @@ -46,19 +49,32 @@ func (l *Listener) Accept() (*Conn, error) {
}
}

// Close closes the listener.
// Close closes the Listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *Listener) Close() error {
var err error
l.doneOnce.Do(func() {
l.connWG.Done()
l.accepting.Store(false)
close(l.doneCh)

l.connLock.Lock()
// Close unaccepted connections
L_CLOSE:
for {
select {
case c := <-l.acceptCh:
close(c.doneCh)
delete(l.conns, c.rAddr.String())

default:
break L_CLOSE
}
}
nConns := len(l.conns)
l.connLock.Unlock()

l.connWG.Done()

if nConns == 0 {
// Wait if this is the final connection
l.readWG.Wait()
Expand All @@ -73,23 +89,43 @@ func (l *Listener) Close() error {
return err
}

// Addr returns the listener's network address.
// Addr returns the Listener's network address.
func (l *Listener) Addr() net.Addr {
return l.pConn.LocalAddr()
}

// Listen creates a new listener
func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
// ListenConfig stores options for listening to an address.
type ListenConfig struct {
// Backlog defines the maximum length of the queue of pending
// connections. It is equivalent of the backlog argument of
// POSIX listen function.
// If a connection request arrives when the queue is full,
// the request will be silently discarded, unlike TCP.
// Set zero to use default value 128 which is same as Linux default.
Backlog int

// AcceptFilter determines whether the new conn should be made for
// the incoming packet. If not set, any packet creates new conn.
AcceptFilter func([]byte) bool
}

// Listen creates a new Listener based on the ListenConfig.
func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener, error) {
if lc.Backlog == 0 {
lc.Backlog = defaultListenBacklog
}

conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}

l := &Listener{
pConn: conn,
acceptCh: make(chan *Conn),
conns: make(map[string]*Conn),
doneCh: make(chan struct{}),
pConn: conn,
acceptCh: make(chan *Conn, lc.Backlog),
conns: make(map[string]*Conn),
doneCh: make(chan struct{}),
acceptFilter: lc.AcceptFilter,
}
l.accepting.Store(true)
l.connWG.Add(1)
Expand All @@ -107,42 +143,63 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
return l, nil
}

// Listen creates a new Listener using default ListenConfig.
func Listen(network string, laddr *net.UDPAddr) (net.Listener, error) {
return (&ListenConfig{}).Listen(network, laddr)
}

var readBufferPool = &sync.Pool{
New: func() interface{} {
buf := make([]byte, receiveMTU)
return &buf
},
}

// readLoop has to tasks:
// 1. Dispatching incoming packets to the correct Conn.
// It can therefore not be ended until all Conns are closed.
// 2. Creating a new Conn when receiving from a new remote.
func (l *Listener) readLoop() {
defer l.readWG.Done()
buf := make([]byte, receiveMTU)

for {
buf := *(readBufferPool.Get().(*[]byte))
n, raddr, err := l.pConn.ReadFrom(buf)
if err != nil {
return
}
conn, err := l.getConn(raddr)
conn, ok, err := l.getConn(raddr, buf[:n])
if err != nil {
continue
}
cBuf := <-conn.readCh
n = copy(cBuf, buf[:n])
conn.sizeCh <- n
if ok {
_, _ = conn.buffer.Write(buf[:n])
}
}
}

func (l *Listener) getConn(raddr net.Addr) (*Conn, error) {
func (l *Listener) getConn(raddr net.Addr, buf []byte) (*Conn, bool, error) {
l.connLock.Lock()
defer l.connLock.Unlock()
conn, ok := l.conns[raddr.String()]
if !ok {
if !l.accepting.Load().(bool) {
return nil, errClosedListener
return nil, false, errClosedListener
}
if l.acceptFilter != nil {
if !l.acceptFilter(buf) {
return nil, false, nil
}
}
conn = l.newConn(raddr)
l.conns[raddr.String()] = conn
l.acceptCh <- conn
select {
case l.acceptCh <- conn:
l.conns[raddr.String()] = conn
default:
return nil, false, errListenQueueExceeded
}
}
return conn, nil
return conn, true, nil
}

// Conn augments a connection-oriented connection over a UDP PacketConn
Expand All @@ -151,39 +208,27 @@ type Conn struct {

rAddr net.Addr

readCh chan []byte
sizeCh chan int
buffer *packetio.Buffer

doneCh chan struct{}
doneOnce sync.Once

readDeadline *deadline.Deadline
writeDeadline *deadline.Deadline
}

func (l *Listener) newConn(rAddr net.Addr) *Conn {
return &Conn{
listener: l,
rAddr: rAddr,
readCh: make(chan []byte),
sizeCh: make(chan int),
buffer: packetio.NewBuffer(),
doneCh: make(chan struct{}),
readDeadline: deadline.New(),
writeDeadline: deadline.New(),
}
}

// Read
func (c *Conn) Read(p []byte) (int, error) {
select {
case c.readCh <- p:
n := <-c.sizeCh
return n, nil
case <-c.doneCh:
return 0, io.EOF
case <-c.readDeadline.Done():
return 0, context.DeadlineExceeded
}
return c.buffer.Read(p)
}

// Write writes len(p) bytes from p to the DTLS connection
Expand Down Expand Up @@ -233,21 +278,19 @@ func (c *Conn) RemoteAddr() net.Addr {

// SetDeadline implements net.Conn.SetDeadline
func (c *Conn) SetDeadline(t time.Time) error {
c.readDeadline.Set(t)
c.writeDeadline.Set(t)
// Deadline of underlying connection should not be changed
// since the connection can be shared.
return nil
return c.SetReadDeadline(t)
}

// SetReadDeadline implements net.Conn.SetDeadline
func (c *Conn) SetReadDeadline(t time.Time) error {
c.readDeadline.Set(t)
return nil
return c.buffer.SetReadDeadline(t)
}

// SetWriteDeadline implements net.Conn.SetDeadline
func (c *Conn) SetWriteDeadline(t time.Time) error {
c.writeDeadline.Set(t)
// Write deadline of underlying connection should not be changed
// since the connection can be shared.
return nil
}

0 comments on commit e77bae4

Please sign in to comment.