Skip to content

Commit

Permalink
Merge ca85350 into 108e5f0
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielcorado committed Jan 5, 2021
2 parents 108e5f0 + ca85350 commit 389ca7a
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 63 deletions.
4 changes: 4 additions & 0 deletions acceptor/tcp_acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (t *tcpPlayerConn) GetNextMessage() (b []byte, err error) {
if err != nil {
return nil, err
}
// if the header has no data, we can consider it as a closed connection
if len(header) == 0 {
return nil, constants.ErrConnectionClosed
}
msgSize, _, err := codec.ParseHeader(header)
if err != nil {
return nil, err
Expand Down
23 changes: 23 additions & 0 deletions acceptor/tcp_acceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,30 @@ func TestGetNextMessageEOF(t *testing.T) {

_, err = playerConn.GetNextMessage()
assert.EqualError(t, err, constants.ErrReceivedMsgSmallerThanExpected.Error())
}

// TestGetNextMessageEmptyEOF verifies that when the remote peer closes the
// connection with no pending data, GetNextMessage reports
// constants.ErrConnectionClosed instead of a generic read error.
func TestGetNextMessageEmptyEOF(t *testing.T) {
	acceptor := NewTCPAcceptor("0.0.0.0:0")
	go acceptor.ListenAndServe()
	defer acceptor.Stop()
	connChan := acceptor.GetConnChan()

	// The dial should succeed within 100 milliseconds.
	var clientConn net.Conn
	var dialErr error
	helpers.ShouldEventuallyReturn(t, func() error {
		clientConn, dialErr = net.Dial("tcp", acceptor.GetAddr())
		return dialErr
	}, nil, 10*time.Millisecond, 100*time.Millisecond)

	playerConn := helpers.ShouldEventuallyReceive(t, connChan, 100*time.Millisecond).(PlayerConn)

	// Close the client side while the server blocks in GetNextMessage, so the
	// server-side read observes an empty header (remote close, no data sent).
	go func() {
		time.Sleep(100 * time.Millisecond)
		clientConn.Close()
	}()

	_, err := playerConn.GetNextMessage()
	assert.EqualError(t, err, constants.ErrConnectionClosed.Error())
}

func TestGetNextMessageInParts(t *testing.T) {
Expand Down
32 changes: 20 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (a *agentImpl) send(pendingMsg pendingMessage) (err error) {
pWrite.err = util.GetErrorFromPayload(a.serializer, m.Data)
}

a.chSend <- pWrite
// chSend is never closed, so select on chDie to avoid blocking if the agent is already closed
select {
case a.chSend <- pWrite:
case <-a.chDie:
}
return
}

Expand All @@ -288,10 +292,10 @@ func (a *agentImpl) Push(route string, v interface{}) error {

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%dbytes",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.Session.ID(), a.Session.UID(), route, len(d))
default:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%+v",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.Session.ID(), a.Session.UID(), route, v)
}
return a.send(pendingMessage{typ: message.Push, route: route, payload: v})
Expand All @@ -314,10 +318,10 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Response, ID=%d, UID=%d, MID=%d, Data=%dbytes",
logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
a.Session.ID(), a.Session.UID(), mid, len(d))
default:
logger.Log.Infof("Type=Response, ID=%d, UID=%d, MID=%d, Data=%+v",
logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
a.Session.ID(), a.Session.UID(), mid, v)
}

