Skip to content

Commit

Permalink
Merge df36ae7 into 04799c2
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanjordan committed Nov 9, 2016
2 parents 04799c2 + df36ae7 commit 751f67e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 1 deletion.
4 changes: 4 additions & 0 deletions connection.go
Expand Up @@ -164,6 +164,7 @@ type Connection struct {
events connectionEvents
commonStatsTags map[string]string
relay *Relayer
dispatcher Dispatcher

// outboundHP is the host:port we used to create this outbound connection.
// It may not match remotePeerInfo.HostPort, in which case the connection is
Expand Down Expand Up @@ -275,6 +276,8 @@ func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState
framePool = DefaultFramePool
}

dispatcher := NewGoroutineDispatcher(ch.log)

connID := nextConnID.Inc()
log := ch.log.WithFields(LogFields{
{"connID", connID},
Expand Down Expand Up @@ -302,6 +305,7 @@ func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState
handler: channelHandler{ch},
events: events,
commonStatsTags: ch.commonStatsTags,
dispatcher: dispatcher,
}
c.log = log
c.inbound.onRemoved = c.checkExchanges
Expand Down
30 changes: 30 additions & 0 deletions dispatch_test.go
@@ -0,0 +1,30 @@
package tchannel

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
)

// TestGoroutineDispatcherHandlesPanic tests that the Dispatch method of the goroutineDispatcher
// handles panics within the provided f.
func TestGoroutineDispatcherHandlesPanic(t *testing.T) {
var buf bytes.Buffer
d := NewGoroutineDispatcher(NewLogger(&buf))
call := InboundCall{
serviceName: "sampleService",
methodString: "sampleMethod",
headers: transportHeaders{
CallerName: "sampleCaller",
},
}
d.Dispatch(&call, func() {
panic("panicString")
})
bufStr := buf.String()
assert.Contains(t, bufStr, "panicString")
assert.Contains(t, bufStr, "sampleService")
assert.Contains(t, bufStr, "sampleMethod")
assert.Contains(t, bufStr, "sampleCaller")
}
64 changes: 64 additions & 0 deletions dispatcher.go
@@ -0,0 +1,64 @@
package tchannel

import (
"fmt"
"runtime"
)

// Dispatcher is an interface that dispatches a request to a handler
type Dispatcher interface {
Dispatch(call *InboundCall, f func())
}

// recoverFn handles the dispatched call if it panics
func recoverFn(logger Logger, call *InboundCall) {
if r := recover(); r != nil {
// log panic
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
panicStr := fmt.Sprintf("%v", r)
log := logger.WithFields(
LogField{"panic", panicStr},
LogField{"serviceName", call.ServiceName()},
LogField{"method", call.MethodString()},
LogField{"callerName", call.CallerName()},
LogField{"stack", buf},
)
log.Error("request handler panicked")

sendError(call)
}
}

// sendError returns a system error to the client with some info about the panic
func sendError(call *InboundCall) {
response := call.Response()
err := NewSystemError(
ErrCodeUnexpected,
"request handler %q:%q panicked while processing request",
call.ServiceName(),
call.Method(),
)
response.SendSystemError(err)
}

// goroutineDispatcher is a dispatcher that uses a separate goroutine for each request with no limits
type goroutineDispatcher struct {
logger Logger
}

// NewGoroutineDispatcher creates a new dispatcher that uses a new goroutine for each request
func NewGoroutineDispatcher(logger Logger) Dispatcher {
return &goroutineDispatcher{
logger: logger,
}
}

// Dispatch calls the provided function in a separate goroutine for each call
func (d *goroutineDispatcher) Dispatch(call *InboundCall, f func()) {
go func() {
defer recoverFn(d.logger, call)
f()
}()
}
4 changes: 3 additions & 1 deletion inbound.go
Expand Up @@ -132,7 +132,9 @@ func (c *Connection) handleCallReq(frame *Frame) bool {
response.commonStatsTags = call.commonStatsTags

setResponseHeaders(call.headers, response.headers)
go c.dispatchInbound(c.connID, callReq.ID(), call, frame)
c.dispatcher.Dispatch(call, func() {
c.dispatchInbound(c.connID, callReq.ID(), call, frame)
})
return false
}

Expand Down

0 comments on commit 751f67e

Please sign in to comment.