From b3747ad24907c175857d47626224b456a8ee0f85 Mon Sep 17 00:00:00 2001
From: John Sanda
Date: Wed, 4 Mar 2015 21:34:50 -0500
Subject: [PATCH] [BZ 185375] scan for keys using cql instead of astyanax

I am abandoning the use of Astyanax since it requires Cassandra's RPC
server to be running, and we turn the RPC server off by default. It can
be turned back on via JMX, but we restrict JMX access to localhost, which
means users would have to manually enable the Thrift server on each
storage node just for the upgrade. We cannot impose that kind of burden.
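
The replacement, KeyScanner, splits the ring into the cluster's token
ranges and pages through each table with token() queries. As a rough
sketch of the paging idea (illustration only, not code from this patch;
the contact point, table, and batch size are placeholders, and it walks
the whole ring from a single starting token instead of scanning ranges
in parallel):

    import java.util.HashSet;
    import java.util.Set;

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class TokenPagingSketch {
        public static void main(String[] args) {
            Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
            Session session = cluster.connect();

            int batchSize = 1000; // placeholder; the patch tunes this per table
            PreparedStatement query = session.prepare(
                "SELECT token(schedule_id), schedule_id FROM rhq.one_hour_metrics " +
                "WHERE token(schedule_id) >= ? LIMIT " + batchSize);

            Set<Integer> keys = new HashSet<Integer>();
            long token = Long.MIN_VALUE; // start at the low end of the token space
            while (true) {
                ResultSet resultSet = session.execute(query.bind(token));
                long lastToken = token;
                int count = 0;
                for (Row row : resultSet) {
                    lastToken = row.getLong(0);   // token(schedule_id)
                    keys.add(row.getInt(1));      // schedule_id
                    ++count;
                }
                if (count < batchSize) {
                    break; // a short page means there is nothing left to scan
                }
                token = lastToken + 1; // resume just past the last token seen
            }
            System.out.println("Found " + keys.size() + " schedule ids");
            cluster.close();
        }
    }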
---
 .../org/rhq/cassandra/schema/KeyScanner.java  | 253 ++++++++++++++++++
 .../schema/MigrateAggregateMetrics.java       |  47 ++--
 .../schema/exception/KeyScanException.java    |  23 ++
 3 files changed, 306 insertions(+), 17 deletions(-)
 create mode 100644 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java
 create mode 100644 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java

diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java
new file mode 100644
index 00000000000..aec1e69f0a8
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java
@@ -0,0 +1,253 @@
+package org.rhq.cassandra.schema;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Objects;
+import com.google.common.base.Stopwatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.cassandra.schema.exception.KeyScanException;
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * @author John Sanda
+ */
+public class KeyScanner {
+
+    private static final Log log = LogFactory.getLog(KeyScanner.class);
+
+    private static final int QUERY_FAILURE_THRESHOLD = 5;
+
+    private static class TokenRange {
+        long startToken;
+        long endToken;
+
+        public TokenRange(long startToken, long endToken) {
+            this.startToken = startToken;
+            this.endToken = endToken;
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper("TokenRange").add("start", startToken).add("end", endToken).toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TokenRange that = (TokenRange) o;
+
+            if (endToken != that.endToken) return false;
+            if (startToken != that.startToken) return false;
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (startToken ^ (startToken >>> 32));
+            result = 31 * result + (int) (endToken ^ (endToken >>> 32));
+            return result;
+        }
+    }
+
+    private Session session;
+
+    private List<TokenRange> tokenRanges = new ArrayList<TokenRange>();
+
+    private ExecutorService threadPool;
+
+    public KeyScanner(Session session) {
+        this.session = session;
+        Queue<Host> hosts = new ArrayDeque<Host>(session.getCluster().getMetadata().getAllHosts());
+        threadPool = Executors.newFixedThreadPool(getThreadCount(hosts.size()));
+
+        if (hosts.size() == 1) {
+            // If it is a single-node cluster then we have to query system.local to get
+            // the tokens.
+            ResultSet resultSet = session.execute("select tokens from system.local");
+            loadTokens(resultSet);
+        } else {
+            // If we have a multi-node cluster, things are a little more involved. Each
+            // node stores its own tokens in system.local, and it stores tokens for all
+            // of the other nodes in system.peers. This code assumes we are still using a
+            // round-robin load balancing policy with the driver. So if we are trying to
+            // load tokens for node n1 and if we are also querying n1, then we will get an
+            // empty result set. We need to execute the query again so that it is routed to
+            // a different node, e.g., n2, which will have a row in system.peers for n1.
+            // Switching to a token-aware policy would simplify this.
+
+            PreparedStatement query = session.prepare("select tokens from system.peers where peer = ?");
+
+            while (!hosts.isEmpty()) {
+                Host host = hosts.poll();
+                log.info("Loading tokens for " + host);
+                ResultSet resultSet = session.execute(query.bind(host.getAddress()));
+                if (resultSet.isExhausted()) {
+                    for (Host nextHost : hosts) {
+                        resultSet = session.execute(query.bind(host.getAddress()));
+                        if (!resultSet.isExhausted()) {
+                            break;
+                        }
+                    }
+                }
+                if (resultSet.isExhausted()) {
+                    throw new IllegalStateException("Failed to load tokens for " + host);
+                }
+                loadTokens(resultSet);
+            }
+        }
+    }
+
+    private int getThreadCount(int numNodes) {
+        String count = System.getProperty("rhq.storage.key-scanner.thread-count");
+        if (count == null) {
+            return Math.min(4 + ((numNodes - 1) * 4), 16);
+        }
+        return Integer.parseInt(count);
+    }
+
+    private void loadTokens(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        Set<String> stringTokens = rows.get(0).getSet(0, String.class);
+        SortedSet<Long> tokens = new TreeSet<Long>();
+
+        for (String s : stringTokens) {
+            tokens.add(Long.parseLong(s));
+        }
+
+        Iterator<Long> iterator = tokens.iterator();
+        long start = iterator.next();
+        long end;
+
+        while (iterator.hasNext()) {
+            end = iterator.next();
+            tokenRanges.add(new TokenRange(start, end));
+            start = end;
+        }
+        start = tokens.first();
+        end = tokens.last();
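+        // The ring wraps around from the highest token back to the lowest, but
+        // scanForKeys() pages upward through the token space, so a descending
+        // range would never get scanned. Cover the wrap-around with two ascending
+        // ranges at the ends of the (Murmur3) token space instead.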
LIMIT " + batchSize); + TaskTracker taskTracker = new TaskTracker(); + + for (TokenRange range : tokenRanges) { + taskTracker.addTask(); + threadPool.submit(tokenScanner(range, query, keys, taskTracker, batchSize)); + } + taskTracker.finishedSchedulingTasks(); + taskTracker.waitForTasksToFinish(); + + stopwatch.stop(); + log.info("Finished scanning for keys for table " + table + " in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + " ms"); + + return keys; + } + + private Runnable tokenScanner(final TokenRange tokenRange, final PreparedStatement query, + final Set scheduleIds, final TaskTracker taskTracker, final int batchSize) { + return new Runnable() { + @Override + public void run() { + try { + long token = tokenRange.startToken; + long lastToken = tokenRange.startToken; + + while (token <= tokenRange.endToken) { + ResultSet resultSet = executeTokenQuery(query, token); + int count = 0; + for (Row row : resultSet) { + lastToken = row.getLong(0); + scheduleIds.add(row.getInt(1)); + ++count; + } + if (count < batchSize) { + break; + } + token = lastToken + 1; + } + taskTracker.finishedTask(); + } catch (KeyScanException e) { + taskTracker.abort(e.getMessage()); + } catch (Exception e) { + log.error("There was an unexpected error scanning for tokens", e); + taskTracker.abort("Aborting due to unexpected error: " + ThrowableUtil.getRootMessage(e)); + } + } + }; + } + + private ResultSet executeTokenQuery(PreparedStatement query, long token) throws KeyScanException { + + for (int i = 0; i < QUERY_FAILURE_THRESHOLD; ++i) { + try { + return session.execute(query.bind(token)); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("Failed to execute token query", e); + } else { + log.info("Failed to execute token query: " + ThrowableUtil.getRootMessage(e)); + } + } + } + throw new KeyScanException("Token query failed " + QUERY_FAILURE_THRESHOLD + + " times. 
+    private ResultSet executeTokenQuery(PreparedStatement query, long token) throws KeyScanException {
+
+        for (int i = 0; i < QUERY_FAILURE_THRESHOLD; ++i) {
+            try {
+                return session.execute(query.bind(token));
+            } catch (Exception e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to execute token query", e);
+                } else {
+                    log.info("Failed to execute token query: " + ThrowableUtil.getRootMessage(e));
+                }
+            }
+        }
+        throw new KeyScanException("Token query failed " + QUERY_FAILURE_THRESHOLD +
+            " times. The key scan will abort due to these failures.");
+    }
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index b029df5b941..646dfb8dd94 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -43,8 +43,6 @@
 import com.netflix.astyanax.model.Composite;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.recipes.reader.AllRowsReader;
-import com.netflix.astyanax.serializers.CompositeSerializer;
-import com.netflix.astyanax.serializers.IntegerSerializer;
 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -164,21 +162,36 @@ public void execute() {
             return;
         }
 
-        AstyanaxContext<Keyspace> context = createContext();
-        context.start();
-        Keyspace keyspace = context.getClient();
-
-        Set<Integer> scheduleIdsWith1HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "one_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.ONE_HOUR);
-        Set<Integer> scheduleIdsWith6HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "six_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.SIX_HOUR);
-        Set<Integer> scheduleIdsWith24HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "twenty_four_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()),
-            Bucket.TWENTY_FOUR_HOUR);
-
-        Stopwatch contextStopWatch = Stopwatch.createStarted();
-        context.shutdown();
-        contextStopWatch.stop();
+        KeyScanner keyScanner = new KeyScanner(session);
+        Set<Integer> scheduleIdsWith1HourData = keyScanner.scanFor1HourKeys();
+        Set<Integer> scheduleIdsWith6HourData = keyScanner.scanFor6HourKeys();
+        Set<Integer> scheduleIdsWith24HourData = keyScanner.scanFor24HourKeys();
+        keyScanner.shutdown();
+
+        log.info("There are " + scheduleIdsWith1HourData.size() + " schedule ids with " +
+            Bucket.ONE_HOUR + " data");
+        log.info("There are " + scheduleIdsWith6HourData.size() + " schedule ids with " +
+            Bucket.SIX_HOUR + " data");
+        log.info("There are " + scheduleIdsWith24HourData.size() + " schedule ids with " +
+            Bucket.TWENTY_FOUR_HOUR + " data");
+
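+        // The Astyanax-based key scan that this replaces is kept below, commented
+        // out, for reference.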
+//        AstyanaxContext<Keyspace> context = createContext();
+//        context.start();
+//        Keyspace keyspace = context.getClient();
+//
+//        Set<Integer> scheduleIdsWith1HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "one_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.ONE_HOUR);
+//        Set<Integer> scheduleIdsWith6HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "six_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.SIX_HOUR);
+//        Set<Integer> scheduleIdsWith24HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "twenty_four_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()),
+//            Bucket.TWENTY_FOUR_HOUR);
+
+//        Stopwatch contextStopWatch = Stopwatch.createStarted();
+//        context.shutdown();
+//        contextStopWatch.stop();
 
         writePermitsRef.set(RateLimiter.create(getWriteLimit(getNumberOfUpNodes()), DEFAULT_WARM_UP,
             TimeUnit.SECONDS));
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java
new file mode 100644
index 00000000000..2f4a6b8f6cb
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java
@@ -0,0 +1,23 @@
+package org.rhq.cassandra.schema.exception;
+
+/**
+ * @author John Sanda
+ */
+public class KeyScanException extends Exception {
+
+    public KeyScanException() {
+        super();
+    }
+
+    public KeyScanException(String message) {
+        super(message);
+    }
+
+    public KeyScanException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public KeyScanException(Throwable cause) {
+        super(cause);
+    }
+}