Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrating pr 225 #232

Merged
merged 3 commits into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ run-custom-metrics-example:
run-rate-limiting-example:
@go run examples/demo/rate_limiting/main.go

protos-compile-demo:
@protoc -I examples/demo/protos examples/demo/protos/*.proto --go_out=.

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down
31 changes: 18 additions & 13 deletions client/protoclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"strings"
"time"
Expand Down Expand Up @@ -144,6 +145,11 @@ func getOutputInputNames(command map[string]interface{}) (string, string, error)

out := command["output"]
outputDocsArr := out.([]interface{})
// we can have handlers that have no return specified.
if len(outputDocsArr) == 0 {
return inputName, "", nil
}

outputDocs, ok := outputDocsArr[0].(map[string]interface{})
if ok {
for k := range outputDocs {
Expand Down Expand Up @@ -211,7 +217,7 @@ func (pc *ProtoClient) getDescriptors(data string) error {
cmdInfo := v.(map[string]interface{})
in, out, err := getOutputInputNames(cmdInfo)
if err != nil {
return err
return fmt.Errorf("failed to get output and input names for '%s' handler: %w", k, err)
}

var command Command
Expand Down Expand Up @@ -250,33 +256,33 @@ func (pc *ProtoClient) getDescriptors(data string) error {

encodedNames, err := proto.Marshal(protname)
if err != nil {
return err
return fmt.Errorf("failed to encode proto names: %w", err)
}
_, err = pc.SendRequest(pc.descriptorsRoute, encodedNames)
if err != nil {
return err
return fmt.Errorf("failed to send proto descriptors request: %w", err)
}

response := <-pc.Client.IncomingMsgChan
descriptors := &protos.ProtoDescriptors{}
if err := proto.Unmarshal(response.Data, descriptors); err != nil {
return err
return fmt.Errorf("failed to unmarshal proto descriptors response: %w", err)
}

// get all proto types
descriptorArray := make([]*protobuf.FileDescriptorProto, 0)
for i := range descriptors.Desc {
fileDescriptorProto, err := unpackDescriptor(descriptors.Desc[i])
if err != nil {
return err
return fmt.Errorf("failed to unpack descriptor: %w", err)
}

descriptorArray = append(descriptorArray, fileDescriptorProto)
pc.descriptorsNames[names[i]] = true
}

if err = pc.buildProtosFromDescriptor(descriptorArray); err != nil {
return err
return fmt.Errorf("failed to build proto from descriptor: %w", err)
}

return nil
Expand Down Expand Up @@ -341,11 +347,11 @@ func (pc *ProtoClient) LoadServerInfo(addr string) error {

docs := &protos.Doc{}
if err := proto.Unmarshal(response.Data, docs); err != nil {
return err
return fmt.Errorf("failed to unmarshal docs route response: %w", err)
}

if err := pc.getDescriptors(docs.Doc); err != nil {
return err
return fmt.Errorf("failed to read proto descriptors: %w", err)
}

pc.Disconnect()
Expand All @@ -369,7 +375,6 @@ func (pc *ProtoClient) waitForData() {
for {
select {
case response := <-pc.Client.IncomingMsgChan:

inputMsg := dynamic.NewMessage(pc.expectedInputDescriptor)

msg, ok := pc.info.Commands[response.Route]
Expand All @@ -388,27 +393,27 @@ func (pc *ProtoClient) waitForData() {
}
response.Data, err = json.Marshal(errMsg)
if err != nil {
logger.Log.Errorf("Erro encode error to json: %s", string(response.Data))
logger.Log.Errorf("error encode error to json: %s", string(response.Data))
continue
}
pc.IncomingMsgChan <- response
continue
}

if inputMsg == nil {
logger.Log.Errorf("Not expected data: %s", string(response.Data))
logger.Log.Errorf("not expected data: %s", string(response.Data))
continue
}

err := inputMsg.Unmarshal(response.Data)
if err != nil {
logger.Log.Errorf("Erro decode data: %s", string(response.Data))
logger.Log.Errorf("error decode data: %s", string(response.Data))
continue
}

data, err2 := inputMsg.MarshalJSON()
if err2 != nil {
logger.Log.Errorf("Erro encode data to json: %s", string(response.Data))
logger.Log.Errorf("error encode data to json: %s", string(response.Data))
continue
}

Expand Down
3 changes: 3 additions & 0 deletions examples/demo/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func configureFrontend(port int) {
component.WithName("connector"),
component.WithNameFunc(strings.ToLower),
)

app.RegisterRemote(services.NewConnectorRemote(app),
component.WithName("connectorremote"),
component.WithNameFunc(strings.ToLower),
Expand Down Expand Up @@ -87,6 +88,8 @@ func main() {
builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig())
app = builder.Build()

//TODO: Oelze pitaya.SetSerializer(protobuf.NewSerializer())

defer app.Shutdown()

if !*isFrontend {
Expand Down
24 changes: 20 additions & 4 deletions examples/demo/cluster/services/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/topfreegames/pitaya/v2"
"github.com/topfreegames/pitaya/v2/component"
"github.com/topfreegames/pitaya/v2/examples/demo/protos"
pitayaprotos "github.com/topfreegames/pitaya/v2/protos"
)

// ConnectorRemote is a remote that will receive rpc's
Expand Down Expand Up @@ -43,8 +44,8 @@ func NewConnectorRemote(app pitaya.Pitaya) *ConnectorRemote {
return &ConnectorRemote{app: app}
}

func reply(code int32, msg string) (*Response, error) {
res := &Response{
func reply(code int32, msg string) (*protos.Response, error) {
res := &protos.Response{
Code: code,
Msg: msg,
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *ConnectorRemote) RemoteFunc(ctx context.Context, msg *protos.RPCMsg) (*
}

// Docs returns documentation
func (c *ConnectorRemote) Docs(ctx context.Context, ddd *protos.Doc) (*protos.Doc, error) {
func (c *ConnectorRemote) Docs(ctx context.Context, ddd *pitayaprotos.Doc) (*protos.Doc, error) {
d, err := c.app.Documentation(true)
if err != nil {
return nil, err
Expand All @@ -99,5 +100,20 @@ func (c *ConnectorRemote) Docs(ctx context.Context, ddd *protos.Doc) (*protos.Do
return nil, err
}

return &protos.Doc{Doc: string(doc)}, nil
return &pitayaprotos.Doc{Doc: string(doc)}, nil
}

func (c *ConnectorRemote) Descriptor(ctx context.Context, names *pitayaprotos.ProtoNames) (*pitayaprotos.ProtoDescriptors, error) {
descriptors := make([][]byte, len(names.Name))

for i, protoName := range names.Name {
desc, err := pitaya.Descriptor(protoName)
if err != nil {
return nil, fmt.Errorf("failed to get descriptor for '%s': %w", protoName, err)
}

descriptors[i] = desc
}

return &pitayaprotos.ProtoDescriptors{Desc: descriptors}, nil
}
32 changes: 16 additions & 16 deletions examples/demo/cluster/services/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type (
component.Base
timer *timer.Timer
app pitaya.Pitaya
Stats *Stats
Stats *protos.Stats
}

// UserMessage represents a message that user sent
Expand Down Expand Up @@ -66,7 +66,7 @@ type (
func NewRoom(app pitaya.Pitaya) *Room {
return &Room{
app: app,
Stats: &Stats{},
Stats: &protos.Stats{},
}
}

Expand All @@ -80,23 +80,23 @@ func (r *Room) AfterInit() {
r.timer = pitaya.NewTimer(time.Minute, func() {
count, err := r.app.GroupCountMembers(context.Background(), "room")
println("UserCount: Time=>", time.Now().String(), "Count=>", count, "Error=>", err)
println("OutboundBytes", r.Stats.outboundBytes)
println("InboundBytes", r.Stats.outboundBytes)
println("OutboundBytes", r.Stats.OutboundBytes)
println("InboundBytes", r.Stats.OutboundBytes)
})
}

// Entry is the entrypoint
func (r *Room) Entry(ctx context.Context, msg []byte) (*JoinResponse, error) {
func (r *Room) Entry(ctx context.Context, msg []byte) (*protos.JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx) // The default logger contains a requestId, the route being executed and the sessionId
s := r.app.GetSessionFromCtx(ctx)

err := s.Bind(ctx, "helroow")
err := s.Bind(ctx, "banana")
if err != nil {
logger.Error("Failed to bind session")
logger.Error(err)
return nil, pitaya.Error(err, "RH-000", map[string]string{"failed": "bind"})
}
return &JoinResponse{Result: "ok"}, nil
return &protos.JoinResponse{Result: "ok"}, nil
}

// GetSessionData gets the session data
Expand Down Expand Up @@ -125,7 +125,7 @@ func (r *Room) SetSessionData(ctx context.Context, data *SessionData) ([]byte, e
}

// Join room
func (r *Room) Join(ctx context.Context) (*JoinResponse, error) {
func (r *Room) Join(ctx context.Context) (*protos.JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
s := r.app.GetSessionFromCtx(ctx)
err := r.app.GroupAddMember(ctx, "room", s.UID())
Expand All @@ -140,18 +140,18 @@ func (r *Room) Join(ctx context.Context) (*JoinResponse, error) {
logger.Error(err)
return nil, err
}
s.Push("onMembers", &AllMembers{Members: members})
err = r.app.GroupBroadcast(ctx, "connector", "room", "onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
s.Push("onMembers", &protos.AllMembers{Members: members})
err = r.app.GroupBroadcast(ctx, "connector", "room", "onNewUser", &protos.NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
if err != nil {
logger.Error("Failed to broadcast onNewUser")
logger.Error(err)
return nil, err
}
return &JoinResponse{Result: "success"}, nil
return &protos.JoinResponse{Result: "success"}, nil
}

// Message sync last message to all members
func (r *Room) Message(ctx context.Context, msg *UserMessage) {
func (r *Room) Message(ctx context.Context, msg *protos.UserMessage) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
err := r.app.GroupBroadcast(ctx, "connector", "room", "onMessage", msg)
if err != nil {
Expand All @@ -161,19 +161,19 @@ func (r *Room) Message(ctx context.Context, msg *UserMessage) {
}

// SendRPC sends rpc
func (r *Room) SendRPC(ctx context.Context, msg *SendRPCMsg) (*protos.RPCRes, error) {
func (r *Room) SendRPC(ctx context.Context, msg *protos.SendRPCMsg) (*protos.RPCRes, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
ret := &protos.RPCRes{}
err := r.app.RPCTo(ctx, msg.ServerID, msg.Route, ret, &protos.RPCMsg{Msg: msg.Msg})
err := r.app.RPCTo(ctx, msg.ServerId, msg.Route, ret, &protos.RPCMsg{Msg: msg.Msg})
if err != nil {
logger.Errorf("Failed to execute RPCTo %s - %s", msg.ServerID, msg.Route)
logger.Errorf("Failed to execute RPCTo %s - %s", msg.ServerId, msg.Route)
logger.Error(err)
return nil, pitaya.Error(err, "RPC-000")
}
return ret, nil
}

// MessageRemote just echoes the given message
func (r *Room) MessageRemote(ctx context.Context, msg *UserMessage, b bool, s string) (*UserMessage, error) {
func (r *Room) MessageRemote(ctx context.Context, msg *protos.UserMessage, b bool, s string) (*protos.UserMessage, error) {
return msg, nil
}
Loading