Skip to content

Commit

Permalink
Cleaned up some examples and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rsms committed Feb 9, 2015
1 parent 101f6c2 commit a2d869d
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 35 deletions.
14 changes: 14 additions & 0 deletions assert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gotalk
import (
"testing"
)

func assertEq(t *testing.T, actual, expect interface{}) {
if actual != expect {
if _, ok := expect.(string); ok {
t.Errorf("Expected `%q %T` but got `%q %T`\n", expect,expect, actual,actual)
} else {
t.Errorf("Expected `%v %T` but got `%v %T`\n", expect,expect, actual,actual)
}
}
}
2 changes: 1 addition & 1 deletion examples/read-timeout/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
/limits
/read-timeout
2 changes: 1 addition & 1 deletion examples/read-timeout/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Demonstrates buffer-request read timeout
Demonstrates read timeout (or "idle" timeout)

go build && ./read-timeout
Binary file removed examples/read-timeout/read-timeout
Binary file not shown.
4 changes: 2 additions & 2 deletions examples/read-timeout/requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func sendRequest(s *gotalk.Sock) {
func timeoutRequest(port string) {
s, err := gotalk.Connect("tcp", "localhost:"+port)
if err != nil { panic(err) }
println("connected to", s.Addr())
println("requestor: connected to", s.Addr())

// Wrap the connection for slow writing to simulate a poor connection
s.Adopt(&slowWriter{s.Conn(), 2 * time.Second})
Expand All @@ -64,7 +64,7 @@ func timeoutRequest(port string) {
func heartbeatKeepAlive(port string) {
s, err := gotalk.Connect("tcp", "localhost:"+port)
if err != nil { panic(err) }
println("connected to", s.Addr())
println("requestor: connected to", s.Addr())

// As the responder has a one second timeout, set our heartbeat interval to half that time
s.HeartbeatInterval = 500 * time.Millisecond
Expand Down
5 changes: 3 additions & 2 deletions examples/read-timeout/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func responder(port string) {
}

// Configure limits with a read timeout of one second
s.Limits = gotalk.NewLimits(0, 0, time.Second)
s.Limits = gotalk.NewLimits(0, 0)
s.Limits.SetReadTimeout(time.Second)

// Accept connections
println("listening at", s.Addr())
println("responder: listening at", s.Addr())
go s.Accept()
}
2 changes: 1 addition & 1 deletion examples/websocket-chat/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func onAccept(s *gotalk.Sock) {
defer socksmu.Unlock()
socks[s] = 1

s.CloseHandler = func (s *gotalk.Sock) {
s.CloseHandler = func (s *gotalk.Sock, _ int) {
socksmu.Lock()
defer socksmu.Unlock()
delete(socks, s)
Expand Down
5 changes: 3 additions & 2 deletions examples/websocket/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ func onAccept(s *gotalk.Sock) {
// Send a request & read result via JSON-encoded go values.
greeting := GreetOut{}
if err := s.Request("greet", GreetIn{"Rasmus"}, &greeting); err != nil {
panic("greet request failed: " + err.Error())
fmt.Printf("greet request failed: " + err.Error())
} else {
fmt.Printf("greet: %+v\n", greeting)
}
fmt.Printf("greet: %+v\n", greeting)
}()
}

Expand Down
49 changes: 30 additions & 19 deletions limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

type Limits interface {
// Maximum amount of time allowed to read a buffer request. 0 = no timeout.
// Defaults to 30 seconds.
ReadTimeout() time.Duration
SetReadTimeout(time.Duration)

incBufferReq() bool
decBufferReq()
Expand All @@ -25,38 +27,40 @@ type Limits interface {
// and we are currently processing 5 streaming requests, we can handle an additional 5 buffered
// requests, but no more streaming requests.
//
// `readTimeout` limits the amount of time that can be spent reading a single buffer request.
//
// - If both `requestLimit` and `streamRequestLimit` is 0, buffer requests are not limited and
// stream requests are disabled.
// - If `streamRequestLimit` is 0, buffer requests are limited to `requestLimit` and stream
// requests are disabled.
// - If `requestLimit` is 0, buffer requests aren't limited, but stream requests are limited
// to `streamRequestLimit`.
//
func NewLimits(requestLimit uint32, streamRequestLimit uint32, readTimeout time.Duration) Limits {
func NewLimits(requestLimit uint32, streamRequestLimit uint32) Limits {
if requestLimit == 0 && streamRequestLimit == 0 {
return noLimitNoStream(readTimeout)
return &noLimitNoStream{DefaultLimits.ReadTimeout()}

} else if requestLimit == 0 {
return &limitStream{limit{streamRequestLimit, 0}, readTimeout}
return &limitStream{limit{streamRequestLimit, 0}, DefaultLimits.ReadTimeout()}

} else if streamRequestLimit == 0 {
return &limitSingleNoStream{limit{requestLimit, 0}, readTimeout}
return &limitSingleNoStream{limit{requestLimit, 0}, DefaultLimits.ReadTimeout()}

} else {
if streamRequestLimit > requestLimit {
panic("streamRequestLimit > requestLimit")
}
return &limitSingleAndStream{limit{requestLimit, 0}, limit{streamRequestLimit, 0}, readTimeout}
return &limitSingleAndStream{
limit{requestLimit, 0},
limit{streamRequestLimit, 0},
DefaultLimits.ReadTimeout(),
}
}
}

// DefaultLimits does not limit buffer requests, and disables stream requests.
var DefaultLimits = NewLimits(0, 0, 30 * time.Second)
var DefaultLimits Limits = &noLimitNoStream{30 * time.Second}

// NoLimits does not limit buffer requests or stream requests, not does it have a read timeout.
var NoLimits = noLimit(false)
var NoLimits Limits = noLimit(false)

// -----------------------------------------------------------------------------------------------

Expand All @@ -67,16 +71,20 @@ func (l noLimit) streamReqEnabled() bool { return true }
func (l noLimit) incStreamReq() bool { return true }
func (l noLimit) decStreamReq() {}
func (l noLimit) ReadTimeout() time.Duration { return 0 }
func (l noLimit) SetReadTimeout(_ time.Duration) {}

// -----------------------------------------------------------------------------------------------

type noLimitNoStream time.Duration
func (l noLimitNoStream) incBufferReq() bool { return true }
func (l noLimitNoStream) decBufferReq() {}
func (l noLimitNoStream) streamReqEnabled() bool { return false }
func (l noLimitNoStream) incStreamReq() bool { return false }
func (l noLimitNoStream) decStreamReq() {}
func (l noLimitNoStream) ReadTimeout() time.Duration { return time.Duration(l) }
type noLimitNoStream struct {
readTimeout time.Duration
}
func (l *noLimitNoStream) incBufferReq() bool { return true }
func (l *noLimitNoStream) decBufferReq() {}
func (l *noLimitNoStream) streamReqEnabled() bool { return false }
func (l *noLimitNoStream) incStreamReq() bool { return false }
func (l *noLimitNoStream) decStreamReq() {}
func (l *noLimitNoStream) ReadTimeout() time.Duration { return l.readTimeout }
func (l *noLimitNoStream) SetReadTimeout(d time.Duration) { l.readTimeout = d }

// -----------------------------------------------------------------------------------------------

Expand All @@ -90,6 +98,7 @@ func (l *limitStream) streamReqEnabled() bool { return true }
func (l *limitStream) incStreamReq() bool { return l.streamLimit.inc() }
func (l *limitStream) decStreamReq() { l.streamLimit.dec() }
func (l *limitStream) ReadTimeout() time.Duration { return l.readTimeout }
func (l *limitStream) SetReadTimeout(d time.Duration) { l.readTimeout = d }

// -----------------------------------------------------------------------------------------------

Expand All @@ -103,6 +112,7 @@ func (l *limitSingleNoStream) streamReqEnabled() bool { return false }
func (l *limitSingleNoStream) incStreamReq() bool { return false }
func (l *limitSingleNoStream) decStreamReq() {}
func (l *limitSingleNoStream) ReadTimeout() time.Duration { return l.readTimeout }
func (l *limitSingleNoStream) SetReadTimeout(d time.Duration) { l.readTimeout = d }

// -----------------------------------------------------------------------------------------------

Expand All @@ -116,10 +126,10 @@ func (l *limitSingleAndStream) decBufferReq() { l.bothLimit.dec() }
func (l *limitSingleAndStream) streamReqEnabled() bool { return true }
func (l *limitSingleAndStream) incStreamReq() bool {
if l.bothLimit.inc() {
if l.streamLimit.inc() == false {
l.bothLimit.dec()
return false
if l.streamLimit.inc() {
return true
}
l.bothLimit.dec()
}
return false
}
Expand All @@ -128,6 +138,7 @@ func (l *limitSingleAndStream) decStreamReq() {
l.bothLimit.dec()
}
func (l *limitSingleAndStream) ReadTimeout() time.Duration { return l.readTimeout }
func (l *limitSingleAndStream) SetReadTimeout(d time.Duration) { l.readTimeout = d }

// -----------------------------------------------------------------------------------------------

Expand Down
85 changes: 85 additions & 0 deletions limits_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package gotalk
import (
"testing"
"time"
)

func verifyLimitsReadTimeout(t *testing.T, l Limits) {
assertEq(t, l.ReadTimeout(), 30 * time.Second)
l.SetReadTimeout(10 * time.Second)
assertEq(t, l.ReadTimeout(), 10 * time.Second)
}

func TestLimits(t *testing.T) {
l := NoLimits
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.streamReqEnabled(), true)
assertEq(t, l.incStreamReq(), true)
assertEq(t, l.ReadTimeout(), time.Duration(0))
l.SetReadTimeout(10 * time.Second)
assertEq(t, l.ReadTimeout(), time.Duration(0)) // NoLimits is immutable

// noLimitNoStream
l = NewLimits(0, 0)
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.streamReqEnabled(), false)
assertEq(t, l.incStreamReq(), false)
verifyLimitsReadTimeout(t, l)

// limitStream
l = NewLimits(0, 1)
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.streamReqEnabled(), true)
assertEq(t, l.incStreamReq(), true)
assertEq(t, l.incStreamReq(), false)
l.decStreamReq()
assertEq(t, l.incStreamReq(), true)
verifyLimitsReadTimeout(t, l)

// limitSingleNoStream
l = NewLimits(1, 0)
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.incBufferReq(), false)
l.decBufferReq()
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.streamReqEnabled(), false)
assertEq(t, l.incStreamReq(), false)
verifyLimitsReadTimeout(t, l)


// limitSingleAndStream
// Note: incBufferReq effectively decrements both requests and streams
l = NewLimits(2, 2)
lss, ok := l.(*limitSingleAndStream)
if !ok {
t.Fatalf("l.(*limitSingleAndStream) failed\n")
}

assertEq(t, l.incBufferReq(), true)
assertEq(t, l.incBufferReq(), true)
assertEq(t, l.incBufferReq(), false)
l.decBufferReq()
assertEq(t, l.incBufferReq(), true)
l.decBufferReq()
l.decBufferReq()
assertEq(t, lss.bothLimit.count, uint32(0))
assertEq(t, lss.streamLimit.count, uint32(0))

assertEq(t, l.streamReqEnabled(), true)

assertEq(t, l.incStreamReq(), true)
assertEq(t, lss.bothLimit.count, uint32(1))
assertEq(t, lss.streamLimit.count, uint32(1))

assertEq(t, l.incStreamReq(), true)
assertEq(t, l.incStreamReq(), false)
l.decStreamReq()
assertEq(t, l.incStreamReq(), true)
l.decStreamReq()
l.decStreamReq()
assertEq(t, lss.bothLimit.count, uint32(0))
assertEq(t, lss.streamLimit.count, uint32(0))

verifyLimitsReadTimeout(t, l)
}

44 changes: 39 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,36 @@ func NewServer(h *Handlers, limits Limits, l net.Listener) *Server {
}


type tcpKeepAliveListener struct {
*net.TCPListener
}

func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}


// Start a `how` server listening for connections at `addr`. You need to call Accept() on the
// returned socket to start accepting connections.
// The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits.
// returned socket to start accepting connections. `how` and `addr` are passed to `net.Listen()`
// and thus any values accepted by net.Listen are valid.
// The returned server has Handlers=DefaultHandlers and Limits=DefaultLimits set.
func Listen(how, addr string) (*Server, error) {
l, err := net.Listen(how, addr)
if err != nil {
return nil, err
}

if tcpl, ok := l.(*net.TCPListener); ok {
// Wrap TCP listener to enable TCP keep-alive
l = &tcpKeepAliveListener{tcpl}
}

s := NewServer(DefaultHandlers, DefaultLimits, l)

if how == "unix" || how == "unixpacket" {
Expand Down Expand Up @@ -83,10 +104,23 @@ func Serve(how, addr string, acceptHandler SockHandler) error {

// Accept connections. Blocks until Close() is called or an error occurs.
func (s *Server) Accept() error {
var tempDelay time.Duration // how long to sleep on accept failure
for {
c, err := s.listener.Accept()
if err != nil {
return err
c, e := s.listener.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
return e
}
go s.accept(c)
}
Expand Down
16 changes: 14 additions & 2 deletions sock.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@ func (s *Sock) SendHeartbeat(load float32) error {
}


type netLocalAddressable interface {
LocalAddr() net.Addr
}


// After completing a succesful handshake, call this function to read messages received to this
// socket. Does not return until the socket is closed.
// If HeartbeatInterval > 0 this method also sends automatic heartbeats.
Expand All @@ -622,9 +627,16 @@ func (s *Sock) Read(limits Limits) error {
hasReadDeadline := readTimeout != time.Duration(0)
var rd readDeadline

// Pipes doesn't support deadlines
netaddr, ok := s.conn.(netLocalAddressable)
isPipe := ok && netaddr.LocalAddr().Network() == "pipe"
if hasReadDeadline && isPipe {
hasReadDeadline = false
}

// Start sending heartbeats
var heartbeatStopChan chan bool
if s.HeartbeatInterval > 0 {
if s.HeartbeatInterval > 0 && !isPipe {
if s.HeartbeatInterval < time.Millisecond {
panic("HeartbeatInterval < time.Millisecond")
}
Expand Down Expand Up @@ -656,7 +668,7 @@ func (s *Sock) Read(limits Limits) error {
readTimeoutAt := time.Now().Add(readTimeout)
// fmt.Printf("setting read timeout to %v %v\n", readTimeout, readTimeoutAt)
if err = rd.SetReadDeadline(readTimeoutAt); err != nil {
panic("SetReadDeadline failed")
panic("SetReadDeadline failed: " + err.Error())
}
}
}
Expand Down

0 comments on commit a2d869d

Please sign in to comment.