Skip to content

Commit

Permalink
example of rate limiting wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed May 24, 2019
1 parent ae70329 commit b6d10c3
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ run-cluster-worker-example-worker:
run-custom-metrics-example:
@cd examples/demo/custom_metrics && go run main.go --port 3250

run-rate-limiting-example-frontend:
@go run examples/demo/rate_limiting/main.go

run-rate-limiting-example-backend:
@go run examples/demo/rate_limiting/main.go --port 3251 --type room --frontend=false

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down
109 changes: 109 additions & 0 deletions examples/demo/rate_limiting/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"context"
"flag"
"fmt"
"time"

"strings"

"github.com/spf13/viper"
"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/acceptor"
"github.com/topfreegames/pitaya/acceptorwrapper"
"github.com/topfreegames/pitaya/cluster"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/config"
"github.com/topfreegames/pitaya/examples/demo/rate_limiting/services"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/serialize/json"
)

func configureBackend() {
room := services.NewRoom()
pitaya.Register(room,
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)

pitaya.RegisterRemote(room,
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)
}

func configureFrontend(port int) {
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))

pitaya.Register(&services.Connector{},
component.WithName("connector"),
component.WithNameFunc(strings.ToLower),
)
pitaya.RegisterRemote(&services.ConnectorRemote{},
component.WithName("connectorremote"),
component.WithNameFunc(strings.ToLower),
)

err := pitaya.AddRoute("room", func(
ctx context.Context,
route *route.Route,
payload []byte,
servers map[string]*cluster.Server,
) (*cluster.Server, error) {
// will return the first server
for k := range servers {
return servers[k], nil
}
return nil, nil
})

if err != nil {
fmt.Printf("error adding route %s\n", err.Error())
}

err = pitaya.SetDictionary(map[string]uint16{
"connector.getsessiondata": 1,
"connector.setsessiondata": 2,
"room.room.getsessiondata": 3,
"onMessage": 4,
"onMembers": 5,
})

if err != nil {
fmt.Printf("error setting route dictionary %s\n", err.Error())
}

// 5 requests in 1 minute. Doesn't make sense, just to test
// rate limiting
vConfig := viper.New()
vConfig.Set("pitaya.conn.ratelimiting.limit", 5)
vConfig.Set("pitaya.conn.ratelimiting.interval", time.Minute)
pConfig := config.NewConfig(vConfig)

wrapped := acceptorwrapper.WithWrappers(
tcp,
acceptorwrapper.NewRateLimitingWrapper(pConfig))
pitaya.AddAcceptor(wrapped)
}

func main() {
port := flag.Int("port", 3250, "the port to listen")
svType := flag.String("type", "connector", "the server type")
isFrontend := flag.Bool("frontend", true, "if server is frontend")

flag.Parse()

defer pitaya.Shutdown()

pitaya.SetSerializer(json.NewSerializer())

if !*isFrontend {
configureBackend()
} else {
configureFrontend(*port)
}

pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{})
pitaya.Start()
}
91 changes: 91 additions & 0 deletions examples/demo/rate_limiting/services/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package services

import (
"context"
"encoding/json"
"fmt"

"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/examples/demo/protos"
)

// ConnectorRemote is a remote that will receive rpc's
type ConnectorRemote struct {
component.Base
}

// Connector struct
type Connector struct {
component.Base
}

// SessionData struct
type SessionData struct {
Data map[string]interface{}
}

// Response struct
type Response struct {
Code int32
Msg string
}

func reply(code int32, msg string) (*Response, error) {
res := &Response{
Code: code,
Msg: msg,
}
return res, nil
}

// GetSessionData gets the session data
func (c *Connector) GetSessionData(ctx context.Context) (*SessionData, error) {
s := pitaya.GetSessionFromCtx(ctx)
res := &SessionData{
Data: s.GetData(),
}
return res, nil
}

// SetSessionData sets the session data
func (c *Connector) SetSessionData(ctx context.Context, data *SessionData) (*Response, error) {
s := pitaya.GetSessionFromCtx(ctx)
err := s.SetData(data.Data)
if err != nil {
return nil, pitaya.Error(err, "CN-000", map[string]string{"failed": "set data"})
}
return reply(200, "success")
}

// NotifySessionData sets the session data
func (c *Connector) NotifySessionData(ctx context.Context, data *SessionData) {
s := pitaya.GetSessionFromCtx(ctx)
err := s.SetData(data.Data)
if err != nil {
fmt.Println("got error on notify", err)
}
}

// RemoteFunc is a function that will be called remotely
func (c *ConnectorRemote) RemoteFunc(ctx context.Context, msg *protos.RPCMsg) (*protos.RPCRes, error) {
fmt.Printf("received a remote call with this message: %s\n", msg.GetMsg())
return &protos.RPCRes{
Msg: msg.GetMsg(),
}, nil
}

// Docs returns documentation
func (c *ConnectorRemote) Docs(ctx context.Context, ddd *protos.Doc) (*protos.Doc, error) {
d, err := pitaya.Documentation(true)
if err != nil {
return nil, err
}
doc, err := json.Marshal(d)

if err != nil {
return nil, err
}

return &protos.Doc{Doc: string(doc)}, nil
}
181 changes: 181 additions & 0 deletions examples/demo/rate_limiting/services/room.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package services

