Commit 9868cf7

fix on name and error handling on connection closing
henrod committed May 21, 2018
1 parent a37c134 commit 9868cf7
Showing 5 changed files with 54 additions and 78 deletions.
20 changes: 10 additions & 10 deletions cluster/nats_rpc_client.go
@@ -43,14 +43,14 @@ import (
 
 // NatsRPCClient struct
 type NatsRPCClient struct {
-    config *config.Config
-    conn *nats.Conn
-    connString string
-    maxReconnects int
-    reqTimeout time.Duration
-    running bool
-    server *Server
-    metricsReporter metrics.Reporter
+    config *config.Config
+    conn *nats.Conn
+    connString string
+    maxReconnectionRetries int
+    reqTimeout time.Duration
+    running bool
+    server *Server
+    metricsReporter metrics.Reporter
 }
 
 // NewNatsRPCClient ctor
@@ -72,7 +72,7 @@ func (ns *NatsRPCClient) configure() error {
     if ns.connString == "" {
         return constants.ErrNoNatsConnectionString
     }
-    ns.maxReconnects = ns.config.GetInt("pitaya.cluster.rpc.client.nats.maxreconnects")
+    ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.client.nats.maxreconnectionretries")
     ns.reqTimeout = ns.config.GetDuration("pitaya.cluster.rpc.client.nats.requesttimeout")
     if ns.reqTimeout == 0 {
         return constants.ErrNatsNoRequestTimeout
@@ -209,7 +209,7 @@ func (ns *NatsRPCClient) Call(
 // Init inits nats rpc client
 func (ns *NatsRPCClient) Init() error {
     ns.running = true
-    conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnects))
+    conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnectionRetries))
     if err != nil {
         return err
     }
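
For context, the renamed field is passed straight to nats.MaxReconnects, which caps how many reconnection attempts the client makes before it gives up and closes the connection. A minimal standalone sketch of that option against the nats client; the URL, retry count, and reconnect wait below are illustrative, not taken from this repository:

package main

import (
    "log"
    "time"

    "github.com/nats-io/go-nats" // import path of the nats client at the time; later releases live at github.com/nats-io/nats.go
)

func main() {
    conn, err := nats.Connect(
        "nats://localhost:4222",           // illustrative URL
        nats.MaxReconnects(15),            // mirrors the maxreconnectionretries default
        nats.ReconnectWait(2*time.Second), // pause between reconnection attempts
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    log.Printf("connected to %s", conn.ConnectedUrl())
}

Once the attempts are exhausted the connection is closed and the ClosedHandler registered in cluster/nats_rpc_common.go fires, which is where the error handling below changes.
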
32 changes: 1 addition & 31 deletions cluster/nats_rpc_client_test.go
@@ -26,7 +26,6 @@ import (
     "errors"
     "fmt"
     "os"
-    "os/exec"
     "testing"
     "time"
 
@@ -365,36 +364,7 @@ func TestNatsRPCClientCall(t *testing.T) {
             assert.Equal(t, table.err, err)
             err = subs.Unsubscribe()
             assert.NoError(t, err)
-            // conn.Close()
+            conn.Close()
         })
     }
 }
-
-func TestCloseNatsClient(t *testing.T) {
-    // TODO: is this too hacky?
-    if os.Getenv("STOP_CONN") == "true" {
-        s := helpers.GetTestNatsServer(t)
-        conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()))
-        assert.NoError(t, err)
-        conn.Close()
-
-        helpers.ShouldEventuallyReturn(t, func() bool {
-            return conn.IsClosed()
-        }, true)
-
-        return
-    }
-
-    flag := "-test.run=" + t.Name()
-    cmd := exec.Command(os.Args[0], flag)
-    cmd.Env = append(os.Environ(), "STOP_CONN=true")
-
-    if err := cmd.Start(); err != nil {
-        t.Fatal(err)
-    }
-
-    err := cmd.Wait()
-    if e, ok := err.(*exec.ExitError); !ok || e.Success() {
-        t.Fatalf("Process ran with err %v, want exit status 1", err)
-    }
-}
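
The deleted TestCloseNatsClient used Go's common re-exec pattern for testing code that calls a fatal logger: the test re-runs its own binary with an environment variable set and checks the child's exit status. Presumably it is no longer needed because, with the handler change below, a deliberate Close() no longer terminates the process. A generic sketch of that pattern, with hypothetical names (RUN_FATAL and doSomethingFatal are placeholders, not part of this repository):

package sample

import (
    "log"
    "os"
    "os/exec"
    "testing"
)

// doSomethingFatal stands in for any code path that ends in a fatal log call.
func doSomethingFatal() {
    log.Fatal("boom")
}

func TestFatalPath(t *testing.T) {
    if os.Getenv("RUN_FATAL") == "1" {
        // Child process: run the code that is expected to terminate the process.
        doSomethingFatal()
        return
    }

    // Parent process: re-run only this test in a subprocess with the flag set.
    cmd := exec.Command(os.Args[0], "-test.run="+t.Name())
    cmd.Env = append(os.Environ(), "RUN_FATAL=1")
    err := cmd.Run()

    // log.Fatal exits with status 1, which surfaces here as *exec.ExitError.
    if _, ok := err.(*exec.ExitError); !ok {
        t.Fatalf("expected the child to exit with a non-zero status, got %v", err)
    }
}
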
6 changes: 6 additions & 0 deletions cluster/nats_rpc_common.go
@@ -41,6 +41,12 @@ func setupNatsConn(connectString string, options ...nats.Option) (*nats.Conn, error) {
             logger.Log.Warnf("reconnected to nats %s!", nc.ConnectedUrl())
         }),
         nats.ClosedHandler(func(nc *nats.Conn) {
+            err := nc.LastError()
+            if err == nil {
+                logger.Log.Warn("nats connection closed with no error.")
+                return
+            }
+
             logger.Log.Fatalf("nats connection closed. reason: %q", nc.LastError())
         }),
     )
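
With this change a deliberate conn.Close() only logs a warning, while a close caused by a real failure (for example, exhausting the reconnection attempts) still terminates the process. A standalone sketch of the same handler wiring, assuming the standard library log package instead of pitaya's logger; the URL and retry count are illustrative:

package main

import (
    "log"

    "github.com/nats-io/go-nats" // later renamed to github.com/nats-io/nats.go
)

func connect(url string) (*nats.Conn, error) {
    return nats.Connect(url,
        nats.MaxReconnects(15),
        nats.ClosedHandler(func(nc *nats.Conn) {
            // LastError is nil when the application closed the connection on
            // purpose (e.g. during shutdown or in tests); a non-nil error means
            // the client gave up, typically after exhausting its reconnects.
            if err := nc.LastError(); err != nil {
                log.Fatalf("nats connection closed. reason: %q", err)
            }
            log.Println("nats connection closed with no error")
        }),
    )
}

func main() {
    conn, err := connect("nats://localhost:4222") // illustrative URL
    if err != nil {
        log.Fatal(err)
    }
    conn.Close() // reaches the warning branch, not Fatalf
}
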
34 changes: 17 additions & 17 deletions cluster/nats_rpc_server.go
@@ -35,21 +35,21 @@ import (
 
 // NatsRPCServer struct
 type NatsRPCServer struct {
-    connString string
-    maxReconnects int
-    server *Server
-    conn *nats.Conn
-    pushBufferSize int
-    messagesBufferSize int
-    config *config.Config
-    stopChan chan bool
-    subChan chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
-    bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
-    unhandledReqCh chan *protos.Request
-    userPushCh chan *protos.Push
-    sub *nats.Subscription
-    dropped int
-    metricsReporter metrics.Reporter
+    connString string
+    maxReconnectionRetries int
+    server *Server
+    conn *nats.Conn
+    pushBufferSize int
+    messagesBufferSize int
+    config *config.Config
+    stopChan chan bool
+    subChan chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
+    bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
+    unhandledReqCh chan *protos.Request
+    userPushCh chan *protos.Push
+    sub *nats.Subscription
+    dropped int
+    metricsReporter metrics.Reporter
 }
 
 // NewNatsRPCServer ctor
@@ -74,7 +74,7 @@ func (ns *NatsRPCServer) configure() error {
     if ns.connString == "" {
         return constants.ErrNoNatsConnectionString
     }
-    ns.maxReconnects = ns.config.GetInt("pitaya.cluster.rpc.server.nats.maxreconnects")
+    ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.server.nats.maxreconnectionretries")
     ns.messagesBufferSize = ns.config.GetInt("pitaya.buffer.cluster.rpc.server.messages")
     if ns.messagesBufferSize == 0 {
         return constants.ErrNatsMessagesBufferSizeZero
@@ -180,7 +180,7 @@ func (ns *NatsRPCServer) GetUserPushChannel() chan *protos.Push {
 func (ns *NatsRPCServer) Init() error {
     // TODO should we have concurrency here? it feels like we should
     go ns.handleMessages()
-    conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnects))
+    conn, err := setupNatsConn(ns.connString, nats.MaxReconnects(ns.maxReconnectionRetries))
     if err != nil {
         return err
     }
40 changes: 20 additions & 20 deletions config/config.go
@@ -60,26 +60,26 @@ func (c *Config) fillDefaultValues() {
         // the sum of pitaya.buffer.cluster.rpc.server.messages, for covering the worst case scenario
         // a single backend server should have the config pitaya.buffer.cluster.rpc.server.messages bigger
         // than the sum of the config pitaya.concurrency.handler.dispatch among all frontend servers
-        "pitaya.concurrency.handler.dispatch": 25,
-        "pitaya.concurrency.remote.service": 30,
-        "pitaya.cluster.rpc.client.nats.connect": "nats://localhost:4222",
-        "pitaya.cluster.rpc.client.nats.requesttimeout": "5s",
-        "pitaya.cluster.rpc.client.nats.maxReconnects": 15,
-        "pitaya.cluster.rpc.server.nats.connect": "nats://localhost:4222",
-        "pitaya.cluster.rpc.server.nats.maxReconnects": 15,
-        "pitaya.cluster.sd.etcd.dialtimeout": "5s",
-        "pitaya.cluster.sd.etcd.endpoints": "localhost:2379",
-        "pitaya.cluster.sd.etcd.prefix": "pitaya/",
-        "pitaya.cluster.sd.etcd.heartbeat.ttl": "60s",
-        "pitaya.cluster.sd.etcd.heartbeat.log": true,
-        "pitaya.cluster.sd.etcd.syncservers.interval": "120s",
-        "pitaya.dataCompression": true,
-        "pitaya.heartbeat.interval": "30s",
-        "pitaya.session.unique": true,
-        "pitaya.metrics.statsd.enabled": false,
-        "pitaya.metrics.statsd.host": "localhost:9125",
-        "pitaya.metrics.statsd.prefix": "pitaya.",
-        "pitaya.metrics.statsd.rate": 1,
+        "pitaya.concurrency.handler.dispatch": 25,
+        "pitaya.concurrency.remote.service": 30,
+        "pitaya.cluster.rpc.client.nats.connect": "nats://localhost:4222",
+        "pitaya.cluster.rpc.client.nats.requesttimeout": "5s",
+        "pitaya.cluster.rpc.client.nats.maxreconnectionretries": 15,
+        "pitaya.cluster.rpc.server.nats.connect": "nats://localhost:4222",
+        "pitaya.cluster.rpc.server.nats.maxreconnectionretries": 15,
+        "pitaya.cluster.sd.etcd.dialtimeout": "5s",
+        "pitaya.cluster.sd.etcd.endpoints": "localhost:2379",
+        "pitaya.cluster.sd.etcd.prefix": "pitaya/",
+        "pitaya.cluster.sd.etcd.heartbeat.ttl": "60s",
+        "pitaya.cluster.sd.etcd.heartbeat.log": true,
+        "pitaya.cluster.sd.etcd.syncservers.interval": "120s",
+        "pitaya.dataCompression": true,
+        "pitaya.heartbeat.interval": "30s",
+        "pitaya.session.unique": true,
+        "pitaya.metrics.statsd.enabled": false,
+        "pitaya.metrics.statsd.host": "localhost:9125",
+        "pitaya.metrics.statsd.prefix": "pitaya.",
+        "pitaya.metrics.statsd.rate": 1,
     }
 
     for param := range defaultsMap {
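
Because viper keys are case-insensitive and normalized to lowercase, the all-lowercase spelling of the new default matches the GetInt calls in the other files. A hedged sketch of overriding the renamed key with viper directly, assuming the application hands a viper instance to pitaya's config layer as the Get* calls in this diff suggest (the hand-off itself is not shown here):

package main

import (
    "fmt"

    "github.com/spf13/viper"
)

func main() {
    cfg := viper.New()

    // Mirror the library default, then override it for this deployment.
    cfg.SetDefault("pitaya.cluster.rpc.client.nats.maxreconnectionretries", 15)
    cfg.Set("pitaya.cluster.rpc.client.nats.maxreconnectionretries", 30)
    cfg.Set("pitaya.cluster.rpc.server.nats.maxreconnectionretries", 30)

    fmt.Println(cfg.GetInt("pitaya.cluster.rpc.client.nats.maxreconnectionretries")) // 30
}

The comment kept at the top of the defaults map also encodes a sizing rule worth spelling out with numbers: with, for example, three frontend servers each using the default pitaya.concurrency.handler.dispatch of 25, a backend server should set pitaya.buffer.cluster.rpc.server.messages to something above 75.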
