From 9868cf7ce80b7ed749feaa6f5c3957b7cfe833ae Mon Sep 17 00:00:00 2001
From: Henrique Rodrigues
Date: Mon, 21 May 2018 18:50:48 -0300
Subject: [PATCH] fix config option name and error handling on connection closing

---
 cluster/nats_rpc_client.go      | 20 ++++++++++----------
 cluster/nats_rpc_client_test.go | 32 +-------------------------------
 cluster/nats_rpc_common.go      |  6 ++++++
 cluster/nats_rpc_server.go      | 34 +++++++++++++++-----------------
 config/config.go                | 40 ++++++++++++++++++--------------------
 5 files changed, 54 insertions(+), 78 deletions(-)

diff --git a/cluster/nats_rpc_client.go b/cluster/nats_rpc_client.go
index 13c8d8df..abcaeb3a 100644
--- a/cluster/nats_rpc_client.go
+++ b/cluster/nats_rpc_client.go
@@ -43,14 +43,14 @@ import (
 
 // NatsRPCClient struct
 type NatsRPCClient struct {
-	config          *config.Config
-	conn            *nats.Conn
-	connString      string
-	maxReconnects   int
-	reqTimeout      time.Duration
-	running         bool
-	server          *Server
-	metricsReporter metrics.Reporter
+	config                 *config.Config
+	conn                   *nats.Conn
+	connString             string
+	maxReconnectionRetries int
+	reqTimeout             time.Duration
+	running                bool
+	server                 *Server
+	metricsReporter        metrics.Reporter
 }
 
 // NewNatsRPCClient ctor
@@ -72,7 +72,7 @@ func (ns *NatsRPCClient) configure() error {
 	if ns.connString == "" {
 		return constants.ErrNoNatsConnectionString
 	}
-	ns.maxReconnects = ns.config.GetInt("pitaya.cluster.rpc.client.nats.maxreconnects")
+	ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.client.nats.maxreconnectionretries")
 	ns.reqTimeout = ns.config.GetDuration("pitaya.cluster.rpc.client.nats.requesttimeout")
 	if ns.reqTimeout == 0 {
 		return constants.ErrNatsNoRequestTimeout
@@ -209,7 +209,7 @@ func (ns *NatsRPCClient) Call(
 // Init inits nats rpc client
 func (ns *NatsRPCClient) Init() error {
 	ns.running = true
-	conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnects))
+	conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnectionRetries))
 	if err != nil {
 		return err
 	}
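Note: the renamed key is read through pitaya's viper-backed Config, so it can be overridden the usual viper way. A minimal sketch, assuming only the spf13/viper API (the key name comes from this patch; the override value of 30 is illustrative):

    package main

    import (
        "fmt"

        "github.com/spf13/viper"
    )

    func main() {
        cfg := viper.New()
        // Default matching the one added in config/config.go below.
        cfg.SetDefault("pitaya.cluster.rpc.client.nats.maxreconnectionretries", 15)
        // An explicit value (e.g. from a config file) takes precedence over the default.
        cfg.Set("pitaya.cluster.rpc.client.nats.maxreconnectionretries", 30)
        fmt.Println(cfg.GetInt("pitaya.cluster.rpc.client.nats.maxreconnectionretries")) // prints 30
    }
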
diff --git a/cluster/nats_rpc_client_test.go b/cluster/nats_rpc_client_test.go
index ddc8397a..f58d9ec6 100644
--- a/cluster/nats_rpc_client_test.go
+++ b/cluster/nats_rpc_client_test.go
@@ -26,7 +26,6 @@ import (
 	"errors"
 	"fmt"
 	"os"
-	"os/exec"
 	"testing"
 	"time"
 
@@ -365,36 +364,7 @@ func TestNatsRPCClientCall(t *testing.T) {
 			assert.Equal(t, table.err, err)
 			err = subs.Unsubscribe()
 			assert.NoError(t, err)
-			// conn.Close()
+			conn.Close()
 		})
 	}
 }
-
-func TestCloseNatsClient(t *testing.T) {
-	// TODO: is this too hacky?
-	if os.Getenv("STOP_CONN") == "true" {
-		s := helpers.GetTestNatsServer(t)
-		conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()))
-		assert.NoError(t, err)
-		conn.Close()
-
-		helpers.ShouldEventuallyReturn(t, func() bool {
-			return conn.IsClosed()
-		}, true)
-
-		return
-	}
-
-	flag := "-test.run=" + t.Name()
-	cmd := exec.Command(os.Args[0], flag)
-	cmd.Env = append(os.Environ(), "STOP_CONN=true")
-
-	if err := cmd.Start(); err != nil {
-		t.Fatal(err)
-	}
-
-	err := cmd.Wait()
-	if e, ok := err.(*exec.ExitError); !ok || e.Success() {
-		t.Fatalf("Process ran with err %v, want exit status 1", err)
-	}
-}
diff --git a/cluster/nats_rpc_common.go b/cluster/nats_rpc_common.go
index e6b6489a..8d58cc8b 100644
--- a/cluster/nats_rpc_common.go
+++ b/cluster/nats_rpc_common.go
@@ -41,6 +41,12 @@ func setupNatsConn(connectString string, options ...nats.Option) (*nats.Conn, er
 			logger.Log.Warnf("reconnected to nats %s!", nc.ConnectedUrl())
 		}),
 		nats.ClosedHandler(func(nc *nats.Conn) {
+			err := nc.LastError()
+			if err == nil {
+				logger.Log.Warn("nats connection closed with no error.")
+				return
+			}
+
 			logger.Log.Fatalf("nats connection closed. reason: %q", nc.LastError())
 		}),
 	)
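The handler change above is the heart of the fix: a deliberate conn.Close() leaves nc.LastError() nil and now only logs a warning, while an unexpected close still terminates the process. A standalone sketch of the same wiring, assuming the go-nats client package; the URL, retry count, and use of the standard log package in place of logger.Log are illustrative:

    package main

    import (
        "log"

        nats "github.com/nats-io/go-nats"
    )

    func main() {
        done := make(chan struct{})
        nc, err := nats.Connect(
            "nats://localhost:4222",
            nats.MaxReconnects(15),
            nats.ClosedHandler(func(nc *nats.Conn) {
                // Mirrors the patched handler: a nil LastError means the
                // close was requested, so the process should survive it.
                if err := nc.LastError(); err != nil {
                    log.Fatalf("nats connection closed. reason: %q", err)
                }
                log.Println("nats connection closed with no error.")
                close(done)
            }),
        )
        if err != nil {
            log.Fatal(err)
        }
        nc.Close() // fires ClosedHandler with LastError() == nil
        <-done     // handlers run asynchronously; wait before exiting
    }

This is also why the subprocess-based TestCloseNatsClient could be deleted: with the nil-error path no longer calling Fatalf, an intentional close no longer kills the test binary, and conn.Close() can simply run inline in TestNatsRPCClientCall.
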
diff --git a/cluster/nats_rpc_server.go b/cluster/nats_rpc_server.go
index 451381c1..ce93a1e4 100644
--- a/cluster/nats_rpc_server.go
+++ b/cluster/nats_rpc_server.go
@@ -35,21 +35,21 @@ import (
 
 // NatsRPCServer struct
 type NatsRPCServer struct {
-	connString         string
-	maxReconnects      int
-	server             *Server
-	conn               *nats.Conn
-	pushBufferSize     int
-	messagesBufferSize int
-	config             *config.Config
-	stopChan           chan bool
-	subChan            chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
-	bindingsChan       chan *nats.Msg // bindingsChan receives a notification from other servers every time a user binds to a session
-	unhandledReqCh     chan *protos.Request
-	userPushCh         chan *protos.Push
-	sub                *nats.Subscription
-	dropped            int
-	metricsReporter    metrics.Reporter
+	connString             string
+	maxReconnectionRetries int
+	server                 *Server
+	conn                   *nats.Conn
+	pushBufferSize         int
+	messagesBufferSize     int
+	config                 *config.Config
+	stopChan               chan bool
+	subChan                chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
+	bindingsChan           chan *nats.Msg // bindingsChan receives a notification from other servers every time a user binds to a session
+	unhandledReqCh         chan *protos.Request
+	userPushCh             chan *protos.Push
+	sub                    *nats.Subscription
+	dropped                int
+	metricsReporter        metrics.Reporter
 }
 
 // NewNatsRPCServer ctor
@@ -74,7 +74,7 @@ func (ns *NatsRPCServer) configure() error {
 	if ns.connString == "" {
 		return constants.ErrNoNatsConnectionString
 	}
-	ns.maxReconnects = ns.config.GetInt("pitaya.cluster.rpc.server.nats.maxreconnects")
+	ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.server.nats.maxreconnectionretries")
 	ns.messagesBufferSize = ns.config.GetInt("pitaya.buffer.cluster.rpc.server.messages")
 	if ns.messagesBufferSize == 0 {
 		return constants.ErrNatsMessagesBufferSizeZero
@@ -180,7 +180,7 @@ func (ns *NatsRPCServer) GetUserPushChannel() chan *protos.Push {
 func (ns *NatsRPCServer) Init() error {
 	// TODO: should we have concurrency here? It feels like we should.
 	go ns.handleMessages()
-	conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnects))
+	conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnectionRetries))
 	if err != nil {
 		return err
 	}
diff --git a/config/config.go b/config/config.go
index d32c247d..472f6417 100644
--- a/config/config.go
+++ b/config/config.go
@@ -60,26 +60,26 @@ func (c *Config) fillDefaultValues() {
 		// the sum of pitaya.buffer.cluster.rpc.server.messages, for covering the worst case scenario
 		// a single backend server should have the config pitaya.buffer.cluster.rpc.server.messages bigger
 		// than the sum of the config pitaya.concurrency.handler.dispatch among all frontend servers
-		"pitaya.concurrency.handler.dispatch":           25,
-		"pitaya.concurrency.remote.service":             30,
-		"pitaya.cluster.rpc.client.nats.connect":        "nats://localhost:4222",
-		"pitaya.cluster.rpc.client.nats.requesttimeout": "5s",
-		"pitaya.cluster.rpc.client.nats.maxReconnects":  15,
-		"pitaya.cluster.rpc.server.nats.connect":        "nats://localhost:4222",
-		"pitaya.cluster.rpc.server.nats.maxReconnects":  15,
-		"pitaya.cluster.sd.etcd.dialtimeout":            "5s",
-		"pitaya.cluster.sd.etcd.endpoints":              "localhost:2379",
-		"pitaya.cluster.sd.etcd.prefix":                 "pitaya/",
-		"pitaya.cluster.sd.etcd.heartbeat.ttl":          "60s",
-		"pitaya.cluster.sd.etcd.heartbeat.log":          true,
-		"pitaya.cluster.sd.etcd.syncservers.interval":   "120s",
-		"pitaya.dataCompression":                        true,
-		"pitaya.heartbeat.interval":                     "30s",
-		"pitaya.session.unique":                         true,
-		"pitaya.metrics.statsd.enabled":                 false,
-		"pitaya.metrics.statsd.host":                    "localhost:9125",
-		"pitaya.metrics.statsd.prefix":                  "pitaya.",
-		"pitaya.metrics.statsd.rate":                    1,
+		"pitaya.concurrency.handler.dispatch":                    25,
+		"pitaya.concurrency.remote.service":                      30,
+		"pitaya.cluster.rpc.client.nats.connect":                 "nats://localhost:4222",
+		"pitaya.cluster.rpc.client.nats.requesttimeout":          "5s",
+		"pitaya.cluster.rpc.client.nats.maxreconnectionretries":  15,
+		"pitaya.cluster.rpc.server.nats.connect":                 "nats://localhost:4222",
+		"pitaya.cluster.rpc.server.nats.maxreconnectionretries":  15,
+		"pitaya.cluster.sd.etcd.dialtimeout":                     "5s",
+		"pitaya.cluster.sd.etcd.endpoints":                       "localhost:2379",
+		"pitaya.cluster.sd.etcd.prefix":                          "pitaya/",
+		"pitaya.cluster.sd.etcd.heartbeat.ttl":                   "60s",
+		"pitaya.cluster.sd.etcd.heartbeat.log":                   true,
+		"pitaya.cluster.sd.etcd.syncservers.interval":            "120s",
+		"pitaya.dataCompression":                                 true,
+		"pitaya.heartbeat.interval":                              "30s",
+		"pitaya.session.unique":                                  true,
+		"pitaya.metrics.statsd.enabled":                          false,
+		"pitaya.metrics.statsd.host":                             "localhost:9125",
+		"pitaya.metrics.statsd.prefix":                           "pitaya.",
+		"pitaya.metrics.statsd.rate":                             1,
 	}
 
 	for param := range defaultsMap {
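For completeness, the defaults above are applied through a map-and-loop pattern over viper. A minimal sketch of that mechanism with the two renamed keys; the helper name fillDefaults is illustrative, not the codebase's fillDefaultValues itself:

    package main

    import (
        "fmt"

        "github.com/spf13/viper"
    )

    // fillDefaults mirrors the fillDefaultValues pattern in config/config.go:
    // every key in the map gets a default, and explicit values still win at lookup.
    func fillDefaults(cfg *viper.Viper) {
        defaultsMap := map[string]interface{}{
            "pitaya.cluster.rpc.client.nats.maxreconnectionretries": 15,
            "pitaya.cluster.rpc.server.nats.maxreconnectionretries": 15,
        }
        for param, value := range defaultsMap {
            cfg.SetDefault(param, value)
        }
    }

    func main() {
        cfg := viper.New()
        fillDefaults(cfg)
        fmt.Println(cfg.GetInt("pitaya.cluster.rpc.server.nats.maxreconnectionretries")) // prints 15
    }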