diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java
new file mode 100644
index 00000000000..aec1e69f0a8
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/KeyScanner.java
@@ -0,0 +1,244 @@
+package org.rhq.cassandra.schema;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Objects;
+import com.google.common.base.Stopwatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.cassandra.schema.exception.KeyScanException;
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * @author John Sanda
+ */
+public class KeyScanner {
+
+    private static final Log log = LogFactory.getLog(KeyScanner.class);
+
+    private static final int QUERY_FAILURE_THRESHOLD = 5;
+
+    private static class TokenRange {
+        long startToken;
+        long endToken;
+
+        public TokenRange(long startToken, long endToken) {
+            this.startToken = startToken;
+            this.endToken = endToken;
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper("TokenRange").add("start", startToken).add("end", endToken).toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TokenRange that = (TokenRange) o;
+
+            if (endToken != that.endToken) return false;
+            if (startToken != that.startToken) return false;
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (startToken ^ (startToken >>> 32));
+            result = 31 * result + (int) (endToken ^ (endToken >>> 32));
+            return result;
+        }
+    }
+
+    private Session session;
+
+    private List<TokenRange> tokenRanges = new ArrayList<TokenRange>();
+
+    private ExecutorService threadPool;
+
+    public KeyScanner(Session session) {
+        this.session = session;
+        Queue<Host> hosts = new ArrayDeque<Host>(session.getCluster().getMetadata().getAllHosts());
+        threadPool = Executors.newFixedThreadPool(getThreadCount(hosts.size()));
+
+        if (hosts.size() == 1) {
+            // If it is a single-node cluster, we only have to query system.local to get
+            // the tokens.
+            ResultSet resultSet = session.execute("select tokens from system.local");
+            loadTokens(resultSet);
+        } else {
+            // If we have a multi-node cluster, things are a little more involved. Each
+            // node stores its own tokens in system.local and stores the tokens of all
+            // of the other nodes in system.peers. This code assumes we are still using
+            // a round-robin load balancing policy with the driver; so if we are trying
+            // to load tokens for node n1 and the query happens to be routed to n1, we
+            // will get an empty result set and need to execute the query again so that
+            // it is routed to a different node, e.g., n2, which will have a row in
+            // system.peers for n1. Switching to a token-aware policy would simplify
+            // this.
+
+            PreparedStatement query = session.prepare("select tokens from system.peers where peer = ?");
+
+            while (!hosts.isEmpty()) {
+                Host host = hosts.poll();
+                log.info("Loading tokens for " + host);
+                ResultSet resultSet = session.execute(query.bind(host.getAddress()));
+                if (resultSet.isExhausted()) {
+                    // Retry at most once per remaining host. With round-robin routing,
+                    // each attempt should go to a different coordinator, at least one
+                    // of which will have a system.peers row for this host.
+                    for (int i = 0; i < hosts.size(); ++i) {
+                        resultSet = session.execute(query.bind(host.getAddress()));
+                        if (!resultSet.isExhausted()) {
+                            break;
+                        }
+                    }
+                }
+                if (resultSet.isExhausted()) {
+                    throw new IllegalStateException("Failed to load tokens for " + host);
+                }
+                loadTokens(resultSet);
+            }
+        }
+    }
+
+    private int getThreadCount(int numNodes) {
+        String count = System.getProperty("rhq.storage.key-scanner.thread-count");
+        if (count == null) {
+            // Default to four threads per node, capped at 16.
+            return Math.min(4 + ((numNodes - 1) * 4), 16);
+        }
+        return Integer.parseInt(count);
+    }
+
+    private void loadTokens(ResultSet resultSet) {
+        List<Row> rows = resultSet.all();
+        Set<String> stringTokens = rows.get(0).getSet(0, String.class);
+        SortedSet<Long> tokens = new TreeSet<Long>();
+
+        for (String s : stringTokens) {
+            tokens.add(Long.parseLong(s));
+        }
+
+        Iterator<Long> iterator = tokens.iterator();
+        long start = iterator.next();
+        long end;
+
+        while (iterator.hasNext()) {
+            end = iterator.next();
+            tokenRanges.add(new TokenRange(start, end));
+            start = end;
+        }
+        // Cover the wrap-around portion of the ring as well; otherwise keys whose
+        // token falls below the smallest token or above the largest one would never
+        // be visited by the range scans.
+        tokenRanges.add(new TokenRange(Long.MIN_VALUE, tokens.first()));
+        tokenRanges.add(new TokenRange(tokens.last(), Long.MAX_VALUE));
+    }
+
+    public void shutdown() {
+        log.info("Shutting down");
+        threadPool.shutdownNow();
+    }
+
+    public Set<Integer> scanFor1HourKeys() throws InterruptedException, AbortedException {
+        return scanForKeys("one_hour_metrics", 1011);
+    }
+
+    public Set<Integer> scanFor6HourKeys() throws InterruptedException, AbortedException {
+        return scanForKeys("six_hour_metrics", 375);
+    }
+
+    public Set<Integer> scanFor24HourKeys() throws InterruptedException, AbortedException {
+        return scanForKeys("twenty_four_hour_metrics", 1098);
+    }
+
+    private Set<Integer> scanForKeys(String table, int batchSize) throws InterruptedException, AbortedException {
+        log.info("Scanning for keys for table " + table);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        Set<Integer> keys = new ConcurrentSkipListSet<Integer>();
+        PreparedStatement query = session.prepare(
+            "SELECT token(schedule_id), schedule_id " +
+            "FROM rhq." + table + " " +
+            "WHERE token(schedule_id) >= ? LIMIT " + batchSize);
+        TaskTracker taskTracker = new TaskTracker();
+
+        for (TokenRange range : tokenRanges) {
+            taskTracker.addTask();
+            threadPool.submit(tokenScanner(range, query, keys, taskTracker, batchSize));
+        }
+        taskTracker.finishedSchedulingTasks();
+        taskTracker.waitForTasksToFinish();
+
+        stopwatch.stop();
+        log.info("Finished scanning for keys for table " + table + " in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
+            " ms");
+
+        return keys;
+    }
+
+    private Runnable tokenScanner(final TokenRange tokenRange, final PreparedStatement query,
+        final Set<Integer> scheduleIds, final TaskTracker taskTracker, final int batchSize) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    long token = tokenRange.startToken;
+                    long lastToken = tokenRange.startToken;
+
+                    while (token <= tokenRange.endToken) {
+                        ResultSet resultSet = executeTokenQuery(query, token);
+                        int count = 0;
+                        for (Row row : resultSet) {
+                            lastToken = row.getLong(0);
+                            scheduleIds.add(row.getInt(1));
+                            ++count;
+                        }
+                        if (count < batchSize) {
+                            // A short page means there are no more rows at or beyond
+                            // this token, so the range is exhausted.
+                            break;
+                        }
+                        token = lastToken + 1;
+                    }
+                    taskTracker.finishedTask();
+                } catch (KeyScanException e) {
+                    taskTracker.abort(e.getMessage());
+                } catch (Exception e) {
+                    log.error("There was an unexpected error scanning for tokens", e);
+                    taskTracker.abort("Aborting due to unexpected error: " + ThrowableUtil.getRootMessage(e));
+                }
+            }
+        };
+    }
+
+    private ResultSet executeTokenQuery(PreparedStatement query, long token) throws KeyScanException {
+        for (int i = 0; i < QUERY_FAILURE_THRESHOLD; ++i) {
+            try {
+                return session.execute(query.bind(token));
+            } catch (Exception e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to execute token query", e);
+                } else {
+                    log.info("Failed to execute token query: " + ThrowableUtil.getRootMessage(e));
+                }
+            }
+        }
+        throw new KeyScanException("Token query failed " + QUERY_FAILURE_THRESHOLD +
+            " times. The key scan will abort due to these failures.");
+    }
+}
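Note on the range construction in loadTokens() above: a node's sorted token set is turned into consecutive [token, nextToken] scan ranges, plus two extra ranges that cover the wrap-around portion of the ring. A minimal, self-contained sketch of the same idea (the class and names below are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class TokenRangeSketch {

        static class Range {
            final long start;
            final long end;

            Range(long start, long end) {
                this.start = start;
                this.end = end;
            }
        }

        // Mirrors KeyScanner.loadTokens(): consecutive ranges between sorted
        // tokens, plus MIN/MAX ranges for the wrap-around part of the ring.
        static List<Range> toRanges(SortedSet<Long> tokens) {
            List<Range> ranges = new ArrayList<Range>();
            Long previous = null;
            for (Long token : tokens) {
                if (previous != null) {
                    ranges.add(new Range(previous, token));
                }
                previous = token;
            }
            ranges.add(new Range(Long.MIN_VALUE, tokens.first()));
            ranges.add(new Range(tokens.last(), Long.MAX_VALUE));
            return ranges;
        }

        public static void main(String[] args) {
            SortedSet<Long> tokens = new TreeSet<Long>(Arrays.asList(-5000L, 0L, 7500L));
            // Prints [-5000, 0], [0, 7500], [MIN, -5000], [7500, MAX]
            for (Range r : toRanges(tokens)) {
                System.out.println("[" + r.start + ", " + r.end + "]");
            }
        }
    }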
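Note on the scan loop in tokenScanner() above: rather than relying on driver paging, each worker pages by token. It repeatedly asks for token(pk) >= lastSeen with a fixed LIMIT, and a short page signals that the range is exhausted. A sketch of that paging pattern against a generic table (the table name my_table and key column id are placeholders; assumes the same DataStax driver API used in this patch):

    import java.util.HashSet;
    import java.util.Set;

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class TokenPagingSketch {

        static Set<Integer> scanRange(Session session, long startToken, long endToken, int pageSize) {
            PreparedStatement query = session.prepare(
                "SELECT token(id), id FROM my_table WHERE token(id) >= ? LIMIT " + pageSize);
            Set<Integer> keys = new HashSet<Integer>();
            long token = startToken;
            long lastToken = startToken;

            while (token <= endToken) {
                ResultSet resultSet = session.execute(query.bind(token));
                int count = 0;
                for (Row row : resultSet) {
                    lastToken = row.getLong(0); // token of this partition key
                    keys.add(row.getInt(1));    // the key itself
                    ++count;
                }
                if (count < pageSize) {
                    break; // short page: nothing left at or beyond this token
                }
                token = lastToken + 1; // resume just past the last token seen
            }
            return keys;
        }
    }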
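Note on executeTokenQuery() above: a failed query is retried up to QUERY_FAILURE_THRESHOLD (5) times before the scan gives up with a KeyScanException. The same retry-then-fail shape as a generic helper (a sketch only; the Callable-based signature is illustrative, not part of the patch):

    import java.util.concurrent.Callable;

    public class RetrySketch {

        // Attempt an operation a fixed number of times; rethrow the last
        // failure once the threshold is exhausted.
        static <T> T withRetries(Callable<T> operation, int maxAttempts) throws Exception {
            Exception lastFailure = null;
            for (int i = 0; i < maxAttempts; ++i) {
                try {
                    return operation.call();
                } catch (Exception e) {
                    lastFailure = e; // log and try the next attempt
                }
            }
            throw new Exception("Operation failed " + maxAttempts + " times", lastFailure);
        }
    }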
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index b029df5b941..646dfb8dd94 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -43,8 +43,6 @@
 import com.netflix.astyanax.model.Composite;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.recipes.reader.AllRowsReader;
-import com.netflix.astyanax.serializers.CompositeSerializer;
-import com.netflix.astyanax.serializers.IntegerSerializer;
 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -164,21 +162,34 @@ public void execute() {
             return;
         }
 
-        AstyanaxContext<Keyspace> context = createContext();
-        context.start();
-        Keyspace keyspace = context.getClient();
-
-        Set<Integer> scheduleIdsWith1HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "one_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.ONE_HOUR);
-        Set<Integer> scheduleIdsWith6HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "six_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.SIX_HOUR);
-        Set<Integer> scheduleIdsWith24HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
-            "twenty_four_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()),
-            Bucket.TWENTY_FOUR_HOUR);
-
-        Stopwatch contextStopWatch = Stopwatch.createStarted();
-        context.shutdown();
-        contextStopWatch.stop();
+        KeyScanner keyScanner = new KeyScanner(session);
+        Set<Integer> scheduleIdsWith1HourData = keyScanner.scanFor1HourKeys();
+        Set<Integer> scheduleIdsWith6HourData = keyScanner.scanFor6HourKeys();
+        Set<Integer> scheduleIdsWith24HourData = keyScanner.scanFor24HourKeys();
+        keyScanner.shutdown();
+
+        log.info("There are " + scheduleIdsWith1HourData.size() + " schedule ids with "
+            + Bucket.ONE_HOUR + " data");
+        log.info("There are " + scheduleIdsWith6HourData.size() + " schedule ids with "
+            + Bucket.SIX_HOUR + " data");
+        log.info("There are " + scheduleIdsWith24HourData.size() + " schedule ids with "
+            + Bucket.TWENTY_FOUR_HOUR + " data");
+
+//        AstyanaxContext<Keyspace> context = createContext();
+//        context.start();
+//        Keyspace keyspace = context.getClient();
+//
+//        Set<Integer> scheduleIdsWith1HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "one_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.ONE_HOUR);
+//        Set<Integer> scheduleIdsWith6HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "six_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()), Bucket.SIX_HOUR);
+//        Set<Integer> scheduleIdsWith24HourData = loadScheduleIds(keyspace, ColumnFamily.newColumnFamily(
+//            "twenty_four_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()),
+//            Bucket.TWENTY_FOUR_HOUR);
+
+//        Stopwatch contextStopWatch = Stopwatch.createStarted();
+//        context.shutdown();
+//        contextStopWatch.stop();
 
         writePermitsRef.set(RateLimiter.create(getWriteLimit(getNumberOfUpNodes()), DEFAULT_WARM_UP,
             TimeUnit.SECONDS));
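Note on the execute() changes above: the scans run before keyScanner.shutdown(), so if scanFor1HourKeys() or a later scan throws, the scanner's thread pool is never shut down. A hedged alternative, not part of the patch, is to guard the scans with try/finally (assumes AbortedException lives in the same package as KeyScanner):

    import java.util.Set;

    import com.datastax.driver.core.Session;

    public class ScanKeysSketch {

        static void scanAll(Session session) throws InterruptedException, AbortedException {
            KeyScanner keyScanner = new KeyScanner(session);
            try {
                Set<Integer> with1HourData = keyScanner.scanFor1HourKeys();
                Set<Integer> with6HourData = keyScanner.scanFor6HourKeys();
                Set<Integer> with24HourData = keyScanner.scanFor24HourKeys();
                // ... hand the key sets to the migration ...
            } finally {
                keyScanner.shutdown(); // always release the scanner's thread pool
            }
        }
    }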
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java
new file mode 100644
index 00000000000..2f4a6b8f6cb
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/exception/KeyScanException.java
@@ -0,0 +1,23 @@
+package org.rhq.cassandra.schema.exception;
+
+/**
+ * @author John Sanda
+ */
+public class KeyScanException extends Exception {
+
+    public KeyScanException() {
+        super();
+    }
+
+    public KeyScanException(String message) {
+        super(message);
+    }
+
+    public KeyScanException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public KeyScanException(Throwable cause) {
+        super(cause);
+    }
+}