Skip to content

Commit

Permalink
Merge 9b7356e into 08d1d52
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Oct 4, 2018
2 parents 08d1d52 + 9b7356e commit ef18d1c
Show file tree
Hide file tree
Showing 31 changed files with 1,888 additions and 174 deletions.
214 changes: 42 additions & 172 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@
[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.8.0"

[[override]]
name = "github.com/topfreegames/go-workers"
version = "v0.0.1"
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ run-cluster-grpc-example-connector:
run-cluster-grpc-example-room:
@cd examples/demo/cluster_grpc && go run main.go --port 3251 --rpcsvport 3435 --type room --frontend=false

run-cluster-worker-example-room:
@cd examples/demo/worker && go run main.go --type room --frontend=true

run-cluster-worker-example-metagame:
@cd examples/demo/worker && go run main.go --type metagame --frontend=false

run-cluster-worker-example-worker:
@cd examples/demo/worker && go run main.go --type worker --frontend=false

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --gogofaster_out=plugins=grpc:protos
Expand Down
27 changes: 25 additions & 2 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/topfreegames/pitaya/session"
"github.com/topfreegames/pitaya/timer"
"github.com/topfreegames/pitaya/tracing"
"github.com/topfreegames/pitaya/worker"
)

// ServerMode represents a server mode
Expand Down Expand Up @@ -90,6 +91,7 @@ type App struct {
serverMode ServerMode
serviceDiscovery cluster.ServiceDiscovery
startAt time.Time
worker *worker.Worker
}

var (
Expand Down Expand Up @@ -329,6 +331,10 @@ func initSysRemotes() {
func periodicMetrics() {
period := app.config.GetDuration("pitaya.metrics.periodicMetrics.period")
go metrics.ReportSysMetrics(app.metricsReporters, period)

if app.worker.Started() {
go worker.Report(app.metricsReporters, period)
}
}

// Start starts the app
Expand Down Expand Up @@ -573,8 +579,6 @@ func AddGRPCInfoToMetadata(
metadata map[string]string,
region, host, externalHost, port string,
) map[string]string {
// TODO: should I get all this information here? Or just
// receive them as argument
metadata[constants.GRPCHostKey] = host
metadata[constants.GRPCExternalHostKey] = externalHost
metadata[constants.GRPCPortKey] = port
Expand All @@ -586,3 +590,22 @@ func AddGRPCInfoToMetadata(
func Descriptor(protoName string) ([]byte, error) {
return docgenerator.ProtoDescriptors(protoName)
}

// StartWorker configures, starts and returns pitaya worker
func StartWorker(config *config.Config) error {
var err error
app.worker, err = worker.NewWorker(config)
if err != nil {
return err
}

app.worker.Start()

return nil
}

// RegisterRPCJob registers rpc job to execute jobs with retries
func RegisterRPCJob(rpcJob worker.RPCJob) error {
err := app.worker.RegisterRPCJob(rpcJob)
return err
}
37 changes: 37 additions & 0 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,3 +652,40 @@ func TestAddGRPCInfoToMetadata(t *testing.T) {
constants.RegionKey: "region",
}, metadata)
}

func TestStartWorker(t *testing.T) {
cfg := viper.New()
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, cfg)

err := StartWorker(GetConfig())
assert.NoError(t, err)
assert.True(t, app.worker.Started())
}

func TestRegisterRPCJob(t *testing.T) {
t.Run("register_once", func(t *testing.T) {
cfg := viper.New()
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, cfg)
err := StartWorker(GetConfig())
assert.NoError(t, err)

err = RegisterRPCJob(nil)
assert.NoError(t, err)
})

t.Run("register_twice", func(t *testing.T) {
cfg := viper.New()
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, cfg)
err := StartWorker(GetConfig())
assert.NoError(t, err)

err = RegisterRPCJob(nil)
assert.NoError(t, err)

err = RegisterRPCJob(nil)
assert.Equal(t, constants.ErrRPCJobAlreadyRegistered, err)
})
}
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ func (c *Config) fillDefaultValues() {
"pitaya.metrics.additionalTags": map[string]string{},
"pitaya.metrics.periodicMetrics.period": "15s",
"pitaya.defaultpipelines.structvalidation.enabled": false,
"pitaya.worker.redis.url": "localhost:6379",
"pitaya.worker.redis.pool": "10",
"pitaya.worker.concurrency": 1,
"pitaya.worker.retry.enabled": true,
"pitaya.worker.retry.max": 5,
"pitaya.worker.retry.exponential": 2,
"pitaya.worker.retry.minDelay": 0,
"pitaya.worker.retry.maxDelay": 10,
"pitaya.worker.retry.maxRandom": 10,
}

