diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java index b9ab2291376..9edc5d7c24d 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java @@ -1,6 +1,6 @@ /* * RHQ Management Platform - * Copyright (C) 2005-2014 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. * * This program is free software; you can redistribute it and/or modify @@ -52,6 +52,7 @@ import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryTimeoutException; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; @@ -124,6 +125,8 @@ public class StorageClientManager implements StorageClientManagerMBean{ private Metrics driverMetrics; + private SessionAliveChecker aliveChecker; + public void scheduleStorageSessionMaintenance() { // each time the webapp is reloaded, we don't want to create duplicate jobs Collection timers = timerService.getTimers(); @@ -187,7 +190,9 @@ public synchronized boolean init() { initMetricsServer(); JMXUtil.registerMBean(this, OBJECT_NAME); - initialized = true; + aliveChecker = new SessionAliveChecker(this); + aliveChecker.setName("StorageNode SessionAliveChecker"); + aliveChecker.start(); initialized = true; LOG.info("Storage client subsystem is now initialized"); @@ -222,7 +227,7 @@ public synchronized boolean init() { * 2) If the credentials are different then create a new session with the * new credentials and register it with 
the session manager. * - * @return if a new session was sucessfully created or no new session is required; otherwise + * @return if a new session was successfully created or no new session is required; otherwise */ public synchronized boolean refreshCredentialsAndSession() { if (!initialized) { @@ -236,26 +241,36 @@ public synchronized boolean refreshCredentialsAndSession() { if ((username != null && !username.equals(this.cachedStorageUsername)) || (password != null && !password.equals(this.cachedStoragePassword))) { + return refreshSession(); + } - Session wrappedSession; - try { - wrappedSession = createSession(); - } catch (NoHostAvailableException e) { - if (cluster != null) { - cluster.shutdown(); - } + return true; + } - LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the" - + " storage cluster. The RHQ server is set to MAINTENANCE mode. Please start the storage cluster" - + " as soon as possible.", e); - return false; + /** + * Recreates the session used to connect to the storage node. + * @return true if success, false otherwise. + */ + private synchronized boolean refreshSession() { + Session wrappedSession; + try { + wrappedSession = createSession(); + initialized = true; + } catch (NoHostAvailableException e) { + if (cluster != null) { + cluster.shutdown(); } - session.registerNewSession(wrappedSession); - metricsDAO.initPreparedStatements(); - return true; + LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the" + + " storage cluster. The RHQ server is set to MAINTENANCE mode. 
Please start the storage cluster" + + " as soon as possible.", e); + return false; } + session.registerNewSession(wrappedSession); + storageClusterMonitor = new StorageClusterMonitor(session); + session.addStorageStateListener(storageClusterMonitor); + metricsDAO.initPreparedStatements(); return true; } @@ -288,6 +303,8 @@ private void checkSchemaCompability(String username, String password, List 0) { + // Query succeeded, set fails to 0 + fails = 0; + } + } catch(QueryTimeoutException e) { + LOG.error("Storage node connection check timed out"); + } catch(NoHostAvailableException e) { + fails++; + if(fails >= ALLOWED_FAILS) { + LOG.error("Failed to contact the storage node for live check, recreating connection session"); + // We have lost the connection to the storage node; recreate the session and try again. + storageClientManager.refreshSession(); + Thread.sleep(EXTENDED_SLEEP); // Sleep for a longer time to allow storage node to restart + } + } + } catch (InterruptedException e) { + // shutdown() clears alive before interrupting, so alive should be false in the next iteration + } catch(Exception e) { + LOG.error("AliveCheck thread run into an unexpected exception: " + e.getLocalizedMessage(), e); + } + } + } + + public void shutdown() { + this.alive = false; + this.interrupt(); + } + } } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java index 526324acfb5..02d55c1fe67 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java @@ -1,7 +1,7 @@ /* * * RHQ Management Platform - * Copyright (C) 2005-2012 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. 
* * This program is free software; you can redistribute it and/or modify @@ -32,6 +32,8 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +75,8 @@ public class MetricsDAO { private PreparedStatement deleteIndexEntry; private PreparedStatement deleteAggregate; + private PreparedStatement aliveCheck; + public MetricsDAO(StorageSession session, MetricsConfiguration configuration) { this.storageSession = session; this.configuration = configuration; @@ -148,6 +152,9 @@ public void initPreparedStatements() { "DELETE FROM " + MetricsTable.AGGREGATE + " " + "WHERE schedule_id = ? AND bucket = ? AND time = ?"); + aliveCheck = storageSession.prepare( + "SELECT columnfamily_name FROM System.schema_columnfamilies WHERE keyspace_name = ?"); + long endTime = System.currentTimeMillis(); log.info("Finished initializing prepared statements in " + (endTime - startTime) + " ms"); } @@ -256,4 +263,9 @@ public void deleteAggregate(AggregateNumericMetric metric) { storageSession.execute(statement); } + public ResultSet checkLiveness(String keyspace) throws QueryTimeoutException, NoHostAvailableException { + BoundStatement statement = aliveCheck.bind(keyspace); + return storageSession.executeDirect(statement); + } + } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java index b193b0a5a4b..b49f4ade9c7 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java 
@@ -1,6 +1,6 @@ /* * RHQ Management Platform - * Copyright (C) 2005-2014 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. * * This program is free software; you can redistribute it and/or modify @@ -105,6 +105,9 @@ private double calculateRequestLimit() { rate += topologyDelta; } } + if(rate < topologyDelta) { + rate = topologyDelta; + } return rate; } @@ -166,6 +169,13 @@ public ResultSet execute(Query query) { } } + /** + * Skips rate throttling and exception handling. Do NOT use for normal operations. + */ + public ResultSet executeDirect(Query query) throws QueryTimeoutException, NoHostAvailableException { + return wrappedSession.execute(query); + } + public StorageResultSetFuture executeAsync(String query) { permits.acquire(); ResultSetFuture future = wrappedSession.executeAsync(query);