diff --git a/acceptorwrapper/rate_limiting_wrapper_test.go b/acceptorwrapper/rate_limiting_wrapper_test.go index 610edc64..e3a3d021 100644 --- a/acceptorwrapper/rate_limiting_wrapper_test.go +++ b/acceptorwrapper/rate_limiting_wrapper_test.go @@ -34,7 +34,7 @@ func TestNewRateLimitingWrapper(t *testing.T) { reporters := []metrics.Reporter{} - rateLimitingWrapper := NewRateLimitingWrapper(reporters, *config.NewDefaultRateLimitingConfig()) + rateLimitingWrapper := NewRateLimitingWrapper(reporters, config.NewDefaultPitayaConfig().Conn.RateLimiting) expected := NewRateLimiter(reporters, nil, 20, time.Second, false) assert.Equal(t, expected, rateLimitingWrapper.wrapConn(nil)) } diff --git a/app_test.go b/app_test.go index 801b35ad..54fd96c9 100644 --- a/app_test.go +++ b/app_test.go @@ -71,7 +71,7 @@ func TestMain(m *testing.M) { func TestNewApp(t *testing.T) { for _, table := range tables { t.Run(table.serverType, func(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(table.isFrontend, table.serverType, table.serverMode, table.serverMetadata, *builderConfig).(*App) assert.Equal(t, table.isFrontend, app.server.Frontend) assert.Equal(t, table.serverType, app.server.Type) @@ -85,7 +85,7 @@ func TestAddAcceptor(t *testing.T) { acc := acceptor.NewTCPAcceptor("0.0.0.0:0") for _, table := range tables { t.Run(table.serverType, func(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(table.isFrontend, table.serverType, table.serverMode, table.serverMetadata, *builderConfig) builder.AddAcceptor(acc) app := builder.Build().(*App) @@ -99,7 +99,7 @@ func TestAddAcceptor(t *testing.T) { } func TestSetDebug(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, 
*builderConfig).(*App) app.SetDebug(true) assert.Equal(t, true, app.debug) @@ -114,24 +114,24 @@ func TestSetLogger(t *testing.T) { } func TestGetDieChan(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.Equal(t, app.dieChan, app.GetDieChan()) } func TestGetSever(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.Equal(t, app.server, app.GetServer()) } func TestGetMetricsReporters(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.Equal(t, app.metricsReporters, app.GetMetricsReporters()) } func TestGetServerByID(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig) s, err := app.GetServerByID("id") assert.Nil(t, s) @@ -139,7 +139,7 @@ func TestGetServerByID(t *testing.T) { } func TestGetServersByType(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig) s, err := app.GetServersByType("id") assert.Nil(t, s) @@ -148,21 +148,21 @@ func TestGetServersByType(t *testing.T) { func TestSetHeartbeatInterval(t *testing.T) { inter := 35 * time.Millisecond - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.SetHeartbeatTime(inter) assert.Equal(t, 
inter, app.heartbeat) } func TestInitSysRemotes(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.initSysRemotes() assert.NotNil(t, app.remoteComp[0]) } func TestSetDictionary(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) dict := map[string]uint16{"someroute": 12} @@ -176,7 +176,7 @@ func TestSetDictionary(t *testing.T) { } func TestAddRoute(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.router = nil err := app.AddRoute("somesv", func(ctx context.Context, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) { @@ -198,7 +198,7 @@ func TestAddRoute(t *testing.T) { } func TestShutdown(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) go func() { app.Shutdown() @@ -216,7 +216,7 @@ func TestConfigureDefaultMetricsReporter(t *testing.T) { for _, table := range tables { t.Run(fmt.Sprintf("%t", table.enabled), func(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() builderConfig.Metrics.Prometheus.Enabled = table.enabled builderConfig.Metrics.Statsd.Enabled = table.enabled app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) @@ -227,11 +227,11 @@ func TestConfigureDefaultMetricsReporter(t *testing.T) { } func TestDefaultSD(t *testing.T) { - builderConfig := 
config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.NotNil(t, app.serviceDiscovery) - etcdSD, err := cluster.NewEtcdServiceDiscovery(*config.NewDefaultEtcdServiceDiscoveryConfig(), app.server, app.dieChan) + etcdSD, err := cluster.NewEtcdServiceDiscovery(config.NewDefaultPitayaConfig().Cluster.SD.Etcd, app.server, app.dieChan) assert.NoError(t, err) typeOfetcdSD := reflect.TypeOf(etcdSD) @@ -241,13 +241,13 @@ func TestDefaultSD(t *testing.T) { func TestDefaultRPCServer(t *testing.T) { ctrl := gomock.NewController(t) - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.NotNil(t, app.rpcServer) sessionPool := mocks.NewMockSessionPool(ctrl) - natsRPCServer, err := cluster.NewNatsRPCServer(*config.NewDefaultNatsRPCServerConfig(), app.server, nil, app.dieChan, sessionPool) + natsRPCServer, err := cluster.NewNatsRPCServer(config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats, app.server, nil, app.dieChan, sessionPool) assert.NoError(t, err) typeOfNatsRPCServer := reflect.TypeOf(natsRPCServer) @@ -255,11 +255,11 @@ func TestDefaultRPCServer(t *testing.T) { } func TestDefaultRPCClient(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) assert.NotNil(t, app.rpcClient) - natsRPCClient, err := cluster.NewNatsRPCClient(*config.NewDefaultNatsRPCClientConfig(), app.server, nil, app.dieChan) + natsRPCClient, err := cluster.NewNatsRPCClient(config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats, app.server, nil, app.dieChan) assert.NoError(t, err) typeOfNatsRPCClient := reflect.TypeOf(natsRPCClient) @@ -267,7 +267,7 @@ func TestDefaultRPCClient(t 
*testing.T) { } func TestStartAndListenStandalone(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() acc := acceptor.NewTCPAcceptor("0.0.0.0:0") builder := NewDefaultBuilder(true, "testtype", Standalone, map[string]string{}, *builderConfig) @@ -299,24 +299,24 @@ func TestStartAndListenCluster(t *testing.T) { ns := helpers.GetTestNatsServer(t) nsAddr := ns.Addr().String() - builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config.NewDefaultBuilderConfig()) + builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config.NewDefaultPitayaConfig()) var err error - natsClientConfig := *config.NewDefaultNatsRPCClientConfig() + natsClientConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats natsClientConfig.Connect = fmt.Sprintf("nats://%s", nsAddr) builder.RPCClient, err = cluster.NewNatsRPCClient(natsClientConfig, builder.Server, builder.MetricsReporters, builder.DieChan) if err != nil { panic(err.Error()) } - natsServerConfig := *config.NewDefaultNatsRPCServerConfig() + natsServerConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats natsServerConfig.Connect = fmt.Sprintf("nats://%s", nsAddr) builder.RPCServer, err = cluster.NewNatsRPCServer(natsServerConfig, builder.Server, builder.MetricsReporters, builder.DieChan, builder.SessionPool) if err != nil { panic(err.Error()) } - etcdSD, err := cluster.NewEtcdServiceDiscovery(*config.NewDefaultEtcdServiceDiscoveryConfig(), builder.Server, builder.DieChan, cli) + etcdSD, err := cluster.NewEtcdServiceDiscovery(config.NewDefaultPitayaConfig().Cluster.SD.Etcd, builder.Server, builder.DieChan, cli) builder.ServiceDiscovery = etcdSD assert.NoError(t, err) acc := acceptor.NewTCPAcceptor("0.0.0.0:0") @@ -375,7 +375,7 @@ func TestGetSessionFromCtx(t *testing.T) { ctrl := gomock.NewController(t) ss := mocks.NewMockSession(ctrl) - app := NewDefaultApp(true, "testtype", Cluster, 
map[string]string{}, *config.NewDefaultBuilderConfig()) + app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config.NewDefaultPitayaConfig()) ctx := context.WithValue(context.Background(), constants.SessionCtxKey, ss) s := app.GetSessionFromCtx(ctx) assert.Equal(t, ss, s) @@ -424,7 +424,7 @@ func TestDescriptor(t *testing.T) { } func TestDocumentation(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.startupComponents() doc, err := app.Documentation(false) @@ -484,7 +484,7 @@ func TestDocumentation(t *testing.T) { } func TestDocumentationTrue(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.startupComponents() doc, err := app.Documentation(true) @@ -580,7 +580,7 @@ func TestAddGRPCInfoToMetadata(t *testing.T) { } func TestStartWorker(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig).(*App) app.StartWorker() @@ -589,7 +589,7 @@ func TestStartWorker(t *testing.T) { func TestRegisterRPCJob(t *testing.T) { t.Run("register_once", func(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig) app.StartWorker() @@ -598,7 +598,7 @@ func TestRegisterRPCJob(t *testing.T) { }) t.Run("register_twice", func(t *testing.T) { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *builderConfig) app.StartWorker() diff --git 
a/builder.go b/builder.go index eab3e039..b9189ff8 100644 --- a/builder.go +++ b/builder.go @@ -26,7 +26,7 @@ import ( type Builder struct { acceptors []acceptor.Acceptor postBuildHooks []func(app Pitaya) - Config config.BuilderConfig + Config config.PitayaConfig DieChan chan bool PacketDecoder codec.PacketDecoder PacketEncoder codec.PacketEncoder @@ -62,61 +62,29 @@ func NewBuilderWithConfigs( serverMetadata map[string]string, conf *config.Config, ) *Builder { - builderConfig := config.NewBuilderConfig(conf) + pitayaConfig := config.NewPitayaConfig(conf) customMetrics := config.NewCustomMetricsSpec(conf) - prometheusConfig := config.NewPrometheusConfig(conf) - statsdConfig := config.NewStatsdConfig(conf) - etcdSDConfig := config.NewEtcdServiceDiscoveryConfig(conf) - natsRPCServerConfig := config.NewNatsRPCServerConfig(conf) - natsRPCClientConfig := config.NewNatsRPCClientConfig(conf) - workerConfig := config.NewWorkerConfig(conf) - enqueueOpts := config.NewEnqueueOpts(conf) - groupServiceConfig := config.NewMemoryGroupConfig(conf) return NewBuilder( isFrontend, serverType, serverMode, serverMetadata, - *builderConfig, + *pitayaConfig, *customMetrics, - *prometheusConfig, - *statsdConfig, - *etcdSDConfig, - *natsRPCServerConfig, - *natsRPCClientConfig, - *workerConfig, - *enqueueOpts, - *groupServiceConfig, ) } // NewDefaultBuilder return a builder instance with default dependency instances for a pitaya App, // with default configs -func NewDefaultBuilder(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, builderConfig config.BuilderConfig) *Builder { +func NewDefaultBuilder(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, pitayaConfig config.PitayaConfig) *Builder { customMetrics := config.NewDefaultCustomMetricsSpec() - prometheusConfig := config.NewDefaultPrometheusConfig() - statsdConfig := config.NewDefaultStatsdConfig() - etcdSDConfig := 
config.NewDefaultEtcdServiceDiscoveryConfig() - natsRPCServerConfig := config.NewDefaultNatsRPCServerConfig() - natsRPCClientConfig := config.NewDefaultNatsRPCClientConfig() - workerConfig := config.NewDefaultWorkerConfig() - enqueueOpts := config.NewDefaultEnqueueOpts() - groupServiceConfig := config.NewDefaultMemoryGroupConfig() return NewBuilder( isFrontend, serverType, serverMode, serverMetadata, - builderConfig, + pitayaConfig, *customMetrics, - *prometheusConfig, - *statsdConfig, - *etcdSDConfig, - *natsRPCServerConfig, - *natsRPCClientConfig, - *workerConfig, - *enqueueOpts, - *groupServiceConfig, ) } @@ -126,27 +94,19 @@ func NewBuilder(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, - config config.BuilderConfig, + config config.PitayaConfig, customMetrics models.CustomMetricsSpec, - prometheusConfig config.PrometheusConfig, - statsdConfig config.StatsdConfig, - etcdSDConfig config.EtcdServiceDiscoveryConfig, - natsRPCServerConfig config.NatsRPCServerConfig, - natsRPCClientConfig config.NatsRPCClientConfig, - workerConfig config.WorkerConfig, - enqueueOpts config.EnqueueOpts, - groupServiceConfig config.MemoryGroupConfig, ) *Builder { server := cluster.NewServer(uuid.New().String(), serverType, isFrontend, serverMetadata) dieChan := make(chan bool) metricsReporters := []metrics.Reporter{} if config.Metrics.Prometheus.Enabled { - metricsReporters = addDefaultPrometheus(prometheusConfig, customMetrics, metricsReporters, serverType) + metricsReporters = addDefaultPrometheus(config.Metrics, customMetrics, metricsReporters, serverType) } if config.Metrics.Statsd.Enabled { - metricsReporters = addDefaultStatsd(statsdConfig, metricsReporters, serverType) + metricsReporters = addDefaultStatsd(config.Metrics, metricsReporters, serverType) } handlerHooks := pipeline.NewHandlerHooks() @@ -161,28 +121,28 @@ func NewBuilder(isFrontend bool, var rpcClient cluster.RPCClient if serverMode == Cluster { var err error - 
serviceDiscovery, err = cluster.NewEtcdServiceDiscovery(etcdSDConfig, server, dieChan) + serviceDiscovery, err = cluster.NewEtcdServiceDiscovery(config.Cluster.SD.Etcd, server, dieChan) if err != nil { logger.Log.Fatalf("error creating default cluster service discovery component: %s", err.Error()) } - rpcServer, err = cluster.NewNatsRPCServer(natsRPCServerConfig, server, metricsReporters, dieChan, sessionPool) + rpcServer, err = cluster.NewNatsRPCServer(config.Cluster.RPC.Server.Nats, server, metricsReporters, dieChan, sessionPool) if err != nil { logger.Log.Fatalf("error setting default cluster rpc server component: %s", err.Error()) } - rpcClient, err = cluster.NewNatsRPCClient(natsRPCClientConfig, server, metricsReporters, dieChan) + rpcClient, err = cluster.NewNatsRPCClient(config.Cluster.RPC.Client.Nats, server, metricsReporters, dieChan) if err != nil { logger.Log.Fatalf("error setting default cluster rpc client component: %s", err.Error()) } } - worker, err := worker.NewWorker(workerConfig, enqueueOpts) + worker, err := worker.NewWorker(config.Worker, config.Worker.Retry) if err != nil { logger.Log.Fatalf("error creating default worker: %s", err.Error()) } - gsi := groups.NewMemoryGroupService(groupServiceConfig) + gsi := groups.NewMemoryGroupService(config.Groups.Memory) if err != nil { panic(err) } @@ -194,7 +154,7 @@ func NewBuilder(isFrontend bool, DieChan: dieChan, PacketDecoder: codec.NewPomeloPacketDecoder(), PacketEncoder: codec.NewPomeloPacketEncoder(), - MessageEncoder: message.NewMessagesEncoder(config.Pitaya.Handler.Messages.Compression), + MessageEncoder: message.NewMessagesEncoder(config.Handler.Messages.Compression), Serializer: json.NewSerializer(), Router: router.New(), RPCClient: rpcClient, @@ -262,9 +222,9 @@ func (builder *Builder) Build() Pitaya { builder.PacketDecoder, builder.PacketEncoder, builder.Serializer, - builder.Config.Pitaya.Heartbeat.Interval, + builder.Config.Heartbeat.Interval, builder.MessageEncoder, - 
builder.Config.Pitaya.Buffer.Agent.Messages, + builder.Config.Buffer.Agent.Messages, builder.SessionPool, builder.MetricsReporters, ) @@ -272,8 +232,8 @@ func (builder *Builder) Build() Pitaya { handlerService := service.NewHandlerService( builder.PacketDecoder, builder.Serializer, - builder.Config.Pitaya.Buffer.Handler.LocalProcess, - builder.Config.Pitaya.Buffer.Handler.RemoteProcess, + builder.Config.Buffer.Handler.LocalProcess, + builder.Config.Buffer.Handler.RemoteProcess, builder.Server, remoteService, agentFactory, @@ -298,7 +258,7 @@ func (builder *Builder) Build() Pitaya { builder.Groups, builder.SessionPool, builder.MetricsReporters, - builder.Config.Pitaya, + builder.Config, ) for _, postBuildHook := range builder.postBuildHooks { @@ -309,7 +269,7 @@ func (builder *Builder) Build() Pitaya { } // NewDefaultApp returns a default pitaya app instance -func NewDefaultApp(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, config config.BuilderConfig) Pitaya { +func NewDefaultApp(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, config config.PitayaConfig) Pitaya { builder := NewDefaultBuilder(isFrontend, serverType, serverMode, serverMetadata, config) return builder.Build() } @@ -318,7 +278,7 @@ func configureDefaultPipelines(handlerHooks *pipeline.HandlerHooks) { handlerHooks.BeforeHandler.PushBack(defaultpipelines.StructValidatorInstance.Validate) } -func addDefaultPrometheus(config config.PrometheusConfig, customMetrics models.CustomMetricsSpec, reporters []metrics.Reporter, serverType string) []metrics.Reporter { +func addDefaultPrometheus(config config.MetricsConfig, customMetrics models.CustomMetricsSpec, reporters []metrics.Reporter, serverType string) []metrics.Reporter { prometheus, err := CreatePrometheusReporter(serverType, config, customMetrics) if err != nil { logger.Log.Errorf("failed to start prometheus metrics reporter, skipping %v", err) @@ -328,7 +288,7 @@ 
func addDefaultPrometheus(config config.PrometheusConfig, customMetrics models.C return reporters } -func addDefaultStatsd(config config.StatsdConfig, reporters []metrics.Reporter, serverType string) []metrics.Reporter { +func addDefaultStatsd(config config.MetricsConfig, reporters []metrics.Reporter, serverType string) []metrics.Reporter { statsd, err := CreateStatsdReporter(serverType, config) if err != nil { logger.Log.Errorf("failed to start statsd metrics reporter, skipping %v", err) diff --git a/builder_test.go b/builder_test.go index da07c097..fd5d639d 100644 --- a/builder_test.go +++ b/builder_test.go @@ -21,16 +21,17 @@ package pitaya import ( + "testing" + "github.com/stretchr/testify/assert" "github.com/topfreegames/pitaya/v2/acceptor" "github.com/topfreegames/pitaya/v2/config" - "testing" ) func TestPostBuildHooks(t *testing.T) { acc := acceptor.NewTCPAcceptor("0.0.0.0:0") for _, table := range tables { - builderConfig := config.NewDefaultBuilderConfig() + builderConfig := config.NewDefaultPitayaConfig() t.Run("with_post_build_hooks", func(t *testing.T) { called := false diff --git a/cluster/etcd_service_discovery_test.go b/cluster/etcd_service_discovery_test.go index 630efbe3..5259b64c 100644 --- a/cluster/etcd_service_discovery_test.go +++ b/cluster/etcd_service_discovery_test.go @@ -25,11 +25,12 @@ import ( "math" "testing" "time" - clientv3 "go.etcd.io/etcd/client/v3" + "github.com/stretchr/testify/assert" "github.com/topfreegames/pitaya/v2/config" "github.com/topfreegames/pitaya/v2/constants" "github.com/topfreegames/pitaya/v2/helpers" + clientv3 "go.etcd.io/etcd/client/v3" ) var etcdSDTables = []struct { @@ -105,10 +106,10 @@ func TestNewEtcdServiceDiscovery(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := 
getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) assert.NotNil(t, e) }) } @@ -118,10 +119,10 @@ func TestEtcdSDBootstrapLease(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) err := e.grantLease() assert.NoError(t, err) assert.NotEmpty(t, e.leaseID) @@ -133,10 +134,10 @@ func TestEtcdSDBootstrapLeaseError(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) err := e.grantLease() assert.Error(t, err) }) @@ -147,10 +148,10 @@ func TestEtcdSDBootstrapServer(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() err := e.bootstrapServer(table.server) assert.NoError(t, err) @@ -172,10 +173,10 @@ func TestEtcdSDDeleteServer(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, 
config, table.server, cli) e.Init() err := e.bootstrapServer(table.server) assert.NoError(t, err) @@ -212,10 +213,10 @@ func TestEtcdSDDeleteLocalInvalidServers(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) invalidServer := &Server{ ID: "invalid", Type: "bla", @@ -233,10 +234,10 @@ func TestEtcdSDGetServer(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() e.bootstrapServer(table.server) sv, err := e.GetServer(table.server.ID) @@ -249,10 +250,10 @@ func TestEtcdSDGetServer(t *testing.T) { func TestEtcdSDGetServers(t *testing.T) { t.Parallel() for _, table := range etcdSDTablesMultipleServers { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, &Server{}, cli) + e := getEtcdSD(t, config, &Server{}, cli) e.Init() for _, server := range table.servers { e.bootstrapServer(server) @@ -268,11 +269,11 @@ func TestEtcdSDInit(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd config.SyncServers.Interval = time.Duration(300 * time.Millisecond) c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e 
:= getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() // should set running assert.True(t, e.running) @@ -296,10 +297,10 @@ func TestEtcdBeforeShutdown(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() assert.True(t, e.running) e.BeforeShutdown() @@ -314,10 +315,10 @@ func TestEtcdShutdown(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() assert.True(t, e.running) e.Shutdown() @@ -330,11 +331,11 @@ func TestEtcdWatchChangesAddNewServers(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd config.SyncServers.Interval = time.Duration(100 * time.Millisecond) c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() e.running = true serversBefore, err := e.GetServersByType(table.server.Type) @@ -362,11 +363,11 @@ func TestEtcdWatchChangesDeleteServers(t *testing.T) { t.Parallel() for _, table := range etcdSDTables { t.Run(table.server.ID, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd 
config.SyncServers.Interval = 100 * time.Millisecond c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() e.running = true serversBefore, err := e.GetServersByType(table.server.Type) @@ -400,12 +401,12 @@ func TestEtcdWatchChangesWithBlacklist(t *testing.T) { t.Parallel() for _, table := range etcdSDBlacklistTables { t.Run(table.name, func(t *testing.T) { - config := config.NewDefaultEtcdServiceDiscoveryConfig() + config := config.NewDefaultPitayaConfig().Cluster.SD.Etcd config.SyncServers.Interval = 100 * time.Millisecond config.ServerTypesBlacklist = table.serverTypeBlacklist c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) - e := getEtcdSD(t, *config, table.server, cli) + e := getEtcdSD(t, config, table.server, cli) e.Init() e.running = true diff --git a/cluster/grpc_rpc_client_test.go b/cluster/grpc_rpc_client_test.go index 269a981b..e899ff46 100644 --- a/cluster/grpc_rpc_client_test.go +++ b/cluster/grpc_rpc_client_test.go @@ -27,15 +27,15 @@ func getRPCClient(c config.GRPCClientConfig) (*GRPCClient, error) { } func TestNewGRPCClient(t *testing.T) { - c := config.NewDefaultGRPCClientConfig() - g, err := getRPCClient(*c) + c := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc + g, err := getRPCClient(c) assert.NoError(t, err) assert.NotNil(t, g) } func TestCall(t *testing.T) { - c := config.NewDefaultGRPCClientConfig() - g, err := getRPCClient(*c) + c := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc + g, err := getRPCClient(c) assert.NoError(t, err) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -62,7 +62,7 @@ func TestCall(t *testing.T) { sess.EXPECT().ID().Return(int64(1)).Times(2) sess.EXPECT().UID().Return(uid).Times(2) sess.EXPECT().GetDataEncoded().Return(nil).Times(2) - sess.EXPECT().SetRequestInFlight(gomock.Any(),gomock.Any(),gomock.Any()).Times(2) + sess.EXPECT().SetRequestInFlight(gomock.Any(), gomock.Any(), 
gomock.Any()).Times(2) expected, err := buildRequest(ctx, rpcType, r, sess, msg, g.server) assert.NoError(t, err) @@ -96,8 +96,8 @@ func TestBroadcastSessionBind(t *testing.T) { for _, table := range tables { t.Run(table.name, func(t *testing.T) { - c := config.NewDefaultGRPCClientConfig() - g, err := getRPCClient(*c) + c := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc + g, err := getRPCClient(c) assert.NoError(t, err) uid := "someuid" //mockPitayaClient := protosmocks.NewMockPitayaClient(ctrl) @@ -156,8 +156,8 @@ func TestSendKick(t *testing.T) { for _, table := range tables { t.Run(table.name, func(t *testing.T) { - c := config.NewDefaultGRPCClientConfig() - g, err := getRPCClient(*c) + c := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc + g, err := getRPCClient(c) assert.NoError(t, err) if table.bindingStorage != nil { @@ -216,7 +216,7 @@ func TestSendPush(t *testing.T) { for _, table := range tables { t.Run(table.name, func(t *testing.T) { - g, err := getRPCClient(*config.NewDefaultGRPCClientConfig()) + g, err := getRPCClient(config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc) assert.NoError(t, err) uid := "someuid" @@ -263,9 +263,9 @@ func TestSendPush(t *testing.T) { func TestAddServer(t *testing.T) { t.Run("try-connect", func(t *testing.T) { // listen - clientConfig := config.NewDefaultGRPCClientConfig() + clientConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc - serverConfig := config.NewDefaultGRPCServerConfig() + serverConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Grpc serverConfig.Port = helpers.GetFreePort(t) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -279,7 +279,7 @@ func TestAddServer(t *testing.T) { }, Frontend: false, } - gs, err := NewGRPCServer(*serverConfig, server, []metrics.Reporter{}) + gs, err := NewGRPCServer(serverConfig, server, []metrics.Reporter{}) assert.NoError(t, err) mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl) @@ -288,7 +288,7 @@ func TestAddServer(t 
*testing.T) { err = gs.Init() assert.NoError(t, err) // --- should connect to the server and add it to the client map - g, err := getRPCClient(*clientConfig) + g, err := getRPCClient(clientConfig) assert.NoError(t, err) g.AddServer(server) @@ -302,10 +302,10 @@ func TestAddServer(t *testing.T) { t.Run("lazy", func(t *testing.T) { // listen - clientConfig := config.NewDefaultGRPCClientConfig() + clientConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc clientConfig.LazyConnection = true - serverConfig := config.NewDefaultGRPCServerConfig() + serverConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Grpc serverConfig.Port = helpers.GetFreePort(t) ctrl := gomock.NewController(t) @@ -320,7 +320,7 @@ func TestAddServer(t *testing.T) { }, Frontend: false, } - gs, err := NewGRPCServer(*serverConfig, server, []metrics.Reporter{}) + gs, err := NewGRPCServer(serverConfig, server, []metrics.Reporter{}) assert.NoError(t, err) mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl) @@ -328,7 +328,7 @@ func TestAddServer(t *testing.T) { err = gs.Init() assert.NoError(t, err) - g, err := getRPCClient(*clientConfig) + g, err := getRPCClient(clientConfig) assert.NoError(t, err) // --- should not connect to the server and add it to the client map g.AddServer(server) @@ -393,9 +393,9 @@ func TestGetServerHost(t *testing.T) { for name, table := range tables { t.Run(name, func(t *testing.T) { - config := config.NewDefaultInfoRetrieverConfig() + config := config.NewDefaultPitayaConfig().Cluster.Info config.Region = table.clientRegion - infoRetriever := NewInfoRetriever(*config) + infoRetriever := NewInfoRetriever(config) gs := &GRPCClient{infoRetriever: infoRetriever} host, portKey := gs.getServerHost(&Server{ @@ -412,9 +412,9 @@ func TestRemoveServer(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - clientConfig := config.NewDefaultGRPCClientConfig() + clientConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Grpc - serverConfig := 
config.NewDefaultGRPCServerConfig() + serverConfig := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Grpc serverConfig.Port = helpers.GetFreePort(t) server := &Server{ @@ -426,14 +426,14 @@ func TestRemoveServer(t *testing.T) { }, Frontend: false, } - gs, err := NewGRPCServer(*serverConfig, server, []metrics.Reporter{}) + gs, err := NewGRPCServer(serverConfig, server, []metrics.Reporter{}) assert.NoError(t, err) mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl) gs.SetPitayaServer(mockPitayaServer) err = gs.Init() assert.NoError(t, err) - gc, err := NewGRPCClient(*clientConfig, server, []metrics.Reporter{}, nil, nil) + gc, err := NewGRPCClient(clientConfig, server, []metrics.Reporter{}, nil, nil) assert.NoError(t, err) gc.AddServer(server) diff --git a/cluster/grpc_rpc_server_test.go b/cluster/grpc_rpc_server_test.go index 556a0bd6..02fb5cb5 100644 --- a/cluster/grpc_rpc_server_test.go +++ b/cluster/grpc_rpc_server_test.go @@ -17,21 +17,21 @@ import ( func TestNewGRPCServer(t *testing.T) { t.Parallel() sv := getServer() - gs, err := NewGRPCServer(*config.NewDefaultGRPCServerConfig(), sv, []metrics.Reporter{}) + gs, err := NewGRPCServer(config.NewDefaultPitayaConfig().Cluster.RPC.Server.Grpc, sv, []metrics.Reporter{}) assert.NoError(t, err) assert.NotNil(t, gs) } func TestGRPCServerInit(t *testing.T) { t.Parallel() - c := config.NewDefaultGRPCServerConfig() + c := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Grpc c.Port = helpers.GetFreePort(t) ctrl := gomock.NewController(t) defer ctrl.Finish() mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl) sv := getServer() - gs, err := NewGRPCServer(*c, sv, []metrics.Reporter{}) + gs, err := NewGRPCServer(c, sv, []metrics.Reporter{}) gs.SetPitayaServer(mockPitayaServer) err = gs.Init() assert.NoError(t, err) diff --git a/cluster/info_retriever_test.go b/cluster/info_retriever_test.go index 90ce40e2..60b3830e 100644 --- a/cluster/info_retriever_test.go +++ b/cluster/info_retriever_test.go @@ -15,7 +15,7 
@@ func TestInfoRetrieverRegion(t *testing.T) { c.Set("pitaya.cluster.info.region", "us") conf := config.NewConfig(c) - infoRetriever := NewInfoRetriever(*config.NewInfoRetrieverConfig(conf)) + infoRetriever := NewInfoRetriever(config.NewPitayaConfig(conf).Cluster.Info) assert.Equal(t, "us", infoRetriever.Region()) } diff --git a/cluster/nats_rpc_client_test.go b/cluster/nats_rpc_client_test.go index 2923a04c..19eedb8e 100644 --- a/cluster/nats_rpc_client_test.go +++ b/cluster/nats_rpc_client_test.go @@ -51,9 +51,9 @@ func TestNewNatsRPCClient(t *testing.T) { mockMetricsReporter := metricsmocks.NewMockReporter(ctrl) mockMetricsReporters := []metrics.Reporter{mockMetricsReporter} - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - n, err := NewNatsRPCClient(*cfg, sv, mockMetricsReporters, nil) + n, err := NewNatsRPCClient(cfg, sv, mockMetricsReporters, nil) assert.NoError(t, err) assert.NotNil(t, n) assert.Equal(t, sv, n.server) @@ -75,10 +75,10 @@ func TestNatsRPCClientConfigure(t *testing.T) { for _, table := range tables { t.Run(fmt.Sprintf("%s-%s", table.natsConnect, table.reqTimeout), func(t *testing.T) { - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = table.natsConnect cfg.RequestTimeout = table.reqTimeout - _, err := NewNatsRPCClient(*cfg, getServer(), nil, nil) + _, err := NewNatsRPCClient(cfg, getServer(), nil, nil) assert.Equal(t, table.err, err) }) } @@ -86,17 +86,17 @@ func TestNatsRPCClientConfigure(t *testing.T) { func TestNatsRPCClientGetSubscribeChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - n, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + n, _ := NewNatsRPCClient(cfg, sv, nil, nil) assert.Equal(t, fmt.Sprintf("pitaya/servers/%s/%s", n.server.Type, n.server.ID), 
n.getSubscribeChannel()) } func TestNatsRPCClientStop(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - n, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + n, _ := NewNatsRPCClient(cfg, sv, nil, nil) // change it to true to ensure it goes to false n.running = true n.stop() @@ -106,9 +106,9 @@ func TestNatsRPCClientStop(t *testing.T) { func TestNatsRPCClientInitShouldFailIfConnFails(t *testing.T) { t.Parallel() sv := getServer() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = "nats://localhost:1" - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) err := rpcClient.Init() assert.Error(t, err) } @@ -116,11 +116,11 @@ func TestNatsRPCClientInitShouldFailIfConnFails(t *testing.T) { func TestNatsRPCClientInit(t *testing.T) { s := helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) err := rpcClient.Init() assert.NoError(t, err) assert.True(t, rpcClient.running) @@ -134,11 +134,11 @@ func TestNatsRPCClientBroadcastSessionBind(t *testing.T) { uid := "testuid123" s := helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) rpcClient.Init() subChan := make(chan *nats.Msg) @@ -166,11 +166,11 @@ func TestNatsRPCClientSendKick(t *testing.T) { uid := "testuid" s := 
helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) err := rpcClient.Init() assert.NoError(t, err) @@ -201,11 +201,11 @@ func TestNatsRPCClientSendPush(t *testing.T) { uid := "testuid123" s := helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) rpcClient.Init() subChan := make(chan *nats.Msg) @@ -238,9 +238,9 @@ func TestNatsRPCClientSendPush(t *testing.T) { } func TestNatsRPCClientSendShouldFailIfNotRunning(t *testing.T) { - config := config.NewDefaultNatsRPCClientConfig() + config := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - rpcClient, _ := NewNatsRPCClient(*config, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(config, sv, nil, nil) err := rpcClient.Send("topic", []byte("data")) assert.Equal(t, constants.ErrRPCClientNotInitialized, err) } @@ -248,11 +248,11 @@ func TestNatsRPCClientSendShouldFailIfNotRunning(t *testing.T) { func TestNatsRPCClientSend(t *testing.T) { s := helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) rpcClient.Init() tables := []struct { @@ -282,9 +282,9 @@ func TestNatsRPCClientSend(t *testing.T) { } func TestNatsRPCClientBuildRequest(t *testing.T) { - config := 
config.NewDefaultNatsRPCClientConfig() + config := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - rpcClient, _ := NewNatsRPCClient(*config, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(config, sv, nil, nil) rt := route.NewRoute("sv", "svc", "method") @@ -394,9 +394,9 @@ func TestNatsRPCClientBuildRequest(t *testing.T) { } func TestNatsRPCClientCallShouldFailIfNotRunning(t *testing.T) { - config := config.NewDefaultNatsRPCClientConfig() + config := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats sv := getServer() - rpcClient, _ := NewNatsRPCClient(*config, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(config, sv, nil, nil) res, err := rpcClient.Call(context.Background(), protos.RPCType_Sys, nil, nil, nil, sv) assert.Equal(t, constants.ErrRPCClientNotInitialized, err) assert.Nil(t, res) @@ -406,10 +406,10 @@ func TestNatsRPCClientCall(t *testing.T) { s := helpers.GetTestNatsServer(t) sv := getServer() defer s.Shutdown() - cfg := config.NewDefaultNatsRPCClientConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Client.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) cfg.RequestTimeout = time.Duration(300 * time.Millisecond) - rpcClient, _ := NewNatsRPCClient(*cfg, sv, nil, nil) + rpcClient, _ := NewNatsRPCClient(cfg, sv, nil, nil) rpcClient.Init() rt := route.NewRoute("sv", "svc", "method") diff --git a/cluster/nats_rpc_server_test.go b/cluster/nats_rpc_server_test.go index a2d84768..2661ada6 100644 --- a/cluster/nats_rpc_server_test.go +++ b/cluster/nats_rpc_server_test.go @@ -77,9 +77,9 @@ func TestNewNatsRPCServer(t *testing.T) { mockMetricsReporters := []metrics.Reporter{mockMetricsReporter} mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - n, err := NewNatsRPCServer(*cfg, sv, mockMetricsReporters, nil, mockSessionPool) + n, err := NewNatsRPCServer(cfg, sv, 
mockMetricsReporters, nil, mockSessionPool) assert.NoError(t, err) assert.NotNil(t, n) assert.Equal(t, sv, n.server) @@ -102,11 +102,11 @@ func TestNatsRPCServerConfigure(t *testing.T) { for _, table := range tables { t.Run(fmt.Sprintf("%s-%d-%d", table.natsConnect, table.messagesBufferSize, table.pushBufferSize), func(t *testing.T) { - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = table.natsConnect cfg.Buffer.Messages = table.messagesBufferSize cfg.Buffer.Push = table.pushBufferSize - _, err := NewNatsRPCServer(*cfg, getServer(), nil, nil, nil) + _, err := NewNatsRPCServer(cfg, getServer(), nil, nil, nil) assert.Equal(t, table.err, err) }) } @@ -129,24 +129,24 @@ func TestNatsRPCServerGetUserKickTopic(t *testing.T) { func TestNatsRPCServerGetUnhandledRequestsChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - n, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + n, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) assert.NotNil(t, n.GetUnhandledRequestsChannel()) assert.IsType(t, make(chan *protos.Request), n.GetUnhandledRequestsChannel()) } func TestNatsRPCServerGetBindingsChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - n, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + n, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) assert.Equal(t, n.bindingsChan, n.GetBindingsChannel()) } func TestNatsRPCServerOnSessionBind(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() ctrl := gomock.NewController(t) @@ -154,7 +154,7 @@ func TestNatsRPCServerOnSessionBind(t *testing.T) { mockSession.EXPECT().UID().Return("uid").Times(2) 
mockSession.EXPECT().SetSubscriptions(gomock.Len(2)).Times(1) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -167,9 +167,9 @@ func TestNatsRPCServerOnSessionBind(t *testing.T) { func TestNatsRPCServerSubscribeToBindingsChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -185,9 +185,9 @@ func TestNatsRPCServerSubscribeToBindingsChannel(t *testing.T) { func TestNatsRPCServerSubscribeUserKickChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -207,26 +207,26 @@ func TestNatsRPCServerSubscribeUserKickChannel(t *testing.T) { func TestNatsRPCServerGetUserPushChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - n, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + n, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) assert.NotNil(t, n.getUserPushChannel()) assert.IsType(t, make(chan *protos.Push), n.getUserPushChannel()) } func TestNatsRPCServerGetUserKickChannel(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := 
config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - n, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + n, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) assert.NotNil(t, n.getUserKickChannel()) assert.IsType(t, make(chan *protos.KickMsg), n.getUserKickChannel()) } func TestNatsRPCServerSubscribeToUserMessages(t *testing.T) { - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -254,9 +254,9 @@ func TestNatsRPCServerSubscribeToUserMessages(t *testing.T) { } func TestNatsRPCServerSubscribe(t *testing.T) { - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -284,14 +284,14 @@ func TestNatsRPCServerSubscribe(t *testing.T) { } func TestNatsRPCServerHandleMessages(t *testing.T) { - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() ctrl := gomock.NewController(t) defer ctrl.Finish() mockMetricsReporter := metricsmocks.NewMockReporter(ctrl) mockMetricsReporters := []metrics.Reporter{mockMetricsReporter} - rpcServer, _ := NewNatsRPCServer(*cfg, sv, mockMetricsReporters, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, mockMetricsReporters, nil, nil) s := helpers.GetTestNatsServer(t) defer s.Shutdown() conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil) @@ -328,13 +328,13 @@ func TestNatsRPCServerHandleMessages(t 
*testing.T) { func TestNatsRPCServerInitShouldFailIfConnFails(t *testing.T) { t.Parallel() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = "nats://localhost:1" sv := getServer() ctrl := gomock.NewController(t) mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, mockSessionPool) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, mockSessionPool) //mockSessionPool.EXPECT().OnSessionBind(rpcServer.onSessionBind) err := rpcServer.Init() assert.Error(t, err) @@ -344,12 +344,12 @@ func TestNatsRPCServerInit(t *testing.T) { s := helpers.GetTestNatsServer(t) ctrl := gomock.NewController(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, mockSessionPool) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, mockSessionPool) mockSessionPool.EXPECT().OnSessionBind(newFuncPtrMatcher(rpcServer.onSessionBind)) err := rpcServer.Init() assert.NoError(t, err) @@ -382,11 +382,11 @@ func TestNatsRPCServerProcessBindings(t *testing.T) { ctrl := gomock.NewController(t) s := helpers.GetTestNatsServer(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, mockSessionPool) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, mockSessionPool) mockSessionPool.EXPECT().OnSessionBind(newFuncPtrMatcher(rpcServer.onSessionBind)) err := rpcServer.Init() @@ -425,11 +425,11 @@ func TestNatsRPCServerProcessPushes(t *testing.T) { s := 
helpers.GetTestNatsServer(t) ctrl := gomock.NewController(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, mockSessionPool) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, mockSessionPool) mockSessionPool.EXPECT().OnSessionBind(newFuncPtrMatcher(rpcServer.onSessionBind)) err := rpcServer.Init() @@ -460,11 +460,11 @@ func TestNatsRPCServerProcessKick(t *testing.T) { s := helpers.GetTestNatsServer(t) ctrl := gomock.NewController(t) defer s.Shutdown() - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats cfg.Connect = fmt.Sprintf("nats://%s", s.Addr()) sv := getServer() mockSessionPool := sessionmocks.NewMockSessionPool(ctrl) - rpcServer, _ := NewNatsRPCServer(*cfg, sv, nil, nil, mockSessionPool) + rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil, mockSessionPool) mockSessionPool.EXPECT().OnSessionBind(newFuncPtrMatcher(rpcServer.onSessionBind)) err := rpcServer.Init() @@ -488,14 +488,14 @@ func TestNatsRPCServerProcessKick(t *testing.T) { } func TestNatsRPCServerReportMetrics(t *testing.T) { - cfg := config.NewDefaultNatsRPCServerConfig() + cfg := config.NewDefaultPitayaConfig().Cluster.RPC.Server.Nats sv := getServer() ctrl := gomock.NewController(t) defer ctrl.Finish() mockMetricsReporter := metricsmocks.NewMockReporter(ctrl) mockMetricsReporters := []metrics.Reporter{mockMetricsReporter} - rpcServer, _ := NewNatsRPCServer(*cfg, sv, mockMetricsReporters, nil, nil) + rpcServer, _ := NewNatsRPCServer(cfg, sv, mockMetricsReporters, nil, nil) rpcServer.dropped = 100 rpcServer.messagesBufferSize = 100 rpcServer.pushBufferSize = 100 diff --git a/component_test.go b/component_test.go index 056f92e8..68c9dfa8 100644 --- a/component_test.go +++ 
b/component_test.go @@ -42,7 +42,7 @@ func (m *MyComp) Shutdown() { } func TestRegister(t *testing.T) { - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config).(*App) b := &component.Base{} app.Register(b) @@ -51,7 +51,7 @@ func TestRegister(t *testing.T) { } func TestRegisterRemote(t *testing.T) { - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config).(*App) before := app.remoteComp b := &component.Base{} @@ -61,7 +61,7 @@ func TestRegisterRemote(t *testing.T) { } func TestStartupComponents(t *testing.T) { - app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultBuilderConfig()).(*App) + app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultPitayaConfig()).(*App) app.Register(&MyComp{}) app.RegisterRemote(&MyComp{}) @@ -70,7 +70,7 @@ func TestStartupComponents(t *testing.T) { } func TestShutdownComponents(t *testing.T) { - app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultBuilderConfig()).(*App) + app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultPitayaConfig()).(*App) app.Register(&MyComp{}) app.RegisterRemote(&MyComp{}) diff --git a/config/config.go b/config/config.go index 3d0206c3..690737de 100644 --- a/config/config.go +++ b/config/config.go @@ -6,8 +6,13 @@ import ( "github.com/topfreegames/pitaya/v2/metrics/models" ) -// PitayaConfig provides configuration for a pitaya app +// PitayaConfig provides all the configuration for a pitaya app type PitayaConfig struct { + DefaultPipelines struct { + StructValidation struct { + Enabled bool `mapstructure:"enabled"` + } `mapstructure:"structvalidation"` + } `mapstructure:"defaultpipelines"` Heartbeat struct { Interval time.Duration 
`mapstructure:"interval"` } `mapstructure:"heartbeat"` @@ -38,17 +43,33 @@ type PitayaConfig struct { Period time.Duration `mapstructure:"period"` } `mapstructure:"drain"` } `mapstructure:"session"` - Metrics struct { - Period time.Duration `mapstructure:"period"` - } `mapstructure:"metrics"` + Acceptor struct { ProxyProtocol bool `mapstructure:"proxyprotocol"` } `mapstructure:"acceptor"` + Conn struct { + RateLimiting RateLimitingConfig `mapstructure:"rateLimiting"` + } `mapstructure:"conn"` + Metrics MetricsConfig `mapstructure:"metrics"` + Cluster ClusterConfig `mapstructure:"cluster"` + Groups GroupsConfig `mapstructure:"groups"` + Worker WorkerConfig `mapstructure:"worker"` } // NewDefaultPitayaConfig provides default configuration for Pitaya App func NewDefaultPitayaConfig() *PitayaConfig { return &PitayaConfig{ + DefaultPipelines: struct { + StructValidation struct { + Enabled bool `mapstructure:"enabled"` + } `mapstructure:"structvalidation"` + }{ + StructValidation: struct { + Enabled bool `mapstructure:"enabled"` + }{ + Enabled: false, + }, + }, Heartbeat: struct { Interval time.Duration `mapstructure:"interval"` }{ @@ -117,16 +138,20 @@ func NewDefaultPitayaConfig() *PitayaConfig { Period: time.Duration(5 * time.Second), }, }, - Metrics: struct { - Period time.Duration `mapstructure:"period"` - }{ - Period: time.Duration(15 * time.Second), - }, + Metrics: *newDefaultMetricsConfig(), + Cluster: *newDefaultClusterConfig(), + Groups: *newDefaultGroupsConfig(), + Worker: *newDefaultWorkerConfig(), Acceptor: struct { ProxyProtocol bool `mapstructure:"proxyprotocol"` }{ ProxyProtocol: false, }, + Conn: struct { + RateLimiting RateLimitingConfig `mapstructure:"rateLimiting"` + }{ + RateLimiting: *newDefaultRateLimitingConfig(), + }, } } @@ -139,73 +164,6 @@ func NewPitayaConfig(config *Config) *PitayaConfig { return conf } -// BuilderConfig provides configuration for Builder -type BuilderConfig struct { - Pitaya PitayaConfig - Metrics struct { - Prometheus 
struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"prometheus"` - Statsd struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"statsd"` - } `mapstructure:"metrics"` - DefaultPipelines struct { - StructValidation struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"structvalidation"` - } `mapstructure:"defaultpipelines"` -} - -// NewDefaultBuilderConfig provides default builder configuration -func NewDefaultBuilderConfig() *BuilderConfig { - return &BuilderConfig{ - Pitaya: *NewDefaultPitayaConfig(), - Metrics: struct { - Prometheus struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"prometheus"` - Statsd struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"statsd"` - }{ - Prometheus: struct { - Enabled bool `mapstructure:"enabled"` - }{ - Enabled: false, - }, - Statsd: struct { - Enabled bool `mapstructure:"enabled"` - }{ - Enabled: false, - }, - }, - DefaultPipelines: struct { - StructValidation struct { - Enabled bool `mapstructure:"enabled"` - } `mapstructure:"structvalidation"` - }{ - StructValidation: struct { - Enabled bool `mapstructure:"enabled"` - }{ - Enabled: false, - }, - }, - } -} - -// NewBuilderConfig reads from config to build builder configuration -func NewBuilderConfig(config *Config) *BuilderConfig { - conf := NewDefaultBuilderConfig() - if err := config.Unmarshal(&conf); err != nil { - panic(err) - } - if err := config.UnmarshalKey("pitaya", &conf); err != nil { - panic(err) - } - return conf -} - // GRPCClientConfig rpc client config struct type GRPCClientConfig struct { DialTimeout time.Duration `mapstructure:"dialtimeout"` @@ -213,8 +171,8 @@ type GRPCClientConfig struct { RequestTimeout time.Duration `mapstructure:"requesttimeout"` } -// NewDefaultGRPCClientConfig rpc client default config struct -func NewDefaultGRPCClientConfig() *GRPCClientConfig { +// newDefaultGRPCClientConfig rpc client default config struct +func newDefaultGRPCClientConfig() *GRPCClientConfig 
{ return &GRPCClientConfig{ DialTimeout: time.Duration(5 * time.Second), LazyConnection: false, @@ -222,34 +180,18 @@ func NewDefaultGRPCClientConfig() *GRPCClientConfig { } } -// NewGRPCClientConfig reads from config to build GRPCCLientConfig -func NewGRPCClientConfig(config *Config) *GRPCClientConfig { - conf := NewDefaultGRPCClientConfig() - if err := config.UnmarshalKey("pitaya.cluster.rpc.client.grpc", &conf); err != nil { - panic(err) - } - return conf -} - // GRPCServerConfig provides configuration for GRPCServer type GRPCServerConfig struct { Port int `mapstructure:"port"` } -// NewDefaultGRPCServerConfig returns a default GRPCServerConfig -func NewDefaultGRPCServerConfig() *GRPCServerConfig { +// newDefaultGRPCServerConfig returns a default GRPCServerConfig +func newDefaultGRPCServerConfig() *GRPCServerConfig { return &GRPCServerConfig{ Port: 3434, } } -// NewGRPCServerConfig reads from config to build GRPCServerConfig -func NewGRPCServerConfig(config *Config) *GRPCServerConfig { - return &GRPCServerConfig{ - Port: config.GetInt("pitaya.cluster.rpc.server.grpc.port"), - } -} - // NatsRPCClientConfig provides nats client configuration type NatsRPCClientConfig struct { Connect string `mapstructure:"connect"` @@ -258,8 +200,8 @@ type NatsRPCClientConfig struct { ConnectionTimeout time.Duration `mapstructure:"connectiontimeout"` } -// NewDefaultNatsRPCClientConfig provides default nats client configuration -func NewDefaultNatsRPCClientConfig() *NatsRPCClientConfig { +// newDefaultNatsRPCClientConfig provides default nats client configuration +func newDefaultNatsRPCClientConfig() *NatsRPCClientConfig { return &NatsRPCClientConfig{ Connect: "nats://localhost:4222", MaxReconnectionRetries: 15, @@ -268,15 +210,6 @@ func NewDefaultNatsRPCClientConfig() *NatsRPCClientConfig { } } -// NewNatsRPCClientConfig reads from config to build nats client configuration -func NewNatsRPCClientConfig(config *Config) *NatsRPCClientConfig { - conf := NewDefaultNatsRPCClientConfig() 
- if err := config.UnmarshalKey("pitaya.cluster.rpc.client.nats", &conf); err != nil { - panic(err) - } - return conf -} - // NatsRPCServerConfig provides nats server configuration type NatsRPCServerConfig struct { Connect string `mapstructure:"connect"` @@ -289,8 +222,8 @@ type NatsRPCServerConfig struct { ConnectionTimeout time.Duration `mapstructure:"connectiontimeout"` } -// NewDefaultNatsRPCServerConfig provides default nats server configuration -func NewDefaultNatsRPCServerConfig() *NatsRPCServerConfig { +// newDefaultNatsRPCServerConfig provides default nats server configuration +func newDefaultNatsRPCServerConfig() *NatsRPCServerConfig { return &NatsRPCServerConfig{ Connect: "nats://localhost:4222", MaxReconnectionRetries: 15, @@ -306,36 +239,18 @@ func NewDefaultNatsRPCServerConfig() *NatsRPCServerConfig { } } -// NewNatsRPCServerConfig reads from config to build nats server configuration -func NewNatsRPCServerConfig(config *Config) *NatsRPCServerConfig { - conf := NewDefaultNatsRPCServerConfig() - if err := config.UnmarshalKey("pitaya.cluster.rpc.server.nats", &conf); err != nil { - panic(err) - } - return conf -} - // InfoRetrieverConfig provides InfoRetriever configuration type InfoRetrieverConfig struct { Region string `mapstructure:"region"` } -// NewDefaultInfoRetrieverConfig provides default configuration for InfoRetriever -func NewDefaultInfoRetrieverConfig() *InfoRetrieverConfig { +// newDefaultInfoRetrieverConfig provides default configuration for InfoRetriever +func newDefaultInfoRetrieverConfig() *InfoRetrieverConfig { return &InfoRetrieverConfig{ Region: "", } } -// NewInfoRetrieverConfig reads from config to build configuration for InfoRetriever -func NewInfoRetrieverConfig(c *Config) *InfoRetrieverConfig { - conf := NewDefaultInfoRetrieverConfig() - if err := c.UnmarshalKey("pitaya.cluster.info", &conf); err != nil { - panic(err) - } - return conf -} - // EtcdServiceDiscoveryConfig Etcd service discovery config type 
EtcdServiceDiscoveryConfig struct { Endpoints []string `mapstructure:"endpoints"` @@ -365,8 +280,8 @@ type EtcdServiceDiscoveryConfig struct { ServerTypesBlacklist []string `mapstructure:"servertypesblacklist"` } -// NewDefaultEtcdServiceDiscoveryConfig Etcd service discovery default config -func NewDefaultEtcdServiceDiscoveryConfig() *EtcdServiceDiscoveryConfig { +// newDefaultEtcdServiceDiscoveryConfig Etcd service discovery default config +func newDefaultEtcdServiceDiscoveryConfig() *EtcdServiceDiscoveryConfig { return &EtcdServiceDiscoveryConfig{ Endpoints: []string{"localhost:2379"}, User: "", @@ -410,15 +325,6 @@ func NewDefaultEtcdServiceDiscoveryConfig() *EtcdServiceDiscoveryConfig { } } -// NewEtcdServiceDiscoveryConfig Etcd service discovery config with default config paths -func NewEtcdServiceDiscoveryConfig(config *Config) *EtcdServiceDiscoveryConfig { - conf := NewDefaultEtcdServiceDiscoveryConfig() - if err := config.UnmarshalKey("pitaya.cluster.sd.etcd", &conf); err != nil { - panic(err) - } - return conf -} - // NewDefaultCustomMetricsSpec returns an empty *CustomMetricsSpec func NewDefaultCustomMetricsSpec() *models.CustomMetricsSpec { return &models.CustomMetricsSpec{ @@ -439,72 +345,111 @@ func NewCustomMetricsSpec(config *Config) *models.CustomMetricsSpec { return spec } -// PrometheusConfig provides configuration for PrometheusReporter -type PrometheusConfig struct { - Prometheus struct { - Port int `mapstructure:"port"` - AdditionalLabels map[string]string `mapstructure:"additionallabels"` - } `mapstructure:"prometheus"` - Game string `mapstructure:"game"` - ConstLabels map[string]string `mapstructure:"constlabels"` +// Metrics provides configuration for all metrics related configurations +type MetricsConfig struct { + Period time.Duration `mapstructure:"period"` + Game string `mapstructure:"game"` + AdditionalLabels map[string]string `mapstructure:"additionallabels"` + ConstLabels map[string]string `mapstructure:"constlabels"` + Prometheus 
*PrometheusConfig `mapstructure:"prometheus"` + Statsd *StatsdConfig `mapstructure:"statsd"` } -// NewDefaultPrometheusConfig provides default configuration for PrometheusReporter -func NewDefaultPrometheusConfig() *PrometheusConfig { - return &PrometheusConfig{ - Prometheus: struct { - Port int `mapstructure:"port"` - AdditionalLabels map[string]string `mapstructure:"additionallabels"` - }{ - Port: 9090, - AdditionalLabels: map[string]string{}, - }, - ConstLabels: map[string]string{}, +// newDefaultPrometheusConfig provides default configuration for PrometheusReporter +func newDefaultMetricsConfig() *MetricsConfig { + return &MetricsConfig{ + Period: time.Duration(15 * time.Second), + ConstLabels: map[string]string{}, + AdditionalLabels: map[string]string{}, + Prometheus: newDefaultPrometheusConfig(), + Statsd: newDefaultStatsdConfig(), } } -// NewPrometheusConfig reads from config to build configuration for PrometheusReporter -func NewPrometheusConfig(config *Config) *PrometheusConfig { - conf := NewDefaultPrometheusConfig() - if err := config.UnmarshalKey("pitaya.metrics", &conf); err != nil { - panic(err) +// PrometheusConfig provides configuration for PrometheusReporter +type PrometheusConfig struct { + Port int `mapstructure:"port"` + Enabled bool `mapstructure:"enabled"` +} + +// newDefaultPrometheusConfig provides default configuration for PrometheusReporter +func newDefaultPrometheusConfig() *PrometheusConfig { + return &PrometheusConfig{ + Port: 9090, + Enabled: false, } - return conf } // StatsdConfig provides configuration for statsd type StatsdConfig struct { - Statsd struct { - Host string `mapstructure:"host"` - Prefix string `mapstructure:"prefix"` - Rate float64 `mapstructure:"rate"` - } `mapstructure:"statsd"` - ConstLabels map[string]string `mapstructure:"consttags"` + Enabled bool `mapstructure:"enabled"` + Host string `mapstructure:"host"` + Prefix string `mapstructure:"prefix"` + Rate float64 `mapstructure:"rate"` } -// NewDefaultStatsdConfig 
provides default configuration for statsd -func NewDefaultStatsdConfig() *StatsdConfig { +// newDefaultStatsdConfig provides default configuration for statsd +func newDefaultStatsdConfig() *StatsdConfig { return &StatsdConfig{ - Statsd: struct { - Host string `mapstructure:"host"` - Prefix string `mapstructure:"prefix"` - Rate float64 `mapstructure:"rate"` + Enabled: false, + Host: "localhost:9125", + Prefix: "pitaya.", + Rate: 1, + } +} + +// newDefaultStatsdConfig provides default configuration for statsd +func newDefaultClusterConfig() *ClusterConfig { + return &ClusterConfig{ + Info: *newDefaultInfoRetrieverConfig(), + RPC: *newDefaultClusterRPCConfig(), + SD: *newDefaultClusterSDConfig(), + } +} + +type ClusterConfig struct { + Info InfoRetrieverConfig `mapstructure:"info"` + RPC ClusterRPCConfig `mapstructure:"rpc"` + SD ClusterSDConfig `mapstructure:"sd"` +} + +type ClusterRPCConfig struct { + Client struct { + Grpc GRPCClientConfig `mapstructure:"grpc"` + Nats NatsRPCClientConfig `mapstructure:"nats"` + } `mapstructure:"client"` + Server struct { + Grpc GRPCServerConfig `mapstructure:"grpc"` + Nats NatsRPCServerConfig `mapstructure:"nats"` + } `mapstructure:"server"` +} + +func newDefaultClusterRPCConfig() *ClusterRPCConfig { + return &ClusterRPCConfig{ + Client: struct { + Grpc GRPCClientConfig `mapstructure:"grpc"` + Nats NatsRPCClientConfig `mapstructure:"nats"` }{ - Host: "localhost:9125", - Prefix: "pitaya.", - Rate: 1, + Grpc: *newDefaultGRPCClientConfig(), + Nats: *newDefaultNatsRPCClientConfig(), + }, + Server: struct { + Grpc GRPCServerConfig `mapstructure:"grpc"` + Nats NatsRPCServerConfig `mapstructure:"nats"` + }{ + Grpc: *newDefaultGRPCServerConfig(), + Nats: *newDefaultNatsRPCServerConfig(), }, - ConstLabels: map[string]string{}, } + } -// NewStatsdConfig reads from config to build configuration for statsd -func NewStatsdConfig(config *Config) *StatsdConfig { - conf := NewDefaultStatsdConfig() - if err := config.UnmarshalKey("pitaya.metrics", 
&conf); err != nil { - panic(err) - } - return conf +type ClusterSDConfig struct { + Etcd EtcdServiceDiscoveryConfig `mapstructure:"etcd"` +} + +func newDefaultClusterSDConfig() *ClusterSDConfig { + return &ClusterSDConfig{Etcd: *newDefaultEtcdServiceDiscoveryConfig()} } // WorkerConfig provides worker configuration @@ -514,12 +459,13 @@ type WorkerConfig struct { Pool string `mapstructure:"pool"` Password string `mapstructure:"password"` } `mapstructure:"redis"` - Namespace string `mapstructure:"namespace"` - Concurrency int `mapstructure:"concurrency"` + Namespace string `mapstructure:"namespace"` + Concurrency int `mapstructure:"concurrency"` + Retry EnqueueOpts `mapstructure:"retry"` } -// NewDefaultWorkerConfig provides worker default configuration -func NewDefaultWorkerConfig() *WorkerConfig { +// newDefaultWorkerConfig provides worker default configuration +func newDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ Redis: struct { ServerURL string `mapstructure:"serverurl"` @@ -530,18 +476,10 @@ func NewDefaultWorkerConfig() *WorkerConfig { Pool: "10", }, Concurrency: 1, + Retry: *newDefaultEnqueueOpts(), } } -// NewWorkerConfig provides worker configuration based on default string paths -func NewWorkerConfig(config *Config) *WorkerConfig { - conf := NewDefaultWorkerConfig() - if err := config.UnmarshalKey("pitaya.worker", &conf); err != nil { - panic(err) - } - return conf -} - // EnqueueOpts has retry options for worker type EnqueueOpts struct { Enabled bool `mapstructure:"enabled"` @@ -552,8 +490,8 @@ type EnqueueOpts struct { MaxRandom int `mapstructure:"maxrandom"` } -// NewDefaultEnqueueOpts provides default EnqueueOpts -func NewDefaultEnqueueOpts() *EnqueueOpts { +// newDefaultEnqueueOpts provides default EnqueueOpts +func newDefaultEnqueueOpts() *EnqueueOpts { return &EnqueueOpts{ Enabled: true, Max: 2, @@ -564,34 +502,16 @@ func NewDefaultEnqueueOpts() *EnqueueOpts { } } -// NewEnqueueOpts reads from config to build *EnqueueOpts -func 
NewEnqueueOpts(config *Config) *EnqueueOpts { - conf := NewDefaultEnqueueOpts() - if err := config.UnmarshalKey("pitaya.worker.retry", &conf); err != nil { - panic(err) - } - return conf -} - // MemoryGroupConfig provides configuration for MemoryGroup type MemoryGroupConfig struct { TickDuration time.Duration `mapstructure:"tickduration"` } -// NewDefaultMemoryGroupConfig returns a new, default group instance -func NewDefaultMemoryGroupConfig() *MemoryGroupConfig { +// newDefaultMemoryGroupConfig returns a new, default group instance +func newDefaultMemoryGroupConfig() *MemoryGroupConfig { return &MemoryGroupConfig{TickDuration: time.Duration(30 * time.Second)} } -// NewMemoryGroupConfig returns a new, default group instance -func NewMemoryGroupConfig(conf *Config) *MemoryGroupConfig { - c := NewDefaultMemoryGroupConfig() - if err := conf.UnmarshalKey("pitaya.groups.memory", &c); err != nil { - panic(err) - } - return c -} - // EtcdGroupServiceConfig provides ETCD configuration type EtcdGroupServiceConfig struct { DialTimeout time.Duration `mapstructure:"dialtimeout"` @@ -600,8 +520,8 @@ type EtcdGroupServiceConfig struct { TransactionTimeout time.Duration `mapstructure:"transactiontimeout"` } -// NewDefaultEtcdGroupServiceConfig provides default ETCD configuration -func NewDefaultEtcdGroupServiceConfig() *EtcdGroupServiceConfig { +// newDefaultEtcdGroupServiceConfig provides default ETCD configuration +func newDefaultEtcdGroupServiceConfig() *EtcdGroupServiceConfig { return &EtcdGroupServiceConfig{ DialTimeout: time.Duration(5 * time.Second), Endpoints: []string{"localhost:2379"}, @@ -611,14 +531,27 @@ func NewDefaultEtcdGroupServiceConfig() *EtcdGroupServiceConfig { } // NewEtcdGroupServiceConfig reads from config to build ETCD configuration -func NewEtcdGroupServiceConfig(config *Config) *EtcdGroupServiceConfig { - conf := NewDefaultEtcdGroupServiceConfig() +func newEtcdGroupServiceConfig(config *Config) *EtcdGroupServiceConfig { + conf := 
newDefaultEtcdGroupServiceConfig() if err := config.UnmarshalKey("pitaya.groups.etcd", &conf); err != nil { panic(err) } return conf } +type GroupsConfig struct { + Etcd EtcdGroupServiceConfig `mapstructure:"etcd"` + Memory MemoryGroupConfig `mapstructure:"memory"` +} + +// newDefaultGroupsConfig provides default groups configuration +func newDefaultGroupsConfig() *GroupsConfig { + return &GroupsConfig{ + Etcd: *newDefaultEtcdGroupServiceConfig(), + Memory: *newDefaultMemoryGroupConfig(), + } +} + // ETCDBindingConfig provides configuration for ETCDBindingStorage type ETCDBindingConfig struct { DialTimeout time.Duration `mapstructure:"dialtimeout"` @@ -637,7 +570,7 @@ func NewDefaultETCDBindingConfig() *ETCDBindingConfig { } } -// NewETCDBindingConfig reads from config to build ETCDBindingStorage configuration +// NewETCDBindingConfig reads from config to build ETCDBindingStorage configuration func NewETCDBindingConfig(config *Config) *ETCDBindingConfig { conf := NewDefaultETCDBindingConfig() if err := config.UnmarshalKey("pitaya.modules.bindingstorage.etcd", &conf); err != nil { @@ -653,20 +586,11 @@ type RateLimitingConfig struct { ForceDisable bool `mapstructure:"forcedisable"` } -// NewDefaultRateLimitingConfig rate limits default config +// newDefaultRateLimitingConfig rate limits default config func newDefaultRateLimitingConfig() *RateLimitingConfig { return &RateLimitingConfig{ Limit: 20, Interval: time.Duration(time.Second), ForceDisable: false, } } - -// NewRateLimitingConfig reads from config to build rate limiting configuration -func NewRateLimitingConfig(config *Config) *RateLimitingConfig { - conf := NewDefaultRateLimitingConfig() - if err := config.UnmarshalKey("pitaya.conn.ratelimiting", &conf); err != nil { - panic(err) - } - return conf -} diff --git a/config/viper_config.go b/config/viper_config.go index 420a934c..24f8089d 100644 --- a/config/viper_config.go +++ b/config/viper_config.go @@ 
-21,11 +21,12 @@ package config import ( - "github.com/mitchellh/mapstructure" "reflect" "strings" "time" + "github.com/mitchellh/mapstructure" + "github.com/spf13/viper" ) @@ -52,21 +53,7 @@ func NewConfig(cfgs ...*viper.Viper) *Config { func (c *Config) fillDefaultValues() { customMetricsSpec := NewDefaultCustomMetricsSpec() - builderConfig := NewDefaultBuilderConfig() pitayaConfig := NewDefaultPitayaConfig() - prometheusConfig := NewDefaultPrometheusConfig() - statsdConfig := NewDefaultStatsdConfig() - etcdSDConfig := NewDefaultEtcdServiceDiscoveryConfig() - natsRPCServerConfig := NewDefaultNatsRPCServerConfig() - natsRPCClientConfig := NewDefaultNatsRPCClientConfig() - grpcRPCClientConfig := NewDefaultGRPCClientConfig() - grpcRPCServerConfig := NewDefaultGRPCServerConfig() - workerConfig := NewDefaultWorkerConfig() - enqueueOpts := NewDefaultEnqueueOpts() - groupServiceConfig := NewDefaultMemoryGroupConfig() - etcdGroupServiceConfig := NewDefaultEtcdGroupServiceConfig() - rateLimitingConfig := NewDefaultRateLimitingConfig() - infoRetrieverConfig := NewDefaultInfoRetrieverConfig() etcdBindingConfig := NewDefaultETCDBindingConfig() defaultsMap := map[string]interface{}{ @@ -74,78 +61,78 @@ func (c *Config) fillDefaultValues() { // the max buffer size that nats will accept, if this buffer overflows, messages will begin to be dropped "pitaya.buffer.handler.localprocess": pitayaConfig.Buffer.Handler.LocalProcess, "pitaya.buffer.handler.remoteprocess": pitayaConfig.Buffer.Handler.RemoteProcess, - "pitaya.cluster.info.region": infoRetrieverConfig.Region, - "pitaya.cluster.rpc.client.grpc.dialtimeout": grpcRPCClientConfig.DialTimeout, - "pitaya.cluster.rpc.client.grpc.requesttimeout": grpcRPCClientConfig.RequestTimeout, - "pitaya.cluster.rpc.client.grpc.lazyconnection": grpcRPCClientConfig.LazyConnection, - "pitaya.cluster.rpc.client.nats.connect": natsRPCClientConfig.Connect, - "pitaya.cluster.rpc.client.nats.connectiontimeout": natsRPCClientConfig.ConnectionTimeout, 
- "pitaya.cluster.rpc.client.nats.maxreconnectionretries": natsRPCClientConfig.MaxReconnectionRetries, - "pitaya.cluster.rpc.client.nats.requesttimeout": natsRPCClientConfig.RequestTimeout, - "pitaya.cluster.rpc.server.grpc.port": grpcRPCServerConfig.Port, - "pitaya.cluster.rpc.server.nats.connect": natsRPCServerConfig.Connect, - "pitaya.cluster.rpc.server.nats.connectiontimeout": natsRPCServerConfig.ConnectionTimeout, - "pitaya.cluster.rpc.server.nats.maxreconnectionretries": natsRPCServerConfig.MaxReconnectionRetries, - "pitaya.cluster.rpc.server.nats.services": natsRPCServerConfig.Services, - "pitaya.cluster.rpc.server.nats.buffer.messages": natsRPCServerConfig.Buffer.Messages, - "pitaya.cluster.rpc.server.nats.buffer.push": natsRPCServerConfig.Buffer.Push, - "pitaya.cluster.sd.etcd.dialtimeout": etcdSDConfig.DialTimeout, - "pitaya.cluster.sd.etcd.endpoints": etcdSDConfig.Endpoints, - "pitaya.cluster.sd.etcd.prefix": etcdSDConfig.Prefix, - "pitaya.cluster.sd.etcd.grantlease.maxretries": etcdSDConfig.GrantLease.MaxRetries, - "pitaya.cluster.sd.etcd.grantlease.retryinterval": etcdSDConfig.GrantLease.RetryInterval, - "pitaya.cluster.sd.etcd.grantlease.timeout": etcdSDConfig.GrantLease.Timeout, - "pitaya.cluster.sd.etcd.heartbeat.log": etcdSDConfig.Heartbeat.Log, - "pitaya.cluster.sd.etcd.heartbeat.ttl": etcdSDConfig.Heartbeat.TTL, - "pitaya.cluster.sd.etcd.revoke.timeout": etcdSDConfig.Revoke.Timeout, - "pitaya.cluster.sd.etcd.syncservers.interval": etcdSDConfig.SyncServers.Interval, - "pitaya.cluster.sd.etcd.syncservers.parallelism": etcdSDConfig.SyncServers.Parallelism, - "pitaya.cluster.sd.etcd.shutdown.delay": etcdSDConfig.Shutdown.Delay, - "pitaya.cluster.sd.etcd.servertypeblacklist": etcdSDConfig.ServerTypesBlacklist, + "pitaya.cluster.info.region": pitayaConfig.Cluster.Info.Region, + "pitaya.cluster.rpc.client.grpc.dialtimeout": pitayaConfig.Cluster.RPC.Client.Grpc.DialTimeout, + "pitaya.cluster.rpc.client.grpc.requesttimeout": 
pitayaConfig.Cluster.RPC.Client.Grpc.RequestTimeout, + "pitaya.cluster.rpc.client.grpc.lazyconnection": pitayaConfig.Cluster.RPC.Client.Grpc.LazyConnection, + "pitaya.cluster.rpc.client.nats.connect": pitayaConfig.Cluster.RPC.Client.Nats.Connect, + "pitaya.cluster.rpc.client.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Client.Nats.ConnectionTimeout, + "pitaya.cluster.rpc.client.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Client.Nats.MaxReconnectionRetries, + "pitaya.cluster.rpc.client.nats.requesttimeout": pitayaConfig.Cluster.RPC.Client.Nats.RequestTimeout, + "pitaya.cluster.rpc.server.grpc.port": pitayaConfig.Cluster.RPC.Server.Grpc.Port, + "pitaya.cluster.rpc.server.nats.connect": pitayaConfig.Cluster.RPC.Server.Nats.Connect, + "pitaya.cluster.rpc.server.nats.connectiontimeout": pitayaConfig.Cluster.RPC.Server.Nats.ConnectionTimeout, + "pitaya.cluster.rpc.server.nats.maxreconnectionretries": pitayaConfig.Cluster.RPC.Server.Nats.MaxReconnectionRetries, + "pitaya.cluster.rpc.server.nats.services": pitayaConfig.Cluster.RPC.Server.Nats.Services, + "pitaya.cluster.rpc.server.nats.buffer.messages": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Messages, + "pitaya.cluster.rpc.server.nats.buffer.push": pitayaConfig.Cluster.RPC.Server.Nats.Buffer.Push, + "pitaya.cluster.sd.etcd.dialtimeout": pitayaConfig.Cluster.SD.Etcd.DialTimeout, + "pitaya.cluster.sd.etcd.endpoints": pitayaConfig.Cluster.SD.Etcd.Endpoints, + "pitaya.cluster.sd.etcd.prefix": pitayaConfig.Cluster.SD.Etcd.Prefix, + "pitaya.cluster.sd.etcd.grantlease.maxretries": pitayaConfig.Cluster.SD.Etcd.GrantLease.MaxRetries, + "pitaya.cluster.sd.etcd.grantlease.retryinterval": pitayaConfig.Cluster.SD.Etcd.GrantLease.RetryInterval, + "pitaya.cluster.sd.etcd.grantlease.timeout": pitayaConfig.Cluster.SD.Etcd.GrantLease.Timeout, + "pitaya.cluster.sd.etcd.heartbeat.log": pitayaConfig.Cluster.SD.Etcd.Heartbeat.Log, + "pitaya.cluster.sd.etcd.heartbeat.ttl": pitayaConfig.Cluster.SD.Etcd.Heartbeat.TTL, + 
"pitaya.cluster.sd.etcd.revoke.timeout": pitayaConfig.Cluster.SD.Etcd.Revoke.Timeout, + "pitaya.cluster.sd.etcd.syncservers.interval": pitayaConfig.Cluster.SD.Etcd.SyncServers.Interval, + "pitaya.cluster.sd.etcd.syncservers.parallelism": pitayaConfig.Cluster.SD.Etcd.SyncServers.Parallelism, + "pitaya.cluster.sd.etcd.shutdown.delay": pitayaConfig.Cluster.SD.Etcd.Shutdown.Delay, + "pitaya.cluster.sd.etcd.servertypeblacklist": pitayaConfig.Cluster.SD.Etcd.ServerTypesBlacklist, // the sum of this config among all the frontend servers should always be less than // the sum of pitaya.buffer.cluster.rpc.server.nats.messages, for covering the worst case scenario // a single backend server should have the config pitaya.buffer.cluster.rpc.server.nats.messages bigger // than the sum of the config pitaya.concurrency.handler.dispatch among all frontend servers "pitaya.acceptor.proxyprotocol": pitayaConfig.Acceptor.ProxyProtocol, "pitaya.concurrency.handler.dispatch": pitayaConfig.Concurrency.Handler.Dispatch, - "pitaya.defaultpipelines.structvalidation.enabled": builderConfig.DefaultPipelines.StructValidation.Enabled, - "pitaya.groups.etcd.dialtimeout": etcdGroupServiceConfig.DialTimeout, - "pitaya.groups.etcd.endpoints": etcdGroupServiceConfig.Endpoints, - "pitaya.groups.etcd.prefix": etcdGroupServiceConfig.Prefix, - "pitaya.groups.etcd.transactiontimeout": etcdGroupServiceConfig.TransactionTimeout, - "pitaya.groups.memory.tickduration": groupServiceConfig.TickDuration, + "pitaya.defaultpipelines.structvalidation.enabled": pitayaConfig.DefaultPipelines.StructValidation.Enabled, + "pitaya.groups.etcd.dialtimeout": pitayaConfig.Groups.Etcd.DialTimeout, + "pitaya.groups.etcd.endpoints": pitayaConfig.Groups.Etcd.Endpoints, + "pitaya.groups.etcd.prefix": pitayaConfig.Groups.Etcd.Prefix, + "pitaya.groups.etcd.transactiontimeout": pitayaConfig.Groups.Etcd.TransactionTimeout, + "pitaya.groups.memory.tickduration": pitayaConfig.Groups.Memory.TickDuration, 
"pitaya.handler.messages.compression": pitayaConfig.Handler.Messages.Compression, "pitaya.heartbeat.interval": pitayaConfig.Heartbeat.Interval, - "pitaya.metrics.prometheus.additionalLabels": prometheusConfig.Prometheus.AdditionalLabels, - "pitaya.metrics.constLabels": prometheusConfig.ConstLabels, + "pitaya.metrics.additionalLabels": pitayaConfig.Metrics.AdditionalLabels, + "pitaya.metrics.constLabels": pitayaConfig.Metrics.ConstLabels, "pitaya.metrics.custom": customMetricsSpec, "pitaya.metrics.period": pitayaConfig.Metrics.Period, - "pitaya.metrics.prometheus.enabled": builderConfig.Metrics.Prometheus.Enabled, - "pitaya.metrics.prometheus.port": prometheusConfig.Prometheus.Port, - "pitaya.metrics.statsd.enabled": builderConfig.Metrics.Statsd.Enabled, - "pitaya.metrics.statsd.host": statsdConfig.Statsd.Host, - "pitaya.metrics.statsd.prefix": statsdConfig.Statsd.Prefix, - "pitaya.metrics.statsd.rate": statsdConfig.Statsd.Rate, + "pitaya.metrics.prometheus.enabled": pitayaConfig.Metrics.Prometheus.Enabled, + "pitaya.metrics.prometheus.port": pitayaConfig.Metrics.Prometheus.Port, + "pitaya.metrics.statsd.enabled": pitayaConfig.Metrics.Statsd.Enabled, + "pitaya.metrics.statsd.host": pitayaConfig.Metrics.Statsd.Host, + "pitaya.metrics.statsd.prefix": pitayaConfig.Metrics.Statsd.Prefix, + "pitaya.metrics.statsd.rate": pitayaConfig.Metrics.Statsd.Rate, "pitaya.modules.bindingstorage.etcd.dialtimeout": etcdBindingConfig.DialTimeout, "pitaya.modules.bindingstorage.etcd.endpoints": etcdBindingConfig.Endpoints, "pitaya.modules.bindingstorage.etcd.leasettl": etcdBindingConfig.LeaseTTL, "pitaya.modules.bindingstorage.etcd.prefix": etcdBindingConfig.Prefix, - "pitaya.conn.ratelimiting.limit": rateLimitingConfig.Limit, - "pitaya.conn.ratelimiting.interval": rateLimitingConfig.Interval, - "pitaya.conn.ratelimiting.forcedisable": rateLimitingConfig.ForceDisable, + "pitaya.conn.ratelimiting.limit": pitayaConfig.Conn.RateLimiting.Limit, + "pitaya.conn.ratelimiting.interval": 
pitayaConfig.Conn.RateLimiting.Interval, + "pitaya.conn.ratelimiting.forcedisable": pitayaConfig.Conn.RateLimiting.ForceDisable, "pitaya.session.unique": pitayaConfig.Session.Unique, "pitaya.session.drain.enabled": pitayaConfig.Session.Drain.Enabled, "pitaya.session.drain.timeout": pitayaConfig.Session.Drain.Timeout, "pitaya.session.drain.period": pitayaConfig.Session.Drain.Period, - "pitaya.worker.concurrency": workerConfig.Concurrency, - "pitaya.worker.redis.pool": workerConfig.Redis.Pool, - "pitaya.worker.redis.url": workerConfig.Redis.ServerURL, - "pitaya.worker.retry.enabled": enqueueOpts.Enabled, - "pitaya.worker.retry.exponential": enqueueOpts.Exponential, - "pitaya.worker.retry.max": enqueueOpts.Max, - "pitaya.worker.retry.maxDelay": enqueueOpts.MaxDelay, - "pitaya.worker.retry.maxRandom": enqueueOpts.MaxRandom, - "pitaya.worker.retry.minDelay": enqueueOpts.MinDelay, + "pitaya.worker.concurrency": pitayaConfig.Worker.Concurrency, + "pitaya.worker.redis.pool": pitayaConfig.Worker.Redis.Pool, + "pitaya.worker.redis.url": pitayaConfig.Worker.Redis.ServerURL, + "pitaya.worker.retry.enabled": pitayaConfig.Worker.Retry.Enabled, + "pitaya.worker.retry.exponential": pitayaConfig.Worker.Retry.Exponential, + "pitaya.worker.retry.max": pitayaConfig.Worker.Retry.Max, + "pitaya.worker.retry.maxDelay": pitayaConfig.Worker.Retry.MaxDelay, + "pitaya.worker.retry.maxRandom": pitayaConfig.Worker.Retry.MaxRandom, + "pitaya.worker.retry.minDelay": pitayaConfig.Worker.Retry.MinDelay, } for param := range defaultsMap { diff --git a/examples/demo/chat/main.go b/examples/demo/chat/main.go index f6b0ce90..1c9fd2fc 100644 --- a/examples/demo/chat/main.go +++ b/examples/demo/chat/main.go @@ -108,7 +108,7 @@ func main() { conf := configApp() builder := pitaya.NewDefaultBuilder(true, "chat", pitaya.Cluster, map[string]string{}, *conf) builder.AddAcceptor(acceptor.NewWSAcceptor(":3250")) - builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig()) + 
builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory) app = builder.Build() defer app.Shutdown() @@ -134,11 +134,11 @@ func main() { app.Start() } -func configApp() *config.BuilderConfig { - conf := config.NewDefaultBuilderConfig() - conf.Pitaya.Buffer.Handler.LocalProcess = 15 - conf.Pitaya.Heartbeat.Interval = time.Duration(15 * time.Second) - conf.Pitaya.Buffer.Agent.Messages = 32 - conf.Pitaya.Handler.Messages.Compression = false +func configApp() *config.PitayaConfig { + conf := config.NewDefaultPitayaConfig() + conf.Buffer.Handler.LocalProcess = 15 + conf.Heartbeat.Interval = time.Duration(15 * time.Second) + conf.Buffer.Agent.Messages = 32 + conf.Handler.Messages.Compression = false return conf } diff --git a/examples/demo/cluster/main.go b/examples/demo/cluster/main.go index 5c3caf8c..c9e5b43f 100644 --- a/examples/demo/cluster/main.go +++ b/examples/demo/cluster/main.go @@ -107,12 +107,12 @@ func main() { configureJaeger(viper.GetViper(), logrus.New()) } - builder := pitaya.NewDefaultBuilder(*isFrontend, *svType, pitaya.Cluster, map[string]string{}, *config.NewDefaultBuilderConfig()) + builder := pitaya.NewDefaultBuilder(*isFrontend, *svType, pitaya.Cluster, map[string]string{}, *config.NewDefaultPitayaConfig()) if *isFrontend { tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", *port)) builder.AddAcceptor(tcp) } - builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig()) + builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory) app = builder.Build() defer app.Shutdown() diff --git a/examples/demo/cluster_grpc/main.go b/examples/demo/cluster_grpc/main.go index a1e59f57..37b7260f 100644 --- a/examples/demo/cluster_grpc/main.go +++ b/examples/demo/cluster_grpc/main.go @@ -103,25 +103,25 @@ func main() { } func createApp(port int, isFrontend bool, svType string, meta map[string]string, rpcServerPort int) (pitaya.Pitaya, *modules.ETCDBindingStorage) { - builder := 
pitaya.NewDefaultBuilder(isFrontend, svType, pitaya.Cluster, meta, *config.NewDefaultBuilderConfig()) + builder := pitaya.NewDefaultBuilder(isFrontend, svType, pitaya.Cluster, meta, *config.NewDefaultPitayaConfig()) - grpcServerConfig := config.NewDefaultGRPCServerConfig() + grpcServerConfig := builder.Config.Cluster.RPC.Server.Grpc grpcServerConfig.Port = rpcServerPort - gs, err := cluster.NewGRPCServer(*grpcServerConfig, builder.Server, builder.MetricsReporters) + gs, err := cluster.NewGRPCServer(grpcServerConfig, builder.Server, builder.MetricsReporters) if err != nil { panic(err) } builder.RPCServer = gs - builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig()) + builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory) bs := modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewDefaultETCDBindingConfig()) gc, err := cluster.NewGRPCClient( - *config.NewDefaultGRPCClientConfig(), + builder.Config.Cluster.RPC.Client.Grpc, builder.Server, builder.MetricsReporters, bs, - cluster.NewInfoRetriever(*config.NewDefaultInfoRetrieverConfig()), + cluster.NewInfoRetriever(builder.Config.Cluster.Info), ) if err != nil { panic(err) diff --git a/examples/demo/pipeline/main.go b/examples/demo/pipeline/main.go index f86b4311..1505f6dc 100644 --- a/examples/demo/pipeline/main.go +++ b/examples/demo/pipeline/main.go @@ -97,7 +97,7 @@ func main() { port := 3251 metagameServer := NewMetagameMock() - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() config.DefaultPipelines.StructValidation.Enabled = true builder := pitaya.NewDefaultBuilder(*isFrontend, *svType, pitaya.Cluster, map[string]string{}, *config) diff --git a/examples/demo/rate_limiting/main.go b/examples/demo/rate_limiting/main.go index b4b112bb..d48bb2a8 100644 --- a/examples/demo/rate_limiting/main.go +++ b/examples/demo/rate_limiting/main.go @@ -25,12 +25,12 @@ func createAcceptor(port int, reporters 
[]metrics.Reporter) acceptor.Acceptor { vConfig.Set("pitaya.conn.ratelimiting.interval", time.Minute) pConfig := config.NewConfig(vConfig) - rateLimitConfig := config.NewRateLimitingConfig(pConfig) + rateLimitConfig := config.NewPitayaConfig(pConfig).Conn.RateLimiting tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port)) return acceptorwrapper.WithWrappers( tcp, - acceptorwrapper.NewRateLimitingWrapper(reporters, *rateLimitConfig)) + acceptorwrapper.NewRateLimitingWrapper(reporters, rateLimitConfig)) } var app pitaya.Pitaya @@ -41,7 +41,7 @@ func main() { flag.Parse() - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := pitaya.NewDefaultBuilder(true, svType, pitaya.Cluster, map[string]string{}, *config) builder.AddAcceptor(createAcceptor(*port, builder.MetricsReporters)) diff --git a/examples/testing/main.go b/examples/testing/main.go index bd55ecfb..1ba5eb00 100644 --- a/examples/testing/main.go +++ b/examples/testing/main.go @@ -307,7 +307,7 @@ func createApp(serializer string, port int, grpc bool, isFrontend bool, svType s builder.AddAcceptor(tcp) } - builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig()) + builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory) if serializer == "json" { builder.Serializer = json.NewSerializer() @@ -317,9 +317,11 @@ func createApp(serializer string, port int, grpc bool, isFrontend bool, svType s panic("serializer should be either json or protobuf") } + pitayaConfig := config.NewPitayaConfig(conf) + var bs *modules.ETCDBindingStorage if grpc { - gs, err := cluster.NewGRPCServer(*config.NewGRPCServerConfig(conf), builder.Server, builder.MetricsReporters) + gs, err := cluster.NewGRPCServer(pitayaConfig.Cluster.RPC.Server.Grpc, builder.Server, builder.MetricsReporters) if err != nil { panic(err) } @@ -327,11 +329,11 @@ func createApp(serializer string, port int, grpc bool, isFrontend bool, svType s bs = 
modules.NewETCDBindingStorage(builder.Server, builder.SessionPool, *config.NewETCDBindingConfig(conf)) gc, err := cluster.NewGRPCClient( - *config.NewGRPCClientConfig(conf), + pitayaConfig.Cluster.RPC.Client.Grpc, builder.Server, builder.MetricsReporters, bs, - cluster.NewInfoRetriever(*config.NewInfoRetrieverConfig(conf)), + cluster.NewInfoRetriever(pitayaConfig.Cluster.Info), ) if err != nil { panic(err) } diff --git a/group_test.go b/group_test.go index a0847318..e0a40fd6 100644 --- a/group_test.go +++ b/group_test.go @@ -32,7 +32,7 @@ import ( ) func createGroupTestApp() Pitaya { - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config) return app } @@ -379,7 +379,7 @@ func TestBroadcast(t *testing.T) { mockSessionPool.EXPECT().GetSessionByUID(uid1).Return(s1).Times(1) mockSessionPool.EXPECT().GetSessionByUID(uid2).Return(s2).Times(1) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config) builder.SessionPool = mockSessionPool app := builder.Build() diff --git a/groups/etcd_group_service_test.go b/groups/etcd_group_service_test.go index 9986d603..abe0bf03 100644 --- a/groups/etcd_group_service_test.go +++ b/groups/etcd_group_service_test.go @@ -30,7 +30,7 @@ import ( func setup(t *testing.T) (*integration.ClusterV3, GroupService) { cluster, cli := helpers.GetTestEtcd(t) - etcdGroupService, err := NewEtcdGroupService(*config.NewDefaultEtcdGroupServiceConfig(), cli) + etcdGroupService, err := NewEtcdGroupService(config.NewDefaultPitayaConfig().Groups.Etcd, cli) if err != nil { panic(err) } diff --git a/groups/memory_group_service_test.go b/groups/memory_group_service_test.go index a1a8aa41..fd47c43b 100644 --- a/groups/memory_group_service_test.go +++ b/groups/memory_group_service_test.go @@ -31,7 +31,7 @@ import ( var memoryGroupService 
*MemoryGroupService func TestMain(m *testing.M) { - mconfig := *config.NewDefaultMemoryGroupConfig() + mconfig := config.NewDefaultPitayaConfig().Groups.Memory mconfig.TickDuration = 10 * time.Millisecond memoryGroupService = NewMemoryGroupService(mconfig) exit := m.Run() diff --git a/kick_test.go b/kick_test.go index 895cdcf2..be44c9fd 100644 --- a/kick_test.go +++ b/kick_test.go @@ -56,7 +56,7 @@ func TestSendKickToUsersLocalSession(t *testing.T) { mockSessionPool.EXPECT().GetSessionByUID(table.uid1).Return(s1).Times(1) mockSessionPool.EXPECT().GetSessionByUID(table.uid2).Return(s2).Times(1) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config) builder.SessionPool = mockSessionPool app := builder.Build() @@ -89,7 +89,7 @@ func TestSendKickToUsersFail(t *testing.T) { mockRPCClient := clustermocks.NewMockRPCClient(ctrl) mockRPCClient.EXPECT().SendKick(table.uid2, table.frontendType, &protos.KickMsg{UserId: table.uid2}).Return(table.err).Times(1) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config) builder.SessionPool = mockSessionPool builder.RPCClient = mockRPCClient @@ -118,7 +118,7 @@ func TestSendKickToUsersRemoteSession(t *testing.T) { defer ctrl.Finish() mockRPCClient := clustermocks.NewMockRPCClient(ctrl) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config).(*App) app.rpcClient = mockRPCClient diff --git a/metrics/prometheus_reporter.go b/metrics/prometheus_reporter.go index 4b8c9d4a..1fe26a7c 100644 --- a/metrics/prometheus_reporter.go +++ b/metrics/prometheus_reporter.go @@ -22,6 +22,7 @@ package metrics import ( "fmt" + "github.com/topfreegames/pitaya/v2/logger" "net/http" @@ -305,7 +306,7 @@ func (p 
*PrometheusReporter) registerMetrics( // GetPrometheusReporter gets the prometheus reporter singleton func GetPrometheusReporter( serverType string, - config config.PrometheusConfig, + config config.MetricsConfig, metricsSpecs models.CustomMetricsSpec, ) (*PrometheusReporter, error) { return getPrometheusReporter(serverType, config, &metricsSpecs) @@ -313,7 +314,7 @@ func GetPrometheusReporter( func getPrometheusReporter( serverType string, - config config.PrometheusConfig, + config config.MetricsConfig, metricsSpecs *models.CustomMetricsSpec, ) (*PrometheusReporter, error) { once.Do(func() { @@ -325,7 +326,7 @@ func getPrometheusReporter( summaryReportersMap: make(map[string]*prometheus.SummaryVec), gaugeReportersMap: make(map[string]*prometheus.GaugeVec), } - prometheusReporter.registerMetrics(config.ConstLabels, config.Prometheus.AdditionalLabels, metricsSpecs) + prometheusReporter.registerMetrics(config.ConstLabels, config.AdditionalLabels, metricsSpecs) http.Handle("/metrics", promhttp.Handler()) go (func() { err := http.ListenAndServe(fmt.Sprintf(":%d", config.Prometheus.Port), nil) diff --git a/metrics/statsd_reporter.go b/metrics/statsd_reporter.go index 136984ac..a5f5d907 100644 --- a/metrics/statsd_reporter.go +++ b/metrics/statsd_reporter.go @@ -22,6 +22,7 @@ package metrics import ( "fmt" + "github.com/topfreegames/pitaya/v2/constants" "github.com/DataDog/datadog-go/statsd" @@ -47,7 +48,7 @@ type StatsdReporter struct { // NewStatsdReporter returns an instance of statsd reportar and an // error if something fails func NewStatsdReporter( - config config.StatsdConfig, + config config.MetricsConfig, serverType string, clientOrNil ...Client, ) (*StatsdReporter, error) { @@ -55,7 +56,7 @@ func NewStatsdReporter( } func newStatsdReporter( - config config.StatsdConfig, + config config.MetricsConfig, serverType string, clientOrNil ...Client) (*StatsdReporter, error) { sr := &StatsdReporter{ diff --git a/metrics/statsd_reporter_test.go 
b/metrics/statsd_reporter_test.go index 20545f2e..03dd2e71 100644 --- a/metrics/statsd_reporter_test.go +++ b/metrics/statsd_reporter_test.go @@ -38,8 +38,8 @@ func TestNewStatsdReporter(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + cfg := config.NewDefaultPitayaConfig().Metrics + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) assert.Equal(t, mockClient, sr.client) assert.Equal(t, float64(cfg.Statsd.Rate), sr.rate) @@ -51,11 +51,11 @@ func TestReportLatency(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() + cfg := config.NewDefaultPitayaConfig().Metrics cfg.ConstLabels = map[string]string{ "defaultTag": "value", } - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedDuration, err := time.ParseDuration("200ms") @@ -86,8 +86,8 @@ func TestReportLatencyError(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + cfg := config.NewDefaultPitayaConfig().Metrics + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedError := errors.New("some error") @@ -102,11 +102,11 @@ func TestReportCount(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() + cfg := config.NewDefaultPitayaConfig().Metrics cfg.ConstLabels = map[string]string{ "defaultTag": "value", } - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedCount := 123 @@ -132,8 +132,8 @@ func TestReportCountError(t *testing.T) { defer ctrl.Finish() mockClient := 
metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + cfg := config.NewDefaultPitayaConfig().Metrics + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedError := errors.New("some error") @@ -148,11 +148,11 @@ func TestReportGauge(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() + cfg := config.NewDefaultPitayaConfig().Metrics cfg.ConstLabels = map[string]string{ "defaultTag": "value", } - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedValue := 123.1 @@ -178,11 +178,11 @@ func TestReportGaugeError(t *testing.T) { defer ctrl.Finish() mockClient := metricsmocks.NewMockClient(ctrl) - cfg := config.NewDefaultStatsdConfig() + cfg := config.NewDefaultPitayaConfig().Metrics cfg.ConstLabels = map[string]string{ "defaultTag": "value", } - sr, err := NewStatsdReporter(*cfg, "svType", mockClient) + sr, err := NewStatsdReporter(cfg, "svType", mockClient) assert.NoError(t, err) expectedError := errors.New("some error") diff --git a/module_test.go b/module_test.go index d51c38e0..64cb5680 100644 --- a/module_test.go +++ b/module_test.go @@ -51,7 +51,7 @@ func (m *MyMod) Shutdown() error { func TestRegisterModule(t *testing.T) { b := &MyMod{} - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config).(*App) err := app.RegisterModule(b, "mod") @@ -68,7 +68,7 @@ func TestRegisterModule(t *testing.T) { func TestGetModule(t *testing.T) { b := &MyMod{} - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config) app.RegisterModule(b, "mod") @@ -82,7 +82,7 @@ func TestGetModule(t 
*testing.T) { func TestStartupModules(t *testing.T) { modulesOrder = []string{} - app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultBuilderConfig()).(*App) + app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultPitayaConfig()).(*App) err := app.RegisterModule(&MyMod{name: "mod1"}, "mod1") assert.NoError(t, err) @@ -103,7 +103,7 @@ func TestStartupModules(t *testing.T) { func TestShutdownModules(t *testing.T) { modulesOrder = []string{} - app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultBuilderConfig()).(*App) + app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config.NewDefaultPitayaConfig()).(*App) err := app.RegisterModule(&MyMod{name: "mod1"}, "mod1") assert.NoError(t, err) diff --git a/push_test.go b/push_test.go index 7a433fa9..d41eff5f 100644 --- a/push_test.go +++ b/push_test.go @@ -44,7 +44,7 @@ func TestSendPushToUsersFailsIfErrSerializing(t *testing.T) { defer ctrl.Finish() mockSerializer := serializemocks.NewMockSerializer(ctrl) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config) builder.Serializer = mockSerializer app := builder.Build() @@ -94,7 +94,7 @@ func TestSendToUsersLocalSession(t *testing.T) { mockSessionPool.EXPECT().GetSessionByUID(uid1).Return(s1).Times(1) mockSessionPool.EXPECT().GetSessionByUID(uid2).Return(s2).Times(1) - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Standalone, map[string]string{}, *config) builder.SessionPool = mockSessionPool app := builder.Build().(*App) @@ -149,7 +149,7 @@ func TestSendToUsersRemoteSession(t *testing.T) { mockSessionPool.EXPECT().GetSessionByUID(uid1).Return(nil).Times(1) mockSessionPool.EXPECT().GetSessionByUID(uid2).Return(nil).Times(1) - config := 
config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() builder := NewDefaultBuilder(true, "testtype", Cluster, map[string]string{}, *config) builder.SessionPool = mockSessionPool builder.RPCClient = mockRPCClient diff --git a/reporters.go b/reporters.go index 4572f684..20533f98 100644 --- a/reporters.go +++ b/reporters.go @@ -8,7 +8,7 @@ import ( ) // CreatePrometheusReporter create a Prometheus reporter instance -func CreatePrometheusReporter(serverType string, config config.PrometheusConfig, customSpecs models.CustomMetricsSpec) (*metrics.PrometheusReporter, error) { +func CreatePrometheusReporter(serverType string, config config.MetricsConfig, customSpecs models.CustomMetricsSpec) (*metrics.PrometheusReporter, error) { logger.Log.Infof("prometheus is enabled, configuring reporter on port %d", config.Prometheus.Port) prometheus, err := metrics.GetPrometheusReporter(serverType, config, customSpecs) if err != nil { @@ -18,7 +18,7 @@ func CreatePrometheusReporter(serverType string, config config.PrometheusConfig, } // CreateStatsdReporter create a Statsd reporter instance -func CreateStatsdReporter(serverType string, config config.StatsdConfig) (*metrics.StatsdReporter, error) { +func CreateStatsdReporter(serverType string, config config.MetricsConfig) (*metrics.StatsdReporter, error) { logger.Log.Infof( "statsd is enabled, configuring the metrics reporter with host: %s", config.Statsd.Host, diff --git a/rpc_test.go b/rpc_test.go index 60832578..aa23d050 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -44,14 +44,14 @@ import ( ) func TestDoSendRPCNotInitialized(t *testing.T) { - config := config.NewDefaultBuilderConfig() + config := config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Standalone, map[string]string{}, *config).(*App) err := app.doSendRPC(nil, "", "", nil, nil) assert.Equal(t, constants.ErrRPCServerNotInitialized, err) } func TestDoSendRPC(t *testing.T) { - config := config.NewDefaultBuilderConfig() + config := 
config.NewDefaultPitayaConfig() app := NewDefaultApp(true, "testtype", Cluster, map[string]string{}, *config).(*App) app.server.ID = "myserver" app.rpcServer = &cluster.NatsRPCServer{}