for param := range defaultsMap {
Expand Down
1 change: 1 addition & 0 deletions constants/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ var (
ErrTimeoutTerminatingBinaryModule = errors.New("timeout waiting to binary module to die")
ErrFrontendTypeNotSpecified = errors.New("for using SendPushToUsers from a backend server you have to specify a valid frontendType")
ErrMetricNotKnown = errors.New("the provided metric does not exist")
ErrRPCJobAlreadyRegistered = errors.New("rpc job was already registered")
)
36 changes: 36 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,42 @@ The configurations only need to be set if the RPC Service is enabled with the gi
- 30
- int
- Number of goroutines processing messages at the remote service for the nats RPC service
* - pitaya.worker.redis.url
- localhost:6379
- string
- Redis url pitaya workers use to register jobs
* - pitaya.worker.redis.pool
- 10
- string
- Number of connections to keep with Redis
* - pitaya.worker.concurrency
- 1
- int
- Number of workers to execute job
* - pitaya.worker.retry.enabled
- true
- bool
- If true, retry job if errored for max times
* - pitaya.worker.retry.max
- 5
- int
- Max number of job retries
* - pitaya.worker.retry.exponential
- 2
- int
- Retry job after backoff of nRetry**2
* - pitaya.worker.retry.minDelay
- 0
- int
- Min time to wait on backoff to retry job
* - pitaya.worker.retry.maxDelay
- 10
- int
- Max time to wait on backoff to retry job
* - pitaya.worker.retry.maxRandom
- 10
- int
- Random time to wait during backoff

Connection
==========
Expand Down
6 changes: 6 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ These are the RPCs done by the servers when forwarding handler messages to the a

User RPCs are done when the application actively calls a remote method in another server. The call can specify the ID of the target server or let Pitaya choose one according to the routing logic.

### User Reliable RPCs

These are done when the application calls a remote using workers, that is, Pitaya retries the RPC if any error occurrs.

**Important**: the remote that is being called must be idempotent; also the ReliableRPC will not return the remote's reply since it is asynchronous, it only returns the job id (jid) if success.

## Server operation mode

Pitaya has two types of operation: standalone and cluster mode.
Expand Down
75 changes: 75 additions & 0 deletions examples/demo/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"flag"
"fmt"

"strings"

"github.com/spf13/viper"
"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/acceptor"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/examples/demo/worker/services"
"github.com/topfreegames/pitaya/serialize/json"
)

func configureMetagame() {
pitaya.RegisterRemote(&services.Metagame{},
component.WithName("metagame"),
component.WithNameFunc(strings.ToLower),
)
}

func configureRoom(port int) error {
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))
pitaya.AddAcceptor(tcp)

pitaya.Register(&services.Room{},
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)

err := pitaya.StartWorker(pitaya.GetConfig())
return err
}

func configureWorker() error {
worker := services.Worker{}
err := worker.Configure()
return err
}

func main() {
port := flag.Int("port", 3250, "the port to listen")
svType := flag.String("type", "metagame", "the server type")
isFrontend := flag.Bool("frontend", true, "if server is frontend")

flag.Parse()

defer pitaya.Shutdown()

pitaya.SetSerializer(json.NewSerializer())

config := viper.New()
config.SetDefault("pitaya.worker.redis.url", "localhost:6379")
config.SetDefault("pitaya.worker.redis.pool", "3")

pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{})

var err error
switch *svType {
case "metagame":
configureMetagame()
case "room":
err = configureRoom(*port)
case "worker":
err = configureWorker()
}

if err != nil {
panic(err)
}

pitaya.Start()
}
Loading

0 comments on commit ef18d1c

Please sign in to comment.