Expand Down Expand Up @@ -394,15 +398,12 @@ func (a *agentImpl) SetStatus(state int32) {
func (a *agentImpl) Handle() {
defer func() {
a.Close()
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%d", a.Session.ID(), a.Session.UID())
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
}()

go a.write()
go a.heartbeat()
select {
case <-a.chDie: // agent closed signal
return
}
<-a.chDie // agent closed signal
}

// IPVersion returns the remote address ip version.
Expand Down Expand Up @@ -438,7 +439,15 @@ func (a *agentImpl) heartbeat() {
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
return
}
a.chSend <- pendingWrite{data: hbd}

// chSend is never closed, so select on chDie to avoid blocking if the agent is already closed
select {
case a.chSend <- pendingWrite{data: hbd}:
case <-a.chDie:
return
case <-a.chStopHeartbeat:
return
}
case <-a.chDie:
return
case <-a.chStopHeartbeat:
Expand Down Expand Up @@ -472,7 +481,6 @@ func (a *agentImpl) SendHandshakeResponse() error {
func (a *agentImpl) write() {
// clean func
defer func() {
close(a.chSend)
a.Close()
}()

Expand Down
4 changes: 2 additions & 2 deletions agent/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ func (a *Remote) Push(route string, v interface{}) error {
}
switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%dbytes",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.Session.ID(), a.Session.UID(), route, len(d))
default:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%+v",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.Session.ID(), a.Session.UID(), route, v)
}

Expand Down
8 changes: 7 additions & 1 deletion cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,13 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
go func(chn clientv3.WatchChan) {
for sd.running {
select {
case wResp := <-chn:
case wResp, ok := <-chn:
if wResp.Err() != nil {
logger.Log.Warnf("etcd watcher response error: %s", wResp.Err())
}
if !ok {
logger.Log.Error("etcd watcher died")
}
for _, ev := range wResp.Events {
svType, svID, err := parseEtcdKey(string(ev.Kv.Key))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions constants/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ var (
ErrRateLimitExceeded = errors.New("rate limit exceeded")
ErrReceivedMsgSmallerThanExpected = errors.New("received less data than expected, EOF?")
ErrReceivedMsgBiggerThanExpected = errors.New("received more data than expected")
ErrConnectionClosed = errors.New("client connection closed")
)
25 changes: 22 additions & 3 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,28 @@ This module implements functionality needed by the gRPC RPC implementation to en

Pitaya has support for metrics reporting, it comes with Prometheus and Statsd support already implemented and has support for custom reporters that implement the `Reporter` interface. Pitaya also comes with support for open tracing compatible frameworks, allowing the easy integration of Jaeger and others.

Some of the metrics reported by the `Reporter` include: number of connected clients, request duration and dropped messages.

## Custom Metrics
The list of metrics reported by the `Reporter` is:

- Response time: the time to process a message, in nanoseconds. It is segmented
by route, status, server type and response code;
- Process delay time: the delay to start processing a message, in nanoseconds.
It is segmented by route and server type;
- Exceeded Rate Limit: the number of blocked requests by exceeded rate limiting;
- Connected clients: number of clients connected at the moment;
- Server count: the number of discovered servers by service discovery. It is
segmented by server type;
- Channel capacity: the available capacity of the channel;
- Dropped messages: the number of rpc server dropped messages, that is, messages that are not handled;
- Goroutines count: the current number of Goroutines;
- Heap size: the current heap size;
- Heap objects count: the current number of objects at the heap;
- Worker jobs retry: the current amount of RPC reliability worker job retries;
- Worker jobs total: the current amount of RPC reliability worker jobs. It is
segmented by job status;
- Worker queue size: the current size of RPC reliability worker job queues. It
is segmented by each available queue.

### Custom Metrics

Besides pitaya's default monitoring, it is possible to create new metrics. If using only the Statsd reporter, no configuration is needed. If using Prometheus, it is necessary to add a configuration specifying the metrics parameters. More details on [doc](configuration.html#metrics-reporting) and this [example](https://github.com/topfreegames/pitaya/tree/master/examples/demo/custom_metrics).

Expand Down
56 changes: 28 additions & 28 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ func TestHandlerCallToFront(t *testing.T) {
tables := []struct {
req string
data []byte
resp []byte
resp string
}{
{"connector.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"code":200,"msg":"hello"}`)},
{"connector.testsvc.testrequestonlysessionreturnsptrnil", []byte(``), []byte(`{"code":"PIT-000","msg":"reply must not be null"}`)},
{"connector.testsvc.testrequestonlysessionreturnsrawnil", []byte(``), []byte(`{"code":"PIT-000","msg":"reply must not be null"}`)},
{"connector.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"code":200,"msg":"good"}`)},
{"connector.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), []byte(`good`)},
{"connector.testsvc.testrequestreceivereturnsraw", []byte(`woow`), []byte(`woow`)},
{"connector.testsvc.nonexistenthandler", []byte(`woow`), []byte(`{"code":"PIT-404","msg":"pitaya/handler: connector.testsvc.nonexistenthandler not found"}`)},
{"connector.testsvc.testrequestreturnserror", []byte(`woow`), []byte(`{"code":"PIT-555","msg":"somerror"}`)},
{"connector.testsvc.testrequestonlysessionreturnsptr", []byte(``), `{"code":200,"msg":"hello"}`},
{"connector.testsvc.testrequestonlysessionreturnsptrnil", []byte(``), `{"code":"PIT-000","msg":"reply must not be null"}`},
{"connector.testsvc.testrequestonlysessionreturnsrawnil", []byte(``), `{"code":"PIT-000","msg":"reply must not be null"}`},
{"connector.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), `{"code":200,"msg":"good"}`},
{"connector.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), `good`},
{"connector.testsvc.testrequestreceivereturnsraw", []byte(`woow`), `woow`},
{"connector.testsvc.nonexistenthandler", []byte(`woow`), `{"code":"PIT-404","msg":"pitaya/handler: connector.testsvc.nonexistenthandler not found"}`},
{"connector.testsvc.testrequestreturnserror", []byte(`woow`), `{"code":"PIT-555","msg":"somerror"}`},
}
port := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
Expand All @@ -84,7 +84,7 @@ func TestHandlerCallToFront(t *testing.T) {

msg := helpers.ShouldEventuallyReceive(t, c.IncomingMsgChan).(*message.Message)
assert.Equal(t, message.Response, msg.Type)
assert.Equal(t, table.resp, msg.Data)
assert.Regexp(t, table.resp, string(msg.Data))
})
}
}
Expand Down Expand Up @@ -356,14 +356,14 @@ func TestForwardToBackend(t *testing.T) {
tables := []struct {
req string
data []byte
resp []byte
resp string
}{
{"game.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"code":200,"msg":"hello"}`)},
{"game.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"code":200,"msg":"good"}`)},
{"game.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), []byte(`good`)},
{"game.testsvc.testrequestreceivereturnsraw", []byte(`woow`), []byte(`woow`)},
{"game.testsvc.nonexistenthandler", []byte(`woow`), []byte(`{"code":"PIT-404","msg":"pitaya/handler: game.testsvc.nonexistenthandler not found"}`)},
{"game.testsvc.testrequestreturnserror", []byte(`woow`), []byte(`{"code":"PIT-555","msg":"somerror"}`)},
{"game.testsvc.testrequestonlysessionreturnsptr", []byte(``), `{"code":200,"msg":"hello"}`},
{"game.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), `{"code":200,"msg":"good"}`},
{"game.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), `good`},
{"game.testsvc.testrequestreceivereturnsraw", []byte(`woow`), `woow`},
{"game.testsvc.nonexistenthandler", []byte(`woow`), `{"code":"PIT-404","msg":".+pitaya/handler: game.testsvc.nonexistenthandler not found"}`},
{"game.testsvc.testrequestreturnserror", []byte(`woow`), `{"code":"PIT-555","msg":".+somerror"}`},
}

c := client.New(logrus.InfoLevel)
Expand All @@ -379,7 +379,7 @@ func TestForwardToBackend(t *testing.T) {

msg := helpers.ShouldEventuallyReceive(t, c.IncomingMsgChan).(*message.Message)
assert.Equal(t, message.Response, msg.Type)
assert.Equal(t, table.resp, msg.Data)
assert.Regexp(t, table.resp, string(msg.Data))
})
}
}
Expand Down Expand Up @@ -452,24 +452,24 @@ func TestUserRPC(t *testing.T) {
name string
route string
data []byte
res []byte
res string
}{
{"front_to_back", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"back_to_front", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"front_to_back_error", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"back_to_front_error", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"same_server", "connector.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":"PIT-000","msg":"you are making a rpc that may be processed locally, either specify a different server type or specify a server id"}`)},
{"front_to_back_ptr", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"no_args", "connector.testsvc.testsendrpcnoargs", []byte(`{"route":"game.testremotesvc.rpctestnoargs"}`), []byte(`{"code":200,"msg":"got nothing"}`)},
{"not_found", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), []byte(`{"code":"PIT-404","msg":"route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`)},
{"front_to_back", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), `{"code":200,"msg":"got thisthis"}`},
{"back_to_front", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), `{"code":200,"msg":"got thisthis"}`},
{"front_to_back_error", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestreturnserror","data":"thisthis"}`), `{"code":"PIT-433","msg":".+test error","metadata":{"some":"meta"}}`},
{"back_to_front_error", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestreturnserror","data":"thisthis"}`), `{"code":"PIT-433","msg":".+test error","metadata":{"some":"meta"}}`},
{"same_server", "connector.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), `{"code":"PIT-000","msg":"you are making a rpc that may be processed locally, either specify a different server type or specify a server id"}`},
{"front_to_back_ptr", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), `{"code":200,"msg":"got thisthis"}`},
{"no_args", "connector.testsvc.testsendrpcnoargs", []byte(`{"route":"game.testremotesvc.rpctestnoargs"}`), `{"code":200,"msg":"got nothing"}`},
{"not_found", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), `{"code":"PIT-404","msg":".+route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`},
}

for _, table := range tables {
t.Run(table.name, func(t *testing.T) {
_, err := c1.SendRequest(table.route, table.data)
assert.NoError(t, err)
msg := helpers.ShouldEventuallyReceive(t, c1.IncomingMsgChan).(*message.Message)
assert.Equal(t, table.res, msg.Data)
assert.Regexp(t, table.res, string(msg.Data))
})
}
}
6 changes: 3 additions & 3 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (app *App) GroupBroadcast(ctx context.Context, frontendType, groupName, rou
func (app *App) sendDataToMembers(uids []string, frontendType, route string, v interface{}) error {
errUids, err := app.SendPushToUsers(route, v, uids, frontendType)
if err != nil {
logger.Log.Errorf("Group push message error, UID=%d, Error=%s", errUids, err.Error())
logger.Log.Errorf("Group push message error, UID=%v, Error=%s", errUids, err.Error())
return err
}
return nil
Expand All @@ -79,13 +79,13 @@ func (app *App) GroupAddMember(ctx context.Context, groupName, uid string) error
if uid == "" {
return constants.ErrEmptyUID
}
logger.Log.Debugf("Add user to group %s, UID=%d", groupName, uid)
logger.Log.Debugf("Add user to group %s, UID=%s", groupName, uid)
return app.groups.GroupAddMember(ctx, groupName, uid)
}

// GroupRemoveMember removes specified UID from group
func (app *App) GroupRemoveMember(ctx context.Context, groupName, uid string) error {
logger.Log.Debugf("Remove user from group %s, UID=%d", groupName, uid)
logger.Log.Debugf("Remove user from group %s, UID=%s", groupName, uid)
return app.groups.GroupRemoveMember(ctx, groupName, uid)
}

Expand Down
4 changes: 2 additions & 2 deletions kick.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (app *App) SendKickToUsers(uids []string, frontendType string) ([]string, e
if s := app.sessionPool.GetSessionByUID(uid); s != nil {
if err := s.Kick(context.Background()); err != nil {
notKickedUids = append(notKickedUids, uid)
logger.Log.Errorf("Session kick error, ID=%d, UID=%d, ERROR=%s", s.ID(), s.UID(), err.Error())
logger.Log.Errorf("Session kick error, ID=%d, UID=%s, ERROR=%s", s.ID(), s.UID(), err.Error())
}
} else if app.rpcClient != nil {
kick := &protos.KickMsg{UserId: uid}
if err := app.rpcClient.SendKick(uid, frontendType, kick); err != nil {
notKickedUids = append(notKickedUids, uid)
logger.Log.Errorf("RPCClient send kick error, UID=%d, SvType=%s, Error=%s", uid, frontendType, err.Error())
logger.Log.Errorf("RPCClient send kick error, UID=%s, SvType=%s, Error=%s", uid, frontendType, err.Error())
}
} else {
notKickedUids = append(notKickedUids, uid)
Expand Down
2 changes: 1 addition & 1 deletion push.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (app *App) SendPushToUsers(route string, v interface{}, uids []string, fron
if s := app.sessionPool.GetSessionByUID(uid); s != nil && app.server.Type == frontendType {
if err := s.Push(route, data); err != nil {
notPushedUids = append(notPushedUids, uid)
logger.Log.Errorf("Session push message error, ID=%d, UID=%d, Error=%s",
logger.Log.Errorf("Session push message error, ID=%d, UID=%s, Error=%s",
s.ID(), s.UID(), err.Error())
}
} else if app.rpcClient != nil {
Expand Down
7 changes: 5 additions & 2 deletions service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,17 @@ func (h *HandlerService) Handle(conn acceptor.PlayerConn) {
// guarantee agent related resource is destroyed
defer func() {
a.GetSession().Close()
logger.Log.Debugf("Session read goroutine exit, SessionID=%d, UID=%d", a.GetSession().ID(), a.GetSession().UID())
logger.Log.Debugf("Session read goroutine exit, SessionID=%d, UID=%s", a.GetSession().ID(), a.GetSession().UID())
}()

for {
msg, err := conn.GetNextMessage()

if err != nil {
logger.Log.Errorf("Error reading next available message: %s", err.Error())
if err != constants.ErrConnectionClosed {
logger.Log.Errorf("Error reading next available message: %s", err.Error())
}

return
}

Expand Down

0 comments on commit 389ca7a

Please sign in to comment.