diff --git a/configs/biz.toml b/configs/biz.toml index b47c2f620..5a7c7bda5 100644 --- a/configs/biz.toml +++ b/configs/biz.toml @@ -19,8 +19,8 @@ addrs = ["127.0.0.1:2389"] #listen ip port host = "0.0.0.0" port = "8443" -cert= "configs/cert.pem" -key= "configs/key.pem" +# cert= "configs/cert.pem" +# key= "configs/key.pem" [nats] url = "nats://127.0.0.1:4223" diff --git a/pkg/rtc/rtpengine/engine.go b/pkg/rtc/rtpengine/engine.go index 88fdfd067..717f0f6e7 100644 --- a/pkg/rtc/rtpengine/engine.go +++ b/pkg/rtc/rtpengine/engine.go @@ -18,7 +18,7 @@ const ( ) var ( - listener *udp.Listener + listener net.Listener kcpListener *kcp.Listener stop bool ) diff --git a/pkg/rtc/rtpengine/udp/conn.go b/pkg/rtc/rtpengine/udp/conn.go index cee0e90df..92402c8ed 100644 --- a/pkg/rtc/rtpengine/udp/conn.go +++ b/pkg/rtc/rtpengine/udp/conn.go @@ -1,30 +1,33 @@ -// Package udp provides a connection-oriented listener over a UDP PacketConn +// Package udp provides a connection-oriented Listener over a UDP PacketConn package udp import ( "context" "errors" - "io" "net" "sync" "sync/atomic" "time" - "github.com/pion/ion/pkg/rtc/rtpengine/deadline" + "github.com/pion/transport/deadline" + "github.com/pion/transport/packetio" ) -const receiveMTU = 8192 +const receiveMTU = 1500 +const defaultListenBacklog = 10000 -var errClosedListener = errors.New("udp: listener closed") +var errClosedListener = errors.New("udp: Listener closed") +var errListenQueueExceeded = errors.New("udp: listen queue exceeded") // Listener augments a connection-oriented Listener over a UDP PacketConn type Listener struct { pConn *net.UDPConn - accepting atomic.Value // bool - acceptCh chan *Conn - doneCh chan struct{} - doneOnce sync.Once + accepting atomic.Value // bool + acceptCh chan *Conn + doneCh chan struct{} + doneOnce sync.Once + acceptFilter func([]byte) bool connLock sync.Mutex conns map[string]*Conn @@ -34,8 +37,8 @@ type Listener struct { errClose atomic.Value // error } -// Accept waits for and returns the next connection to the listener. -func (l *Listener) Accept() (*Conn, error) { +// Accept waits for and returns the next connection to the Listener. +func (l *Listener) Accept() (net.Conn, error) { select { case c := <-l.acceptCh: l.connWG.Add(1) @@ -46,19 +49,32 @@ func (l *Listener) Accept() (*Conn, error) { } } -// Close closes the listener. +// Close closes the Listener. // Any blocked Accept operations will be unblocked and return errors. func (l *Listener) Close() error { var err error l.doneOnce.Do(func() { - l.connWG.Done() l.accepting.Store(false) close(l.doneCh) l.connLock.Lock() + // Close unaccepted connections + L_CLOSE: + for { + select { + case c := <-l.acceptCh: + close(c.doneCh) + delete(l.conns, c.rAddr.String()) + + default: + break L_CLOSE + } + } nConns := len(l.conns) l.connLock.Unlock() + l.connWG.Done() + if nConns == 0 { // Wait if this is the final connection l.readWG.Wait() @@ -73,23 +89,43 @@ func (l *Listener) Close() error { return err } -// Addr returns the listener's network address. +// Addr returns the Listener's network address. func (l *Listener) Addr() net.Addr { return l.pConn.LocalAddr() } -// Listen creates a new listener -func Listen(network string, laddr *net.UDPAddr) (*Listener, error) { +// ListenConfig stores options for listening to an address. +type ListenConfig struct { + // Backlog defines the maximum length of the queue of pending + // connections. It is equivalent of the backlog argument of + // POSIX listen function. + // If a connection request arrives when the queue is full, + // the request will be silently discarded, unlike TCP. + // Set zero to use default value 128 which is same as Linux default. + Backlog int + + // AcceptFilter determines whether the new conn should be made for + // the incoming packet. If not set, any packet creates new conn. + AcceptFilter func([]byte) bool +} + +// Listen creates a new Listener based on the ListenConfig. +func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + if lc.Backlog == 0 { + lc.Backlog = defaultListenBacklog + } + conn, err := net.ListenUDP(network, laddr) if err != nil { return nil, err } l := &Listener{ - pConn: conn, - acceptCh: make(chan *Conn), - conns: make(map[string]*Conn), - doneCh: make(chan struct{}), + pConn: conn, + acceptCh: make(chan *Conn, lc.Backlog), + conns: make(map[string]*Conn), + doneCh: make(chan struct{}), + acceptFilter: lc.AcceptFilter, } l.accepting.Store(true) l.connWG.Add(1) @@ -107,42 +143,63 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) { return l, nil } +// Listen creates a new Listener using default ListenConfig. +func Listen(network string, laddr *net.UDPAddr) (net.Listener, error) { + return (&ListenConfig{}).Listen(network, laddr) +} + +var readBufferPool = &sync.Pool{ + New: func() interface{} { + buf := make([]byte, receiveMTU) + return &buf + }, +} + // readLoop has to tasks: // 1. Dispatching incoming packets to the correct Conn. // It can therefore not be ended until all Conns are closed. // 2. Creating a new Conn when receiving from a new remote. func (l *Listener) readLoop() { defer l.readWG.Done() - buf := make([]byte, receiveMTU) for { + buf := *(readBufferPool.Get().(*[]byte)) n, raddr, err := l.pConn.ReadFrom(buf) if err != nil { return } - conn, err := l.getConn(raddr) + conn, ok, err := l.getConn(raddr, buf[:n]) if err != nil { continue } - cBuf := <-conn.readCh - n = copy(cBuf, buf[:n]) - conn.sizeCh <- n + if ok { + _, _ = conn.buffer.Write(buf[:n]) + } } } -func (l *Listener) getConn(raddr net.Addr) (*Conn, error) { +func (l *Listener) getConn(raddr net.Addr, buf []byte) (*Conn, bool, error) { l.connLock.Lock() defer l.connLock.Unlock() conn, ok := l.conns[raddr.String()] if !ok { if !l.accepting.Load().(bool) { - return nil, errClosedListener + return nil, false, errClosedListener + } + if l.acceptFilter != nil { + if !l.acceptFilter(buf) { + return nil, false, nil + } } conn = l.newConn(raddr) - l.conns[raddr.String()] = conn - l.acceptCh <- conn + select { + case l.acceptCh <- conn: + l.conns[raddr.String()] = conn + default: + return nil, false, errListenQueueExceeded + } } - return conn, nil + return conn, true, nil } // Conn augments a connection-oriented connection over a UDP PacketConn @@ -151,13 +208,11 @@ type Conn struct { rAddr net.Addr - readCh chan []byte - sizeCh chan int + buffer *packetio.Buffer doneCh chan struct{} doneOnce sync.Once - readDeadline *deadline.Deadline writeDeadline *deadline.Deadline } @@ -165,25 +220,15 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn { return &Conn{ listener: l, rAddr: rAddr, - readCh: make(chan []byte), - sizeCh: make(chan int), + buffer: packetio.NewBuffer(), doneCh: make(chan struct{}), - readDeadline: deadline.New(), writeDeadline: deadline.New(), } } // Read func (c *Conn) Read(p []byte) (int, error) { - select { - case c.readCh <- p: - n := <-c.sizeCh - return n, nil - case <-c.doneCh: - return 0, io.EOF - case <-c.readDeadline.Done(): - return 0, context.DeadlineExceeded - } + return c.buffer.Read(p) } // Write writes len(p) bytes from p to the DTLS connection @@ -233,21 +278,19 @@ func (c *Conn) RemoteAddr() net.Addr { // SetDeadline implements net.Conn.SetDeadline func (c *Conn) SetDeadline(t time.Time) error { - c.readDeadline.Set(t) c.writeDeadline.Set(t) - // Deadline of underlying connection should not be changed - // since the connection can be shared. - return nil + return c.SetReadDeadline(t) } // SetReadDeadline implements net.Conn.SetDeadline func (c *Conn) SetReadDeadline(t time.Time) error { - c.readDeadline.Set(t) - return nil + return c.buffer.SetReadDeadline(t) } // SetWriteDeadline implements net.Conn.SetDeadline func (c *Conn) SetWriteDeadline(t time.Time) error { c.writeDeadline.Set(t) + // Write deadline of underlying connection should not be changed + // since the connection can be shared. return nil } diff --git a/pkg/rtc/rtpengine/udp/conn_test.go b/pkg/rtc/rtpengine/udp/conn_test.go deleted file mode 100644 index 615de2741..000000000 --- a/pkg/rtc/rtpengine/udp/conn_test.go +++ /dev/null @@ -1,223 +0,0 @@ -// +build !js - -package udp - -import ( - "fmt" - "net" - "testing" - "time" - - "github.com/pion/transport/test" -) - -// Note: doesn't work since closing isn't propagated to the other side -//func TestNetTest(t *testing.T) { -// lim := test.TimeOut(time.Minute*1 + time.Second*10) -// defer lim.Stop() -// -// nettest.TestConn(t, func() (c1, c2 net.Conn, stop func(), err error) { -// listener, c1, c2, err = pipe() -// if err != nil { -// return nil, nil, nil, err -// } -// stop = func() { -// c1.Close() -// c2.Close() -// listener.Close(1 * time.Second) -// } -// return -// }) -//} - -func TestStressDuplex(t *testing.T) { - // Limit runtime in case of deadlocks - lim := test.TimeOut(time.Second * 20) - defer lim.Stop() - - // Check for leaking routines - report := test.CheckRoutines(t) - defer report() - - // Run the test - stressDuplex(t) -} - -func stressDuplex(t *testing.T) { - listener, ca, cb, err := pipe() - if err != nil { - t.Fatal(err) - } - - defer func() { - err = ca.Close() - if err != nil { - t.Fatal(err) - } - err = cb.Close() - if err != nil { - t.Fatal(err) - } - err = listener.Close() - if err != nil { - t.Fatal(err) - } - }() - - opt := test.Options{ - MsgSize: 2048, - MsgCount: 1, // Can't rely on UDP message order in CI - } - - err = test.StressDuplex(ca, cb, opt) - if err != nil { - t.Fatal(err) - } -} - -func TestListenerCloseTimeout(t *testing.T) { - // Limit runtime in case of deadlocks - lim := test.TimeOut(time.Second * 20) - defer lim.Stop() - - // Check for leaking routines - report := test.CheckRoutines(t) - defer report() - - listener, ca, _, err := pipe() - if err != nil { - t.Fatal(err) - } - - err = listener.Close() - if err != nil { - t.Fatal(err) - } - - // Close client after server closes to cleanup - err = ca.Close() - if err != nil { - t.Fatal(err) - } -} - -func pipe() (*Listener, net.Conn, *net.UDPConn, error) { - // Start listening - network, addr := getConfig() - listener, err := Listen(network, addr) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to listen: %v", err) - } - - // Open a connection - var dConn *net.UDPConn - dConn, err = net.DialUDP(network, nil, listener.Addr().(*net.UDPAddr)) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to dial: %v", err) - } - - // Write to the connection to initiate it - handshake := "hello" - _, err = dConn.Write([]byte(handshake)) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to write to dialed Conn: %v", err) - } - - // Accept the connection - var lConn net.Conn - lConn, err = listener.Accept() - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to accept Conn: %v", err) - } - - buf := make([]byte, len(handshake)) - n := 0 - n, err = lConn.Read(buf) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to read handshake: %v", err) - } - - result := string(buf[:n]) - if handshake != result { - return nil, nil, nil, fmt.Errorf("handshake failed: %s != %s", handshake, result) - } - - return listener, lConn, dConn, nil -} - -func getConfig() (string, *net.UDPAddr) { - return "udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0} -} - -func TestConnClose(t *testing.T) { - lim := test.TimeOut(time.Second * 5) - defer lim.Stop() - - t.Run("Close", func(t *testing.T) { - // Check for leaking routines - report := test.CheckRoutines(t) - defer report() - - listener, ca, cb, errPipe := pipe() - if errPipe != nil { - t.Fatal(errPipe) - } - if err := ca.Close(); err != nil { - t.Errorf("Failed to close A side: %v", err) - } - if err := cb.Close(); err != nil { - t.Errorf("Failed to close B side: %v", err) - } - if err := listener.Close(); err != nil { - t.Errorf("Failed to close listener: %v", err) - } - }) - t.Run("CloseError1", func(t *testing.T) { - // Check for leaking routines - report := test.CheckRoutines(t) - defer report() - - listener, ca, cb, errPipe := pipe() - if errPipe != nil { - t.Fatal(errPipe) - } - // Close listener.pConn to inject error. - if err := listener.pConn.Close(); err != nil { - t.Error(err) - } - - if err := cb.Close(); err != nil { - t.Errorf("Failed to close A side: %v", err) - } - if err := ca.Close(); err != nil { - t.Errorf("Failed to close B side: %v", err) - } - if err := listener.Close(); err == nil { - t.Errorf("Error is not propagated to Listener.Close") - } - }) - t.Run("CloseError2", func(t *testing.T) { - // Check for leaking routines - report := test.CheckRoutines(t) - defer report() - - listener, ca, cb, errPipe := pipe() - if errPipe != nil { - t.Fatal(errPipe) - } - // Close listener.pConn to inject error. - if err := listener.pConn.Close(); err != nil { - t.Error(err) - } - - if err := cb.Close(); err != nil { - t.Errorf("Failed to close A side: %v", err) - } - if err := listener.Close(); err != nil { - t.Errorf("Failed to close listener: %v", err) - } - if err := ca.Close(); err == nil { - t.Errorf("Error is not propagated to Conn.Close") - } - }) -}