Skip to content

Commit

Permalink
Add Close method to Server
Browse files Browse the repository at this point in the history
  • Loading branch information
yutopp committed Aug 9, 2018
1 parent b031d78 commit 705fe08
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
14 changes: 14 additions & 0 deletions error.go
@@ -0,0 +1,14 @@
//
// Copyright (c) 2018- yutopp (yutopp@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//

package rtmp

import (
"github.com/pkg/errors"
)

var ErrClosed = errors.New("Server is closed")
64 changes: 64 additions & 0 deletions server.go
Expand Up @@ -8,13 +8,19 @@
package rtmp

import (
"github.com/pkg/errors"
"io"
"net"
"sync"
"time"
)

type Server struct {
config *ServerConfig

listener net.Listener
mu sync.Mutex
doneCh chan struct{}
}

type ServerConfig struct {
Expand All @@ -31,18 +37,76 @@ func NewServer(config *ServerConfig) *Server {
}

func (srv *Server) Serve(l net.Listener) error {
if err := srv.registerListener(l); err != nil {
return errors.Wrap(err, "Already serverd")
}

defer l.Close()

for {
rwc, err := l.Accept()
if err != nil {
select {
case <-srv.getDoneCh(): // closed
return ErrClosed

default: // do nothing
}

continue
}

go srv.handleConn(rwc)
}
}

func (srv *Server) Close() error {
srv.mu.Lock()
defer srv.mu.Unlock()

doneCh := srv.getDoneChLocked()
select {
case <-doneCh: // already closed
return nil
default:
close(doneCh)
}

if srv.listener == nil {
return nil
}

return srv.listener.Close()
}

func (srv *Server) registerListener(l net.Listener) error {
srv.mu.Lock()
defer srv.mu.Unlock()

if srv.listener != nil {
return errors.New("Listener is already registered")
}

srv.listener = l

return nil
}

func (srv *Server) getDoneCh() chan struct{} {
srv.mu.Lock()
defer srv.mu.Unlock()

return srv.getDoneChLocked()
}

func (srv *Server) getDoneChLocked() chan struct{} {
if srv.doneCh == nil {
srv.doneCh = make(chan struct{})
}

return srv.doneCh
}

func (srv *Server) handleConn(conn net.Conn) {
c := NewConn(&rwcHasTimeout{
conn: conn,
Expand Down
31 changes: 31 additions & 0 deletions server_test.go
@@ -0,0 +1,31 @@
//
// Copyright (c) 2018- yutopp (yutopp@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//

package rtmp

import (
"github.com/stretchr/testify/assert"
"net"
"testing"
"time"
)

func TestServerCanClose(t *testing.T) {
srv := NewServer(&ServerConfig{})

go func(ch <-chan time.Time) {
<-ch
err := srv.Close()
assert.Nil(t, err)
}(time.After(1 * time.Second))

l, err := net.Listen("tcp", "127.0.0.1:")
assert.Nil(t, err)

err = srv.Serve(l)
assert.Equal(t, ErrClosed, err)
}

0 comments on commit 705fe08

Please sign in to comment.