Skip to content

Commit

Permalink
Merge b3beece into 64294ef
Browse files Browse the repository at this point in the history
  • Loading branch information
leohahn committed Nov 11, 2019
2 parents 64294ef + b3beece commit 0f2e32c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
23 changes: 17 additions & 6 deletions cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type NatsRPCClient struct {
config *config.Config
conn *nats.Conn
connString string
connectionTimeout time.Duration
maxReconnectionRetries int
reqTimeout time.Duration
running bool
Expand All @@ -62,11 +63,12 @@ func NewNatsRPCClient(
appDieChan chan bool,
) (*NatsRPCClient, error) {
ns := &NatsRPCClient{
config: config,
server: server,
running: false,
metricsReporters: metricsReporters,
appDieChan: appDieChan,
config: config,
server: server,
running: false,
metricsReporters: metricsReporters,
appDieChan: appDieChan,
connectionTimeout: nats.DefaultTimeout,
}
if err := ns.configure(); err != nil {
return nil, err
Expand All @@ -79,6 +81,9 @@ func (ns *NatsRPCClient) configure() error {
if ns.connString == "" {
return constants.ErrNoNatsConnectionString
}
if timeout := ns.config.GetDuration("pitaya.cluster.rpc.client.nats.connectiontimeout"); timeout != 0 {
ns.connectionTimeout = timeout
}
ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.client.nats.maxreconnectionretries")
ns.reqTimeout = ns.config.GetDuration("pitaya.cluster.rpc.client.nats.requesttimeout")
if ns.reqTimeout == 0 {
Expand Down Expand Up @@ -202,7 +207,13 @@ func (ns *NatsRPCClient) Call(
// Init inits nats rpc client
func (ns *NatsRPCClient) Init() error {
ns.running = true
conn, err := setupNatsConn(ns.connString, ns.appDieChan, nats.MaxReconnects(ns.maxReconnectionRetries))
logger.Log.Debugf("connecting to nats with timeout of %s", ns.connectionTimeout)
conn, err := setupNatsConn(
ns.connString,
ns.appDieChan,
nats.MaxReconnects(ns.maxReconnectionRetries),
nats.Timeout(ns.connectionTimeout),
)
if err != nil {
return err
}
Expand Down
29 changes: 21 additions & 8 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"math"
"time"

"github.com/golang/protobuf/proto"
nats "github.com/nats-io/nats.go"
Expand All @@ -41,6 +42,7 @@ import (
// NatsRPCServer struct
type NatsRPCServer struct {
connString string
connectionTimeout time.Duration
maxReconnectionRetries int
server *Server
conn *nats.Conn
Expand Down Expand Up @@ -68,13 +70,14 @@ func NewNatsRPCServer(
appDieChan chan bool,
) (*NatsRPCServer, error) {
ns := &NatsRPCServer{
config: config,
server: server,
stopChan: make(chan bool),
unhandledReqCh: make(chan *protos.Request),
dropped: 0,
metricsReporters: metricsReporters,
appDieChan: appDieChan,
config: config,
server: server,
stopChan: make(chan bool),
unhandledReqCh: make(chan *protos.Request),
dropped: 0,
metricsReporters: metricsReporters,
appDieChan: appDieChan,
connectionTimeout: nats.DefaultTimeout,
}
if err := ns.configure(); err != nil {
return nil, err
Expand All @@ -88,6 +91,9 @@ func (ns *NatsRPCServer) configure() error {
if ns.connString == "" {
return constants.ErrNoNatsConnectionString
}
if timeout := ns.config.GetDuration("pitaya.cluster.rpc.server.nats.connectiontimeout"); timeout != 0 {
ns.connectionTimeout = timeout
}
ns.maxReconnectionRetries = ns.config.GetInt("pitaya.cluster.rpc.server.nats.maxreconnectionretries")
ns.messagesBufferSize = ns.config.GetInt("pitaya.buffer.cluster.rpc.server.nats.messages")
if ns.messagesBufferSize == 0 {
Expand Down Expand Up @@ -309,7 +315,14 @@ func (ns *NatsRPCServer) processKick() {
func (ns *NatsRPCServer) Init() error {
// TODO should we have concurrency here? it feels like we should
go ns.handleMessages()
conn, err := setupNatsConn(ns.connString, ns.appDieChan, nats.MaxReconnects(ns.maxReconnectionRetries))

logger.Log.Debugf("connecting to nats with timeout of %s", ns.connectionTimeout)
conn, err := setupNatsConn(
ns.connString,
ns.appDieChan,
nats.MaxReconnects(ns.maxReconnectionRetries),
nats.Timeout(ns.connectionTimeout),
)
if err != nil {
return err
}
Expand Down

0 comments on commit 0f2e32c

Please sign in to comment.