Skip to content

Commit

Permalink
Added per-instance error logger to Client and Server
Browse files Browse the repository at this point in the history
  • Loading branch information
vl committed Jun 5, 2015
1 parent c89231f commit 22c54a7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 25 deletions.
21 changes: 15 additions & 6 deletions client.go
Expand Up @@ -92,6 +92,11 @@ type Client struct {
// By default it returns TCP connections established to the Client.Addr.
Dial DialFunc

// LogError is used for error logging.
//
// By default the function set via SetErrorLogger() is used.
LogError LoggerFunc

// Connection statistics.
//
// The stats doesn't reset automatically. Feel free resetting it
Expand All @@ -111,6 +116,9 @@ type Client struct {
// There is no need in registering base Go types such as int, string, bool,
// float64, etc. or arrays, slices and maps containing base Go types.
func (c *Client) Start() {
if c.LogError == nil {
c.LogError = errorLogger
}
if c.clientStopChan != nil {
panic("gorpc.Client: the given client is already started. Call Client.Stop() before calling Client.Start() again!")
}
Expand Down Expand Up @@ -140,6 +148,7 @@ func (c *Client) Start() {
if c.Dial == nil {
c.Dial = defaultDial
}

for i := 0; i < c.Conns; i++ {
c.stopWg.Add(1)
go clientHandler(c)
Expand Down Expand Up @@ -211,7 +220,7 @@ func (c *Client) CallTimeout(request interface{}, timeout time.Duration) (respon

func getClientTimeoutError(c *Client, timeout time.Duration) error {
err := fmt.Errorf("gorpc.Client: [%s]. Cannot obtain response during timeout=%s", c.Addr, timeout)
logError("%s", err)
c.LogError("%s", err)
return &ClientError{
Timeout: true,
err: err,
Expand Down Expand Up @@ -289,7 +298,7 @@ func (c *Client) callAsync(request interface{}, skipResponse bool) (ar *AsyncRes
return m, nil
default:
err = fmt.Errorf("gorpc.Client: [%s]. Requests' queue with size=%d is overflown. Try increasing Client.PendingRequests value", c.Addr, cap(c.requestsChan))
logError("%s", err)
c.LogError("%s", err)
err = &ClientError{
Overflow: true,
err: err,
Expand Down Expand Up @@ -499,7 +508,7 @@ func clientHandler(c *Client) {
dialChan := make(chan struct{})
go func() {
if conn, err = c.Dial(c.Addr); err != nil {
logError("gorpc.Client: [%s]. Cannot establish rpc connection: [%s]", c.Addr, err)
c.LogError("gorpc.Client: [%s]. Cannot establish rpc connection: [%s]", c.Addr, err)
time.Sleep(time.Second)
}
close(dialChan)
Expand All @@ -524,7 +533,7 @@ func clientHandleConnection(c *Client, conn io.ReadWriteCloser) {
if c.OnConnect != nil {
newConn, err := c.OnConnect(c.Addr, conn)
if err != nil {
logError("gorpc.Client: [%s]. OnConnect error: [%s]", c.Addr, err)
c.LogError("gorpc.Client: [%s]. OnConnect error: [%s]", c.Addr, err)
conn.Close()
return
}
Expand All @@ -537,7 +546,7 @@ func clientHandleConnection(c *Client, conn io.ReadWriteCloser) {
}
_, err := conn.Write(buf[:])
if err != nil {
logError("gorpc.Client: [%s]. Error when writing handshake to server: [%s]", c.Addr, err)
c.LogError("gorpc.Client: [%s]. Error when writing handshake to server: [%s]", c.Addr, err)
conn.Close()
return
}
Expand Down Expand Up @@ -570,7 +579,7 @@ func clientHandleConnection(c *Client, conn io.ReadWriteCloser) {
}

if err != nil {
logError("%s", err)
c.LogError("%s", err)
err = &ClientError{
Connection: true,
err: err,
Expand Down
8 changes: 5 additions & 3 deletions common.go
Expand Up @@ -55,9 +55,11 @@ func SetErrorLogger(f LoggerFunc) {
errorLogger = f
}

func logError(format string, args ...interface{}) {
errorLogger(format, args...)
}
// NilErrorLogger discards all error messages.
//
// Pass NilErrorLogger to SetErrorLogger() in order to suppress error log generated
// by gorpc.
func NilErrorLogger(format string, args ...interface{}) {}

func logPanic(format string, args ...interface{}) {
errorLogger(format, args...)
Expand Down
2 changes: 1 addition & 1 deletion rpc_test.go
Expand Up @@ -13,7 +13,7 @@ import (
)

func init() {
SetErrorLogger(func(format string, args ...interface{}) {})
SetErrorLogger(NilErrorLogger)
}

func echoHandler(clientAddr string, request interface{}) interface{} {
Expand Down
38 changes: 23 additions & 15 deletions server.go
Expand Up @@ -93,6 +93,11 @@ type Server struct {
// By default it returns TCP connections accepted from Server.Addr.
Listener Listener

// LogError is used for error logging.
//
// By default the function set via SetErrorLogger() is used.
LogError LoggerFunc

// Connection statistics.
//
// The stats doesn't reset automatically. Feel free resetting it
Expand All @@ -110,6 +115,9 @@ type Server struct {
// There is no need in registering base Go types such as int, string, bool,
// float64, etc. or arrays, slices and maps containing base Go types.
func (s *Server) Start() error {
if s.LogError == nil {
s.LogError = errorLogger
}
if s.Handler == nil {
panic("gorpc.Server: Server.Handler cannot be nil")
}
Expand Down Expand Up @@ -140,7 +148,7 @@ func (s *Server) Start() error {
}
if err := s.Listener.Init(s.Addr); err != nil {
err = fmt.Errorf("gorpc.Server: [%s]. Cannot listen to: [%s]", s.Addr, err)
logError("%s", err)
s.LogError("%s", err)
return err
}

Expand Down Expand Up @@ -180,7 +188,7 @@ func serverHandler(s *Server, workersCh chan struct{}) {
acceptChan := make(chan struct{})
go func() {
if conn, clientAddr, err = s.Listener.Accept(); err != nil {
logError("gorpc.Server: [%s]. Cannot accept new connection: [%s]", s.Addr, err)
s.LogError("gorpc.Server: [%s]. Cannot accept new connection: [%s]", s.Addr, err)
time.Sleep(time.Second)
}
close(acceptChan)
Expand Down Expand Up @@ -210,7 +218,7 @@ func serverHandleConnection(s *Server, conn io.ReadWriteCloser, clientAddr strin
if s.OnConnect != nil {
newConn, err := s.OnConnect(clientAddr, conn)
if err != nil {
logError("gorpc.Server: [%s]->[%s]. OnConnect error: [%s]", clientAddr, s.Addr, err)
s.LogError("gorpc.Server: [%s]->[%s]. OnConnect error: [%s]", clientAddr, s.Addr, err)
conn.Close()
return
}
Expand All @@ -223,7 +231,7 @@ func serverHandleConnection(s *Server, conn io.ReadWriteCloser, clientAddr strin
go func() {
var buf [1]byte
if _, err = conn.Read(buf[:]); err != nil {
logError("gorpc.Server: [%s]->[%s]. Error when reading handshake from client: [%s]", clientAddr, s.Addr, err)
s.LogError("gorpc.Server: [%s]->[%s]. Error when reading handshake from client: [%s]", clientAddr, s.Addr, err)
}
zChan <- (buf[0] != 0)
}()
Expand All @@ -237,7 +245,7 @@ func serverHandleConnection(s *Server, conn io.ReadWriteCloser, clientAddr strin
conn.Close()
return
case <-time.After(10 * time.Second):
logError("gorpc.Server: [%s]->[%s]. Cannot obtain handshake from client during 10s", clientAddr, s.Addr)
s.LogError("gorpc.Server: [%s]->[%s]. Cannot obtain handshake from client during 10s", clientAddr, s.Addr)
conn.Close()
return
}
Expand Down Expand Up @@ -287,7 +295,7 @@ func serverReader(s *Server, r io.Reader, clientAddr string, responsesChan chan<

defer func() {
if r := recover(); r != nil {
logError("gorpc.Server: [%s]->[%s]. Panic when reading data from client: %v", clientAddr, s.Addr, r)
s.LogError("gorpc.Server: [%s]->[%s]. Panic when reading data from client: %v", clientAddr, s.Addr, r)
}
close(done)
}()
Expand All @@ -298,7 +306,7 @@ func serverReader(s *Server, r io.Reader, clientAddr string, responsesChan chan<
var wr wireRequest
for {
if err := d.Decode(&wr); err != nil {
logError("gorpc.Server: [%s]->[%s]. Cannot decode request: [%s]", clientAddr, s.Addr, err)
s.LogError("gorpc.Server: [%s]->[%s]. Cannot decode request: [%s]", clientAddr, s.Addr, err)
return
}

Expand All @@ -319,11 +327,11 @@ func serverReader(s *Server, r io.Reader, clientAddr string, responsesChan chan<
return
}
}
go serveRequest(s.Handler, s.Addr, &s.Stats, responsesChan, stopChan, m, workersCh)
go serveRequest(s, responsesChan, stopChan, m, workersCh)
}
}

func serveRequest(handler HandlerFunc, serverAddr string, stats *ConnStats, responsesChan chan<- *serverMessage, stopChan <-chan struct{}, m *serverMessage, workersCh <-chan struct{}) {
func serveRequest(s *Server, responsesChan chan<- *serverMessage, stopChan <-chan struct{}, m *serverMessage, workersCh <-chan struct{}) {
request := m.Request
m.Request = nil
clientAddr := m.ClientAddr
Expand All @@ -337,8 +345,8 @@ func serveRequest(handler HandlerFunc, serverAddr string, stats *ConnStats, resp
}

t := time.Now()
response, err := callHandlerWithRecover(handler, clientAddr, serverAddr, request)
stats.incRPCTime(uint64(time.Since(t).Seconds() * 1000))
response, err := callHandlerWithRecover(s.LogError, s.Handler, clientAddr, s.Addr, request)
s.Stats.incRPCTime(uint64(time.Since(t).Seconds() * 1000))

if !skipResponse {
m.Response = response
Expand All @@ -359,13 +367,13 @@ func serveRequest(handler HandlerFunc, serverAddr string, stats *ConnStats, resp
<-workersCh
}

func callHandlerWithRecover(handler HandlerFunc, clientAddr, serverAddr string, request interface{}) (response interface{}, errStr string) {
func callHandlerWithRecover(logErrorFunc LoggerFunc, handler HandlerFunc, clientAddr, serverAddr string, request interface{}) (response interface{}, errStr string) {
defer func() {
if x := recover(); x != nil {
stackTrace := make([]byte, 1<<20)
n := runtime.Stack(stackTrace, false)
errStr = fmt.Sprintf("Panic occured: %v\nStack trace: %s", x, stackTrace[:n])
logError("gorpc.Server: [%s]->[%s]. %s", clientAddr, serverAddr, errStr)
logErrorFunc("gorpc.Server: [%s]->[%s]. %s", clientAddr, serverAddr, errStr)
}
}()
response = handler(clientAddr, request)
Expand Down Expand Up @@ -393,7 +401,7 @@ func serverWriter(s *Server, w io.Writer, clientAddr string, responsesChan <-cha
case m = <-responsesChan:
case <-flushChan:
if err := e.Flush(); err != nil {
logError("gorpc.Server: [%s]->[%s]: Cannot flush responses to underlying stream: [%s]", clientAddr, s.Addr, err)
s.LogError("gorpc.Server: [%s]->[%s]: Cannot flush responses to underlying stream: [%s]", clientAddr, s.Addr, err)
return
}
flushChan = nil
Expand All @@ -414,7 +422,7 @@ func serverWriter(s *Server, w io.Writer, clientAddr string, responsesChan <-cha
serverMessagePool.Put(m)

if err := e.Encode(wr); err != nil {
logError("gorpc.Server: [%s]->[%s]. Cannot send response to wire: [%s]", clientAddr, s.Addr, err)
s.LogError("gorpc.Server: [%s]->[%s]. Cannot send response to wire: [%s]", clientAddr, s.Addr, err)
return
}
wr.Response = nil
Expand Down

0 comments on commit 22c54a7

Please sign in to comment.