From aa63682195ea1fc2a7ae06f11023b6cc05286c19 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 23 Jun 2015 16:37:53 +0300 Subject: [PATCH 1/2] [BZ 1212627] Check storage node connection aliveness every 4s and recreate session if check failed twice in a row. --- .../server/storage/StorageClientManager.java | 100 +++++++++++++++--- .../org/rhq/server/metrics/MetricsDAO.java | 14 ++- .../rhq/server/metrics/StorageSession.java | 12 ++- 3 files changed, 107 insertions(+), 19 deletions(-) diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java index b9ab2291376..ce07ee00433 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java @@ -1,6 +1,6 @@ /* * RHQ Management Platform - * Copyright (C) 2005-2014 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. * * This program is free software; you can redistribute it and/or modify @@ -52,6 +52,7 @@ import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryTimeoutException; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; @@ -124,6 +125,8 @@ public class StorageClientManager implements StorageClientManagerMBean{ private Metrics driverMetrics; + private SessionAliveChecker aliveChecker; + public void scheduleStorageSessionMaintenance() { // each time the webapp is reloaded, we don't want to create duplicate jobs Collection timers = timerService.getTimers(); @@ -187,7 +190,8 @@ public synchronized boolean init() { initMetricsServer(); JMXUtil.registerMBean(this, OBJECT_NAME); - initialized = true; + aliveChecker = new SessionAliveChecker(this); + aliveChecker.start(); initialized = true; LOG.info("Storage client subsystem is now initialized"); @@ -222,7 +226,7 @@ public synchronized boolean init() { * 2) If the credentials are different then create a new session with the * new credentials and register it with the session manager. * - * @return if a new session was sucessfully created or no new session is required; otherwise + * @return if a new session was successfully created or no new session is required; otherwise */ public synchronized boolean refreshCredentialsAndSession() { if (!initialized) { @@ -236,26 +240,36 @@ public synchronized boolean refreshCredentialsAndSession() { if ((username != null && !username.equals(this.cachedStorageUsername)) || (password != null && !password.equals(this.cachedStoragePassword))) { + return refreshSession(); + } - Session wrappedSession; - try { - wrappedSession = createSession(); - } catch (NoHostAvailableException e) { - if (cluster != null) { - cluster.shutdown(); - } + return true; + } - LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the" - + " storage cluster. The RHQ server is set to MAINTENANCE mode. Please start the storage cluster" - + " as soon as possible.", e); - return false; + /** + * Recreates the session used to connect to the storage node. + * @return true if success, false otherwise. + */ + private synchronized boolean refreshSession() { + Session wrappedSession; + try { + wrappedSession = createSession(); + initialized = true; + } catch (NoHostAvailableException e) { + if (cluster != null) { + cluster.shutdown(); } - session.registerNewSession(wrappedSession); - metricsDAO.initPreparedStatements(); - return true; + LOG.warn("Storage client subsystem wasn't initialized because it wasn't possible to connect to the" + + " storage cluster. The RHQ server is set to MAINTENANCE mode. Please start the storage cluster" + + " as soon as possible.", e); + return false; } + session.registerNewSession(wrappedSession); + storageClusterMonitor = new StorageClusterMonitor(session); + session.addStorageStateListener(storageClusterMonitor); + metricsDAO.initPreparedStatements(); return true; } @@ -288,6 +302,8 @@ private void checkSchemaCompability(String username, String password, List 0) { + // Query succeeded, set fails to 0 + fails = 0; + } + } catch(QueryTimeoutException e) { + LOG.error("Storage node connection check timed out"); + } catch(NoHostAvailableException e) { + fails++; + if(fails >= ALLOWED_FAILS) { + LOG.error("Failed to contact the storage node for live check, recreating connection session"); + // We have lost the connection to the storage node, refresh and try again.. + storageClientManager.refreshSession(); + Thread.sleep(EXTENDED_SLEEP); // Sleep for a longer time to allow storage node to restart + } + } + } catch (InterruptedException e) { + // Alive should be false in the next iteration if shutdown was called + } + } + } + + public void shutdown() { + this.alive = false; + this.interrupt(); + } + } } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java index 526324acfb5..02d55c1fe67 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java @@ -1,7 +1,7 @@ /* * * RHQ Management Platform - * Copyright (C) 2005-2012 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. * * This program is free software; you can redistribute it and/or modify @@ -32,6 +32,8 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +75,8 @@ public class MetricsDAO { private PreparedStatement deleteIndexEntry; private PreparedStatement deleteAggregate; + private PreparedStatement aliveCheck; + public MetricsDAO(StorageSession session, MetricsConfiguration configuration) { this.storageSession = session; this.configuration = configuration; @@ -148,6 +152,9 @@ public void initPreparedStatements() { "DELETE FROM " + MetricsTable.AGGREGATE + " " + "WHERE schedule_id = ? AND bucket = ? AND time = ?"); + aliveCheck = storageSession.prepare( + "SELECT columnfamily_name FROM System.schema_columnfamilies WHERE keyspace_name = ?"); + long endTime = System.currentTimeMillis(); log.info("Finished initializing prepared statements in " + (endTime - startTime) + " ms"); } @@ -256,4 +263,9 @@ public void deleteAggregate(AggregateNumericMetric metric) { storageSession.execute(statement); } + public ResultSet checkLiveness(String keyspace) throws QueryTimeoutException, NoHostAvailableException { + BoundStatement statement = aliveCheck.bind(keyspace); + return storageSession.executeDirect(statement); + } + } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java index b193b0a5a4b..b49f4ade9c7 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java @@ -1,6 +1,6 @@ /* * RHQ Management Platform - * Copyright (C) 2005-2014 Red Hat, Inc. + * Copyright (C) 2005-2015 Red Hat, Inc. * All rights reserved. * * This program is free software; you can redistribute it and/or modify @@ -105,6 +105,9 @@ private double calculateRequestLimit() { rate += topologyDelta; } } + if(rate < topologyDelta) { + rate = topologyDelta; + } return rate; } @@ -166,6 +169,13 @@ public ResultSet execute(Query query) { } } + /** + * Skips rate throttling and exception handling. Do NOT use for normal operations. + */ + public ResultSet executeDirect(Query query) throws QueryTimeoutException, NoHostAvailableException { + return wrappedSession.execute(query); + } + public StorageResultSetFuture executeAsync(String query) { permits.acquire(); ResultSetFuture future = wrappedSession.executeAsync(query); From bb867e35169891e134b80e218eb636dfdeb35e90 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 24 Jun 2015 13:31:21 +0300 Subject: [PATCH 2/2] Set name for the AliveChecker for easier debugging and catch all the exceptions in the aliveChecker thread --- .../rhq/enterprise/server/storage/StorageClientManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java index ce07ee00433..9edc5d7c24d 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManager.java @@ -191,6 +191,7 @@ public synchronized boolean init() { initMetricsServer(); JMXUtil.registerMBean(this, OBJECT_NAME); aliveChecker = new SessionAliveChecker(this); + aliveChecker.setName("StorageNode SessionAliveChecker"); aliveChecker.start(); initialized = true; @@ -688,6 +689,8 @@ public void run() { } } catch (InterruptedException e) { // Alive should be false in the next iteration if shutdown was called + } catch(Exception e) { + LOG.error("AliveCheck thread run into an unexpected exception: " + e.getLocalizedMessage()); } } }