Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #178 from burmanm/reconnect
Browse files Browse the repository at this point in the history
[BZ 1212627] Recreate storage node sessions if connections are down
  • Loading branch information
jsanda committed Jun 24, 2015
2 parents 91de3c4 + bb867e3 commit 67bbfa0
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 19 deletions.
@@ -1,6 +1,6 @@
/*
* RHQ Management Platform
* Copyright (C) 2005-2014 Red Hat, Inc.
* Copyright (C) 2005-2015 Red Hat, Inc.
* All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
Expand Down Expand Up @@ -52,6 +52,7 @@
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryTimeoutException;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
Expand Down Expand Up @@ -124,6 +125,8 @@ public class StorageClientManager implements StorageClientManagerMBean{

private Metrics driverMetrics;

private SessionAliveChecker aliveChecker;

public void scheduleStorageSessionMaintenance() {
// each time the webapp is reloaded, we don't want to create duplicate jobs
Collection<Timer> timers = timerService.getTimers();
Expand Down Expand Up @@ -187,7 +190,9 @@ public synchronized boolean init() {

initMetricsServer();
JMXUtil.registerMBean(this, OBJECT_NAME);
initialized = true;
aliveChecker = new SessionAliveChecker(this);
aliveChecker.setName("StorageNode SessionAliveChecker");
aliveChecker.start();

initialized = true;
LOG.info("Storage client subsystem is now initialized");
Expand Down Expand Up @@ -222,7 +227,7 @@ public synchronized boolean init() {
* 2) If the credentials are different then create a new session with the
* new credentials and register it with the session manager.
*
* @return <true> if a new session was sucessfully created or no new session is required; <false> otherwise
* @return <true> if a new session was successfully created or no new session is required; <false> otherwise
*/
public synchronized boolean refreshCredentialsAndSession() {
if (!initialized) {
Expand All @@ -236,26 +241,36 @@ public synchronized boolean refreshCredentialsAndSession() {

if ((username != null && !username.equals(this.cachedStorageUsername))
|| (password != null && !password.equals(this.cachedStoragePassword))) {
return refreshSession();
}

Session wrappedSession;
try {
wrappedSession = createSession();
} catch (NoHostAvailableException e) {
if (cluster != null) {
cluster.shutdown();
}
return true;
}

LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the"
+ " storage cluster. The RHQ server is set to MAINTENANCE mode. Please start the storage cluster"
+ " as soon as possible.", e);
return false;
/**
 * Recreates the session used to connect to the storage node and re-registers everything that
 * depends on it.
 * <p>
 * On success the new driver session is registered with the shared {@code session} wrapper, a new
 * {@link StorageClusterMonitor} is created and added as a storage state listener, and the
 * prepared statements of the metrics DAO are initialized again.
 * <p>
 * If no storage host can be reached, the cluster object (when present) is shut down, a warning is
 * logged and nothing else is changed.
 *
 * @return true if success, false otherwise.
 */
private synchronized boolean refreshSession() {
    Session wrappedSession;
    try {
        wrappedSession = createSession();
        // A session could be created, so the subsystem counts as initialized from here on.
        initialized = true;
    } catch (NoHostAvailableException e) {
        if (cluster != null) {
            cluster.shutdown();
        }

        LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the"
            + " storage cluster. The RHQ server is set to MAINTENANCE mode. Please start the storage cluster"
            + " as soon as possible.", e);
        return false;
    }

    // Only reached with a usable session: wire it into the wrapper, the monitor and the DAO.
    session.registerNewSession(wrappedSession);
    storageClusterMonitor = new StorageClusterMonitor(session);
    session.addStorageStateListener(storageClusterMonitor);
    metricsDAO.initPreparedStatements();
    return true;
}

Expand Down Expand Up @@ -288,6 +303,8 @@ private void checkSchemaCompability(String username, String password, List<Stora
public synchronized void shutdown() {
LOG.info("Shutting down storage client subsystem");

aliveChecker.shutdown();

if (metricsServer != null) {
metricsServer.shutdown();
metricsServer = null;
Expand Down Expand Up @@ -631,4 +648,56 @@ public int getQueueAvailableCapacity() {
return metricsServer.getQueueAvailableCapacity();
}

/**
* A thread that checks for liveness of the given session.
*/
private static class SessionAliveChecker extends Thread {

private static long SLEEP_TIME = 5000L;
private static long EXTENDED_SLEEP = SLEEP_TIME * 5;
private static int ALLOWED_FAILS = 2;

private final StorageClientManager storageClientManager;
private boolean alive = true;
private int fails = 0;

public SessionAliveChecker(StorageClientManager manager) {
this.storageClientManager = manager;
}

@Override
public void run() {
while(alive) {
try {
Thread.sleep(SLEEP_TIME);
try {
storageClientManager.getMetricsDAO().checkLiveness(RHQ_KEYSPACE);
if(fails > 0) {
// Query succeeded, set fails to 0
fails = 0;
}
} catch(QueryTimeoutException e) {
LOG.error("Storage node connection check timed out");
} catch(NoHostAvailableException e) {
fails++;
if(fails >= ALLOWED_FAILS) {
LOG.error("Failed to contact the storage node for live check, recreating connection session");
// We have lost the connection to the storage node, refresh and try again..
storageClientManager.refreshSession();
Thread.sleep(EXTENDED_SLEEP); // Sleep for a longer time to allow storage node to restart
}
}
} catch (InterruptedException e) {
// Alive should be false in the next iteration if shutdown was called
} catch(Exception e) {
LOG.error("AliveCheck thread run into an unexpected exception: " + e.getLocalizedMessage());
}
}
}

public void shutdown() {
this.alive = false;
this.interrupt();
}
}
}
@@ -1,7 +1,7 @@
/*
*
* RHQ Management Platform
* Copyright (C) 2005-2012 Red Hat, Inc.
* Copyright (C) 2005-2015 Red Hat, Inc.
* All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
Expand Down Expand Up @@ -32,6 +32,8 @@
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryTimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -73,6 +75,8 @@ public class MetricsDAO {
private PreparedStatement deleteIndexEntry;
private PreparedStatement deleteAggregate;

private PreparedStatement aliveCheck;

public MetricsDAO(StorageSession session, MetricsConfiguration configuration) {
this.storageSession = session;
this.configuration = configuration;
Expand Down Expand Up @@ -148,6 +152,9 @@ public void initPreparedStatements() {
"DELETE FROM " + MetricsTable.AGGREGATE + " " +
"WHERE schedule_id = ? AND bucket = ? AND time = ?");

aliveCheck = storageSession.prepare(
"SELECT columnfamily_name FROM System.schema_columnfamilies WHERE keyspace_name = ?");

long endTime = System.currentTimeMillis();
log.info("Finished initializing prepared statements in " + (endTime - startTime) + " ms");
}
Expand Down Expand Up @@ -256,4 +263,9 @@ public void deleteAggregate(AggregateNumericMetric metric) {
storageSession.execute(statement);
}

/**
 * Binds the given keyspace name to the prepared {@code aliveCheck} statement and runs it through
 * {@code StorageSession#executeDirect}.
 *
 * @param keyspace the keyspace name bound to the statement's single parameter
 * @return the result set produced by the liveness query
 * @throws QueryTimeoutException declared for callers; propagated from the direct execution
 * @throws NoHostAvailableException declared for callers; propagated from the direct execution
 */
public ResultSet checkLiveness(String keyspace) throws QueryTimeoutException, NoHostAvailableException {
    return storageSession.executeDirect(aliveCheck.bind(keyspace));
}

}
@@ -1,6 +1,6 @@
/*
* RHQ Management Platform
* Copyright (C) 2005-2014 Red Hat, Inc.
* Copyright (C) 2005-2015 Red Hat, Inc.
* All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
Expand Down Expand Up @@ -105,6 +105,9 @@ private double calculateRequestLimit() {
rate += topologyDelta;
}
}
if(rate < topologyDelta) {
rate = topologyDelta;
}
return rate;
}

Expand Down Expand Up @@ -166,6 +169,13 @@ public ResultSet execute(Query query) {
}
}

/**
 * Hands the query straight to the wrapped driver session. Skips rate throttling and exception
 * handling, so driver exceptions reach the caller unchanged. Do NOT use for normal operations.
 *
 * @param query the query to execute
 * @return the result set returned by the wrapped session
 * @throws QueryTimeoutException declared for callers; not handled here
 * @throws NoHostAvailableException declared for callers; not handled here
 */
public ResultSet executeDirect(Query query) throws QueryTimeoutException, NoHostAvailableException {
    ResultSet directResult = wrappedSession.execute(query);
    return directResult;
}

public StorageResultSetFuture executeAsync(String query) {
permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
Expand Down

0 comments on commit 67bbfa0

Please sign in to comment.