Skip to content

Commit

Permalink
Fix reconnecting to cluster (gocql#1555)
Browse files Browse the repository at this point in the history
After merging gocql#1369 it could happen
that after losing connection to a host, it is not reconnected.

That could happen in the following case:

1. We lose connection to a host.
2. We receive UP event for the host, the host is marked as UP even
   though we don't have active connections to it.
3. We don't reconnect to the host in reconnectDownedHosts because it's
   marked as UP already.

In PR 1369 we intended to change the host states so that a host is
marked as UP only after we have some connection to it. However, we also
removed the call to fill the pool on UP event in case the host already
existed.

This commit adds the call to fill the pool back to handleNodeUp.
We introduce handleNodeConnected for marking the host as up,
so that handleNodeUp handles only the UP event and the responsibilities
are clear.

After the control connection is lost, we try to reconnect it every
second.
Before PR 1369, after establishing a control connection to a host,
we triggered pool refill for the same host.
PR 1369 removed this behaviour so that it is possible to wait
for the connection to be added to the pool in Session.init (and not
mark the host UP if we have only control connection).
This means that it could take up to cfg.ReconnectInterval after
establishing control connection to have some usable host in the pool
to service user queries after PR 1369.

To restore the old behavior, we add back triggering of pool refill after
control connection is connected. We can only do so after the session is
initialized, so that we don't interfere with initialization code
in Session.init.

Related gocql#915

Co-Authored-By: Ivan Boyarkin <ivan.boyarkin@kiwi.com>
  • Loading branch information
martin-sucha and mrVanboy committed May 13, 2021
1 parent 558dfae commit 312a614
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 13 deletions.
4 changes: 2 additions & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (pool *hostConnPool) fill() {
return
}
// notify the session that this node is connected
go pool.session.handleNodeUp(pool.host.ConnectAddress(), pool.port)
go pool.session.handleNodeConnected(pool.host)

// filled one
fillCount--
Expand All @@ -456,7 +456,7 @@ func (pool *hostConnPool) fill() {

if err == nil && startCount > 0 {
// notify the session that this node is connected again
go pool.session.handleNodeUp(pool.host.ConnectAddress(), pool.port)
go pool.session.handleNodeConnected(pool.host)
}
}()
}
Expand Down
9 changes: 9 additions & 0 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ func (c *controlConn) setupConn(conn *Conn) error {
}

c.conn.Store(ch)
if c.session.initialized() {
// We connected to control conn, so connect to the host in the pool as well.
// Notify session we can start trying to connect to the node.
// We can't start the fill before the session is initialized, otherwise the fill would interfere
// with the fill called by Session.init. Session.init needs to wait for its fill to finish and that
// would return immediately if we started the fill here.
// TODO(martin-sucha): Trigger pool refill for all hosts, like in reconnectDownedHosts?
go c.session.startPoolFill(host)
}
return nil
}

Expand Down
25 changes: 22 additions & 3 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ func (s *Session) addNewNode(ip net.IP, port int) {
hostInfo = s.ring.addOrUpdate(hostInfo)

if !s.cfg.filterHost(hostInfo) {
// we let the pool call handleNodeUp to change the host state
s.pool.addHost(hostInfo)
s.policy.AddHost(hostInfo)
s.startPoolFill(hostInfo)
}

if s.control != nil && !s.cfg.IgnorePeerAddr {
Expand Down Expand Up @@ -264,6 +262,27 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
return
}

if s.cfg.filterHost(host) {
return
}

if d := host.Version().nodeUpDelay(); d > 0 {
time.Sleep(d)
}
s.startPoolFill(host)
}

// startPoolFill adds host to the session's pool and then to the session's
// policy. It does not set the host state itself.
// NOTE(review): presumably pool.addHost is what kicks off the connection
// fill for the host — confirm against connectionpool.go.
func (s *Session) startPoolFill(host *HostInfo) {
// we let the pool call handleNodeConnected to change the host state
s.pool.addHost(host)
s.policy.AddHost(host)
}

func (s *Session) handleNodeConnected(host *HostInfo) {
if gocqlDebug {
s.logger.Printf("gocql: Session.handleNodeConnected: %s:%d\n", host.ConnectAddress(), host.Port())
}

host.setState(NodeUp)

if !s.cfg.filterHost(host) {
Expand Down
3 changes: 1 addition & 2 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,7 @@ func (r *ringDescriber) refreshRing() error {
}

if host, ok := r.session.ring.addHostIfMissing(h); !ok {
r.session.pool.addHost(h)
r.session.policy.AddHost(h)
r.session.startPoolFill(h)
} else {
host.update(h)
}
Expand Down
28 changes: 22 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ type Session struct {
ctx context.Context
cancel context.CancelFunc

closeMu sync.RWMutex
// sessionStateMu protects isClosed and isInitialized.
sessionStateMu sync.RWMutex
// isClosed is true once Session.Close is called.
isClosed bool
// isInitialized is true once Session.init succeeds.
// you can use initialized() to read the value.
isInitialized bool

logger StdLogger
}
Expand Down Expand Up @@ -317,6 +322,10 @@ func (s *Session) init() error {
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: s.cfg.Keyspace})
}

s.sessionStateMu.Lock()
s.isInitialized = true
s.sessionStateMu.Unlock()

return nil
}

Expand Down Expand Up @@ -357,7 +366,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
if h.IsUp() {
continue
}
// we let the pool call handleNodeUp to change the host state
// we let the pool call handleNodeConnected to change the host state
s.pool.addHost(h)
}
case <-s.ctx.Done():
Expand Down Expand Up @@ -440,8 +449,8 @@ func (s *Session) Bind(stmt string, b func(q *QueryInfo) ([]interface{}, error))
// operation.
func (s *Session) Close() {

s.closeMu.Lock()
defer s.closeMu.Unlock()
s.sessionStateMu.Lock()
defer s.sessionStateMu.Unlock()
if s.isClosed {
return
}
Expand Down Expand Up @@ -469,12 +478,19 @@ func (s *Session) Close() {
}

func (s *Session) Closed() bool {
s.closeMu.RLock()
s.sessionStateMu.RLock()
closed := s.isClosed
s.closeMu.RUnlock()
s.sessionStateMu.RUnlock()
return closed
}

// initialized reports the current value of isInitialized, reading it
// under the sessionStateMu read lock.
func (s *Session) initialized() bool {
	s.sessionStateMu.RLock()
	defer s.sessionStateMu.RUnlock()

	return s.isInitialized
}

func (s *Session) executeQuery(qry *Query) (it *Iter) {
// fail fast
if s.Closed() {
Expand Down

0 comments on commit 312a614

Please sign in to comment.