Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
zengjiwen committed Aug 1, 2021
2 parents ad970f0 + 10789d3 commit 6fb530e
Show file tree
Hide file tree
Showing 19 changed files with 920 additions and 348 deletions.
3 changes: 3 additions & 0 deletions Makefile
Expand Up @@ -50,6 +50,9 @@ run-custom-metrics-example:
run-rate-limiting-example:
@go run examples/demo/rate_limiting/main.go

protos-compile-demo:
@protoc -I examples/demo/protos examples/demo/protos/*.proto --go_out=.

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -6,8 +6,8 @@
[4]: https://goreportcard.com/report/github.com/topfreegames/pitaya
[5]: https://img.shields.io/badge/license-MIT-blue.svg
[6]: LICENSE
[7]: https://travis-ci.org/topfreegames/pitaya.svg?branch=master
[8]: https://travis-ci.org/topfreegames/pitaya
[7]: https://github.com/topfreegames/pitaya/actions/workflows/tests.yaml/badge.svg
[8]: https://github.com/topfreegames/pitaya/actions/workflows/tests.yaml
[9]: https://coveralls.io/repos/github/topfreegames/pitaya/badge.svg?branch=master
[10]: https://coveralls.io/github/topfreegames/pitaya?branch=master
[11]: https://readthedocs.org/projects/pitaya/badge/?version=latest
Expand Down
3 changes: 3 additions & 0 deletions acceptor/tcp_acceptor.go
Expand Up @@ -138,6 +138,9 @@ func (a *TCPAcceptor) ListenAndServeTLS(cert, key string) {
tlsCfg := &tls.Config{Certificates: []tls.Certificate{crt}}

listener, err := tls.Listen("tcp", a.addr, tlsCfg)
if err != nil {
logger.Log.Fatalf("Failed to listen: %s", err.Error())
}
a.listener = listener
a.running = true
a.serve()
Expand Down
31 changes: 18 additions & 13 deletions client/protoclient.go
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"strings"
"time"
Expand Down Expand Up @@ -144,6 +145,11 @@ func getOutputInputNames(command map[string]interface{}) (string, string, error)

out := command["output"]
outputDocsArr := out.([]interface{})
// we can have handlers that have no return specified.
if len(outputDocsArr) == 0 {
return inputName, "", nil
}

outputDocs, ok := outputDocsArr[0].(map[string]interface{})
if ok {
for k := range outputDocs {
Expand Down Expand Up @@ -211,7 +217,7 @@ func (pc *ProtoClient) getDescriptors(data string) error {
cmdInfo := v.(map[string]interface{})
in, out, err := getOutputInputNames(cmdInfo)
if err != nil {
return err
return fmt.Errorf("failed to get output and input names for '%s' handler: %w", k, err)
}

var command Command
Expand Down Expand Up @@ -250,33 +256,33 @@ func (pc *ProtoClient) getDescriptors(data string) error {

encodedNames, err := proto.Marshal(protname)
if err != nil {
return err
return fmt.Errorf("failed to encode proto names: %w", err)
}
_, err = pc.SendRequest(pc.descriptorsRoute, encodedNames)
if err != nil {
return err
return fmt.Errorf("failed to send proto descriptors request: %w", err)
}

response := <-pc.Client.IncomingMsgChan
descriptors := &protos.ProtoDescriptors{}
if err := proto.Unmarshal(response.Data, descriptors); err != nil {
return err
return fmt.Errorf("failed to unmarshal proto descriptors response: %w", err)
}

// get all proto types
descriptorArray := make([]*protobuf.FileDescriptorProto, 0)
for i := range descriptors.Desc {
fileDescriptorProto, err := unpackDescriptor(descriptors.Desc[i])
if err != nil {
return err
return fmt.Errorf("failed to unpack descriptor: %w", err)
}

descriptorArray = append(descriptorArray, fileDescriptorProto)
pc.descriptorsNames[names[i]] = true
}

if err = pc.buildProtosFromDescriptor(descriptorArray); err != nil {
return err
return fmt.Errorf("failed to build proto from descriptor: %w", err)
}

return nil
Expand Down Expand Up @@ -341,11 +347,11 @@ func (pc *ProtoClient) LoadServerInfo(addr string) error {

docs := &protos.Doc{}
if err := proto.Unmarshal(response.Data, docs); err != nil {
return err
return fmt.Errorf("failed to unmarshal docs route response: %w", err)
}

if err := pc.getDescriptors(docs.Doc); err != nil {
return err
return fmt.Errorf("failed to read proto descriptors: %w", err)
}

pc.Disconnect()
Expand All @@ -369,7 +375,6 @@ func (pc *ProtoClient) waitForData() {
for {
select {
case response := <-pc.Client.IncomingMsgChan:

inputMsg := dynamic.NewMessage(pc.expectedInputDescriptor)

msg, ok := pc.info.Commands[response.Route]
Expand All @@ -388,27 +393,27 @@ func (pc *ProtoClient) waitForData() {
}
response.Data, err = json.Marshal(errMsg)
if err != nil {
logger.Log.Errorf("Erro encode error to json: %s", string(response.Data))
logger.Log.Errorf("error encode error to json: %s", string(response.Data))
continue
}
pc.IncomingMsgChan <- response
continue
}

if inputMsg == nil {
logger.Log.Errorf("Not expected data: %s", string(response.Data))
logger.Log.Errorf("not expected data: %s", string(response.Data))
continue
}

err := inputMsg.Unmarshal(response.Data)
if err != nil {
logger.Log.Errorf("Erro decode data: %s", string(response.Data))
logger.Log.Errorf("error decode data: %s", string(response.Data))
continue
}

data, err2 := inputMsg.MarshalJSON()
if err2 != nil {
logger.Log.Errorf("Erro encode data to json: %s", string(response.Data))
logger.Log.Errorf("error encode data to json: %s", string(response.Data))
continue
}

Expand Down
10 changes: 8 additions & 2 deletions cluster/etcd_service_discovery.go
Expand Up @@ -375,7 +375,10 @@ func (sd *etcdServiceDiscovery) Init() error {
var err error

if sd.cli == nil {
sd.InitETCDClient()
err = sd.InitETCDClient()
if err != nil {
return err
}
} else {
sd.cli.KV = namespace.NewKV(sd.cli.KV, sd.etcdPrefix)
sd.cli.Watcher = namespace.NewWatcher(sd.cli.Watcher, sd.etcdPrefix)
Expand Down Expand Up @@ -663,7 +666,10 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
failedWatchAttempts++
time.Sleep(1000 * time.Millisecond)
if failedWatchAttempts > 10 {
sd.InitETCDClient()
if err := sd.InitETCDClient(); err != nil {
failedWatchAttempts = 0
continue
}
chn = sd.cli.Watch(context.Background(), "servers/", clientv3.WithPrefix())
failedWatchAttempts = 0
}
Expand Down
20 changes: 11 additions & 9 deletions cluster/nats_rpc_server.go
Expand Up @@ -53,6 +53,8 @@ type NatsRPCServer struct {
subChan chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
unhandledReqCh chan *protos.Request
responses []*protos.Response
requests []*protos.Request
userPushCh chan *protos.Push
userKickCh chan *protos.KickMsg
sub *nats.Subscription
Expand Down Expand Up @@ -254,23 +256,21 @@ func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
}

func (ns *NatsRPCServer) processMessages(threadID int) {
for req := range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, req.GetMsg().GetId())
reply := req.GetMsg().GetReply()
var response *protos.Response
ctx, err := util.GetContextFromRequest(req, ns.server.ID)
for ns.requests[threadID] = range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, ns.requests[threadID].GetMsg().GetId())
ctx, err := util.GetContextFromRequest(ns.requests[threadID], ns.server.ID)
if err != nil {
response = &protos.Response{
ns.responses[threadID] = &protos.Response{
Error: &protos.Error{
Code: e.ErrInternalCode,
Msg: err.Error(),
},
}
} else {
response, _ = ns.pitayaServer.Call(ctx, req)
ns.responses[threadID], _ = ns.pitayaServer.Call(ctx, ns.requests[threadID])
}
p, err := ns.marshalResponse(response)
err = ns.conn.Publish(reply, p)
p, err := ns.marshalResponse(ns.responses[threadID])
err = ns.conn.Publish(ns.requests[threadID].GetMsg().GetReply(), p)
if err != nil {
logger.Log.Error("error sending message response")
}
Expand Down Expand Up @@ -311,6 +311,8 @@ func (ns *NatsRPCServer) processKick() {

// Init inits nats rpc server
func (ns *NatsRPCServer) Init() error {
ns.responses = make([]*protos.Response, ns.config.GetInt("pitaya.concurrency.remote.service"))
ns.requests = make([]*protos.Request, ns.config.GetInt("pitaya.concurrency.remote.service"))
// TODO should we have concurrency here? it feels like we should
go ns.handleMessages()

Expand Down
16 changes: 13 additions & 3 deletions conn/message/message.go
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
)

// Type represents the type of message, which could be Request/Notify/Response/Push
Expand Down Expand Up @@ -54,8 +55,9 @@ var types = map[Type]string{
}

var (
routes = make(map[string]uint16) // route map to code
codes = make(map[uint16]string) // code map to route
routesCodesMutex = sync.RWMutex{}
routes = make(map[string]uint16) // route map to code
codes = make(map[uint16]string) // code map to route
)

// Errors that could be occurred in message codec
Expand Down Expand Up @@ -110,6 +112,8 @@ func SetDictionary(dict map[string]uint16) error {
if dict == nil {
return nil
}
routesCodesMutex.Lock()
defer routesCodesMutex.Unlock()

for route, code := range dict {
r := strings.TrimSpace(route)
Expand All @@ -133,7 +137,13 @@ func SetDictionary(dict map[string]uint16) error {

// GetDictionary gets the routes map which is used to compress route.
func GetDictionary() map[string]uint16 {
return routes
routesCodesMutex.RLock()
defer routesCodesMutex.RUnlock()
dict := make(map[string]uint16)
for k, v := range routes {
dict[k] = v
}
return dict
}

func (t *Type) String() string {
Expand Down
4 changes: 4 additions & 0 deletions conn/message/message_encoder.go
Expand Up @@ -69,7 +69,9 @@ func (me *MessagesEncoder) Encode(message *Message) ([]byte, error) {
buf := make([]byte, 0)
flag := byte(message.Type) << 1

routesCodesMutex.RLock()
code, compressed := routes[message.Route]
routesCodesMutex.RUnlock()
if compressed {
flag |= msgRouteCompressMask
}
Expand Down Expand Up @@ -163,7 +165,9 @@ func Decode(data []byte) (*Message, error) {
if flag&msgRouteCompressMask == 1 {
m.compressed = true
code := binary.BigEndian.Uint16(data[offset:(offset + 2)])
routesCodesMutex.RLock()
route, ok := codes[code]
routesCodesMutex.RUnlock()
if !ok {
return nil, ErrRouteInfoNotFound
}
Expand Down
41 changes: 40 additions & 1 deletion conn/message/message_test.go
Expand Up @@ -3,6 +3,7 @@ package message
import (
"errors"
"flag"
"fmt"
"path/filepath"
"testing"

Expand All @@ -14,6 +15,8 @@ var update = flag.Bool("update", false, "update .golden files")

func resetDicts(t *testing.T) {
t.Helper()
routesCodesMutex.Lock()
defer routesCodesMutex.Unlock()
routes = make(map[string]uint16)
codes = make(map[uint16]string)
}
Expand Down Expand Up @@ -156,7 +159,7 @@ var dictTables = map[string]struct {
map[uint16]string{1: "a"}, errors.New("duplicated route(route: b, code: 1)")},
}

func TestSetDictionaty(t *testing.T) {
func TestSetDictionary(t *testing.T) {
for name, table := range dictTables {
t.Run(name, func(t *testing.T) {
for _, dict := range table.dicts {
Expand All @@ -170,3 +173,39 @@ func TestSetDictionaty(t *testing.T) {
})
}
}

func TestSetDictionaryRace(t *testing.T) {
defer resetDicts(t)

done := make(chan bool, 2)

setDictRace := func(dict map[string]uint16) {
assert.Nil(t, SetDictionary(dict))
done <- true
}

go setDictRace(map[string]uint16{"a": 1})
go setDictRace(map[string]uint16{"b": 2})

// wait for both setDictRace to finish
<-done
<-done

expected_codes := map[uint16]string{1: "a", 2: "b"}
assert.EqualValues(t, expected_codes, codes)

expected_routes := map[string]uint16{"a": 1, "b": 2}
assert.EqualValues(t, expected_routes, routes)
}

func TestGetDictionary(t *testing.T) {
defer resetDicts(t)
expected := map[string]uint16{"a": 1, "b": 2}
assert.Nil(t, SetDictionary(expected))

dict := GetDictionary()
assert.Equal(t, expected, dict)

// make sure we're copying the routes maps
assert.NotEqual(t, fmt.Sprintf("%p", routes), fmt.Sprintf("%p", dict))
}
5 changes: 3 additions & 2 deletions examples/demo/cluster/main.go
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/examples/demo/cluster/services"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/serialize/json"
"github.com/topfreegames/pitaya/serialize/protobuf"
)

func configureBackend() {
Expand All @@ -36,6 +36,7 @@ func configureFrontend(port int) {
component.WithName("connector"),
component.WithNameFunc(strings.ToLower),
)

pitaya.RegisterRemote(&services.ConnectorRemote{},
component.WithName("connectorremote"),
component.WithNameFunc(strings.ToLower),
Expand Down Expand Up @@ -82,7 +83,7 @@ func main() {

defer pitaya.Shutdown()

pitaya.SetSerializer(json.NewSerializer())
pitaya.SetSerializer(protobuf.NewSerializer())

if !*isFrontend {
configureBackend()
Expand Down

0 comments on commit 6fb530e

Please sign in to comment.