import (
"context"
"fmt"
"time"

"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/config"
"github.com/topfreegames/pitaya/examples/demo/protos"
"github.com/topfreegames/pitaya/groups"
"github.com/topfreegames/pitaya/timer"
)

type (
// Room represents a component that contains a bundle of room related handler
// like Join/Message
Room struct {
component.Base
timer *timer.Timer
Stats *Stats
}

// UserMessage represents a message that user sent
UserMessage struct {
Name string `json:"name"`
Content string `json:"content"`
}

// Stats exports the room status
Stats struct {
outboundBytes int
inboundBytes int
}

// RPCResponse represents a rpc message
RPCResponse struct {
Msg string `json:"msg"`
}

// SendRPCMsg represents a rpc message
SendRPCMsg struct {
ServerID string `json:"serverId"`
Route string `json:"route"`
Msg string `json:"msg"`
}

// NewUser message will be received when new user join room
NewUser struct {
Content string `json:"content"`
}

// AllMembers contains all members uid
AllMembers struct {
Members []string `json:"members"`
}

// JoinResponse represents the result of joining room
JoinResponse struct {
Code int `json:"code"`
Result string `json:"result"`
}
)

// NewRoom returns a new room
func NewRoom() *Room {
return &Room{
Stats: &Stats{},
}
}

// Init runs on service initialization
func (r *Room) Init() {
gsi := groups.NewMemoryGroupService(config.NewConfig())
pitaya.InitGroups(gsi)
pitaya.GroupCreate(context.Background(), "room")
}

// AfterInit component lifetime callback
func (r *Room) AfterInit() {
r.timer = pitaya.NewTimer(time.Minute, func() {
count, err := pitaya.GroupCountMembers(context.Background(), "room")
println("UserCount: Time=>", time.Now().String(), "Count=>", count, "Error=>", err)
println("OutboundBytes", r.Stats.outboundBytes)
println("InboundBytes", r.Stats.outboundBytes)
})
}

// Entry is the entrypoint
func (r *Room) Entry(ctx context.Context, msg []byte) (*JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx) // The default logger contains a requestId, the route being executed and the sessionId
s := pitaya.GetSessionFromCtx(ctx)

err := s.Bind(ctx, "helroow")
if err != nil {
logger.Error("Failed to bind session")
logger.Error(err)
return nil, pitaya.Error(err, "RH-000", map[string]string{"failed": "bind"})
}
return &JoinResponse{Result: "ok"}, nil
}

// GetSessionData gets the session data
func (r *Room) GetSessionData(ctx context.Context) (*SessionData, error) {
s := pitaya.GetSessionFromCtx(ctx)
return &SessionData{
Data: s.GetData(),
}, nil
}

// SetSessionData sets the session data
func (r *Room) SetSessionData(ctx context.Context, data *SessionData) ([]byte, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
s := pitaya.GetSessionFromCtx(ctx)
err := s.SetData(data.Data)
if err != nil {
logger.Error("Failed to set session data")
logger.Error(err)
return nil, err
}
err = s.PushToFront(ctx)
if err != nil {
return nil, err
}
return []byte("success"), nil
}

// Join room
func (r *Room) Join(ctx context.Context) (*JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
s := pitaya.GetSessionFromCtx(ctx)
err := pitaya.GroupAddMember(ctx, "room", s.UID())
if err != nil {
logger.Error("Failed to join room")
logger.Error(err)
return nil, err
}
members, err := pitaya.GroupMembers(ctx, "room")
if err != nil {
logger.Error("Failed to get members")
logger.Error(err)
return nil, err
}
s.Push("onMembers", &AllMembers{Members: members})
err = pitaya.GroupBroadcast(ctx, "connector", "room", "onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
if err != nil {
logger.Error("Failed to broadcast onNewUser")
logger.Error(err)
return nil, err
}
return &JoinResponse{Result: "success"}, nil
}

// Message sync last message to all members
func (r *Room) Message(ctx context.Context, msg *UserMessage) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
err := pitaya.GroupBroadcast(ctx, "connector", "room", "onMessage", msg)
if err != nil {
logger.Error("Error broadcasting message")
logger.Error(err)
}
}

// SendRPC sends rpc
func (r *Room) SendRPC(ctx context.Context, msg *SendRPCMsg) (*protos.RPCRes, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
ret := &protos.RPCRes{}
err := pitaya.RPCTo(ctx, msg.ServerID, msg.Route, ret, &protos.RPCMsg{Msg: msg.Msg})
if err != nil {
logger.Errorf("Failed to execute RPCTo %s - %s", msg.ServerID, msg.Route)
logger.Error(err)
return nil, pitaya.Error(err, "RPC-000")
}
return ret, nil
}

// MessageRemote just echoes the given message
func (r *Room) MessageRemote(ctx context.Context, msg *UserMessage, b bool, s string) (*UserMessage, error) {
return msg, nil
}

0 comments on commit b6d10c3

Please sign in to comment.