From 966e923f6fff5c939d0df9c1b74bb9019615155e Mon Sep 17 00:00:00 2001
From: John Sanda
Date: Sun, 22 Feb 2015 16:23:05 -0500
Subject: [PATCH] [BZ 185375] remove key scanning code with hector

I went back and did some testing with the original approach of querying
against all schedule ids that are returned from the RDBMS. I was able to
achieve the same performance (if not better) as I did with hector. I
therefore do not see any reason to pull in additional dependencies.

Conflicts:
	modules/common/cassandra-schema/pom.xml
---
 modules/common/cassandra-schema/pom.xml       |  27 ++-
 .../schema/MigrateAggregateMetrics.java       | 166 ++++++++----------
 .../org/rhq/cassandra/schema/MigrateData.java |  90 +++++-----
 3 files changed, 143 insertions(+), 140 deletions(-)

diff --git a/modules/common/cassandra-schema/pom.xml b/modules/common/cassandra-schema/pom.xml
index e271c13201e..6a21373d780 100644
--- a/modules/common/cassandra-schema/pom.xml
+++ b/modules/common/cassandra-schema/pom.xml
@@ -41,13 +41,6 @@
       <version>${cassandra.driver.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>me.prettyprint</groupId>
-      <artifactId>hector-core</artifactId>
-      <version>1.0-5</version>
-    </dependency>
-
-
     <dependency>
       <groupId>org.jboss</groupId>
       <artifactId>jboss-vfs</artifactId>
@@ -128,6 +121,26 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.rhq.cassandra.schema.MigrateMetrics</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index 2d79a69acd9..c74f460e0fb 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -2,12 +2,15 @@
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -18,7 +21,7 @@
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.io.LineReader;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -33,12 +36,6 @@
 import org.joda.time.DateTime;
 import org.joda.time.Days;
 
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.service.CassandraHostConfigurator;
-import me.prettyprint.cassandra.service.KeyIterator;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.factory.HFactory;
-
 /**
  *
  * Migrates aggregate metrics from the one_hour_metrics, six_hour_metrics, and twenty_four_hour_metrics tables to the
@@ -99,23 +96,28 @@ public String toString() {
 
     private AtomicInteger remaining24HourMetrics = new AtomicInteger();
 
+    private AtomicInteger migrated1HourMetrics = new AtomicInteger();
+
+    private AtomicInteger migrated6HourMetrics = new AtomicInteger();
+
+    private AtomicInteger migrated24HourMetrics = new AtomicInteger();
+
     private AtomicInteger failedMigrations = new AtomicInteger();
 
     private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5,
         new SchemaUpdateThreadFactory()));
 
-    private ExecutorService scheduler = Executors.newFixedThreadPool(1);
-
-    private TaskTracker migrations = new TaskTracker();
+    private MetricsRegistry metricsRegistry;
 
     private Meter migrationsMeter;
 
-    private MetricsRegistry metricsRegistry;
+    private TaskTracker migrations = new TaskTracker();
 
     private int totalMetrics;
 
     private double failureThreshold = Double.parseDouble(System.getProperty("rhq.storage.failure-threshold", "0.05"));
 
+    @Override
     public void setSession(Session session) {
         this.session = session;
     }
@@ -126,15 +128,9 @@ public void bind(Properties properties) {
         dataDir = properties.getProperty("data.dir", System.getProperty("jboss.server.data.dir"));
     }
 
-    @Override
-    public String toString() {
-        return getClass().getSimpleName();
-    }
-
     @Override
     public void execute() {
         log.info("Starting data migration");
-        metricsRegistry = new MetricsRegistry();
         migrationsMeter = metricsRegistry.newMeter(MigrateAggregateMetrics.class, "migrations", "migrations",
             TimeUnit.MINUTES);
 
@@ -148,25 +144,28 @@ public void execute() {
                 return;
             }
             writePermits = RateLimiter.create(getWriteLimit() * getNumberOfUpNodes(), 10, TimeUnit.SECONDS);
-            readPermits = RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), 90, TimeUnit.SECONDS);
-            Keyspace keyspace = createKeyspace();
+            readPermits = RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), 10, TimeUnit.SECONDS);
+
+            log.info("The request limits are " + writePermits.getRate() + " writes per second and " +
+                readPermits.getRate() + " reads per second");
+
+            Set<Integer> scheduleIds = loadScheduleIds();
+            remaining1HourMetrics.set(scheduleIds.size());
+            remaining6HourMetrics.set(scheduleIds.size());
+            remaining24HourMetrics.set(scheduleIds.size());
+            totalMetrics = scheduleIds.size() * 3;
 
             MigrationProgressLogger progressLogger = new MigrationProgressLogger();
             threadPool.submit(progressLogger);
-            schedule1HourDataMigrations(keyspace);
-            schedule6HourDataMigrations(keyspace);
-            schedule24HourDataMigrations(keyspace);
+            migrate1HourData(scheduleIds);
+            migrate6HourData(scheduleIds);
+            migrate24HourData(scheduleIds);
 
             migrations.finishedSchedulingTasks();
             migrations.waitForTasksToFinish();
             progressLogger.finished();
 
-            if (failedMigrations.get() > 0) {
-                throw new RuntimeException("There were " + failedMigrations + " failed migration tasks. The upgrade " +
-                    "needs to be run again in order to complete the migration.");
-            }
-
             dropTables();
         } catch (IOException e) {
             throw new RuntimeException("There was an unexpected I/O error. The are still " +
                migrations.getRemainingTasks() + " outstanding migration tasks. The upgrade must be run again to " +
@@ -182,6 +181,7 @@ public void execute() {
         } finally {
             stopwatch.stop();
             log.info("Finished data migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
+            log.info("There were " + failedMigrations + " failed migrations");
             shutdown();
         }
     }
@@ -191,7 +191,7 @@ private int getWriteLimit() {
     }
 
     private int getReadLimit() {
-        return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "110"));
+        return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "200"));
     }
 
     private int getNumberOfUpNodes() {
@@ -207,111 +207,93 @@ private int getNumberOfUpNodes() {
 
     private void shutdown() {
         try {
             log.info("Shutting down migration thread pools...");
-            scheduler.shutdown();
             threadPool.shutdown();
-            scheduler.awaitTermination(5, TimeUnit.SECONDS);
             threadPool.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
         }
     }
 
-    private Keyspace createKeyspace() {
-        CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.01");
-        Map<String, String> credentials = ImmutableMap.of(
-            "username", "rhqadmin",
-            "password", "rhqadmin"
-        );
-        me.prettyprint.hector.api.Cluster cluster = HFactory.createCluster("rhq", configurator, credentials);
-        return HFactory.createKeyspace("rhq", cluster);
-    }
+    private Set<Integer> loadScheduleIds() {
+        NumberFormat formatter = new DecimalFormat("\"#,#\"");
+        Set<Integer> scheduleIds = new HashSet<Integer>();
+        InputStream stream = getClass().getResourceAsStream("/schedule_ids");
+        LineReader reader = new LineReader(new InputStreamReader(stream));
 
-    private Iterator<Integer> createKeyIterator(Keyspace keyspace, String table) {
-        KeyIterator<Integer> keyIterator = new KeyIterator<Integer>(keyspace, table, IntegerSerializer.get());
-        return keyIterator.iterator();
+        try {
+            String line = reader.readLine();
+            while (line != null) {
+                scheduleIds.add(formatter.parse(line).intValue());
+                line = reader.readLine();
+            }
+            return scheduleIds;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load schedule ids", e);
+        } catch (ParseException e) {
+            throw new RuntimeException("Failed to load schedule ids", e);
+        }
     }
 
-    private void schedule1HourDataMigrations(Keyspace keyspace) throws IOException {
-        Iterator<Integer> keyIterator = createKeyIterator(keyspace, "one_hour_metrics");
+    private void migrate1HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
         DateTime startTime = endTime.minus(Days.days(14));
         PreparedStatement query = session.prepare(
             "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
             "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
-
-        scheduleMigrations(keyIterator, query, Bucket.ONE_HOUR, remaining1HourMetrics);
+        migrateData(scheduleIds, query, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
     }
 
-    private void schedule6HourDataMigrations(Keyspace keyspace) throws IOException {
-        Iterator<Integer> keyIterator = createKeyIterator(keyspace, "six_hour_metrics");
+    private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(31));
+        DateTime startTime = endTime.minus(Days.days(14));
         PreparedStatement query = session.prepare(
             "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
             "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
-
-        scheduleMigrations(keyIterator, query, Bucket.SIX_HOUR, remaining6HourMetrics);
+        migrateData(scheduleIds, query, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
     }
 
-    private void schedule24HourDataMigrations(Keyspace keyspace) throws IOException {
-        Iterator<Integer> keyIterator = createKeyIterator(keyspace, "twenty_four_hour_metrics");
+    private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(365));
+        DateTime startTime = endTime.minus(Days.days(14));
         PreparedStatement query = session.prepare(
             "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
             "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
-
-        scheduleMigrations(keyIterator, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics);
+        migrateData(scheduleIds, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics);
     }
 
-    private void scheduleMigrations(Iterator<Integer> keyIterator, final PreparedStatement query, final Bucket bucket,
-        final AtomicInteger remainingMetrics) throws IOException {
+    private void migrateData(final Set<Integer> scheduleIds, final PreparedStatement query, final Bucket bucket,
+        final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException {
+
         Stopwatch stopwatch = Stopwatch.createStarted();
-        log.info("Scanning for schedule ids with " + bucket + " data");
+        log.info("Scheduling data migration tasks for " + bucket + " data");
         final MigrationLog migrationLog = new MigrationLog(new File(dataDir, bucket + "_migration.log"));
         final Set<Integer> migratedScheduleIds = migrationLog.read();
-        int count = 0;
-        while (keyIterator.hasNext()) {
-            final Integer scheduleId = keyIterator.next();
-            count++;
-            remainingMetrics.incrementAndGet();
-            if (log.isDebugEnabled()) {
-                log.debug("Scheduling " + bucket + " data migration for schedule id " + scheduleId);
+        for (final Integer scheduleId : scheduleIds) {
+            if (!migratedScheduleIds.contains(scheduleId)) {
+                migrations.addTask();
+                readPermits.acquire();
+                ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
+                ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
+                    new MigrateData(scheduleId, bucket, writePermits, session), threadPool);
+                Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
+                    remainingMetrics, migratedMetrics), threadPool);
             }
-            migrations.addTask();
-            scheduler.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        if (!migratedScheduleIds.contains(scheduleId)) {
-                            readPermits.acquire();
-                            ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
-                            ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
-                                new MigrateData(scheduleId, bucket, writePermits, session), threadPool);
-                            Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
-                                remainingMetrics), threadPool);
-                        }
-                    } catch (Exception e) {
-                        log.warn("There was an error migrating data", e);
-                    }
-                }
-            });
         }
-        totalMetrics += count;
+
         stopwatch.stop();
-        log.info("Finished scanning for schedule ids with " + bucket + " data in " +
+        log.info("Finished scheduling migration tasks for " + bucket + " data in " +
             stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
-        log.info("There are a total of " + count + " schedule ids with "
-            + bucket + " data to migrate");
     }
 
     private FutureCallback<List<ResultSet>> migrationFinished(final Integer scheduleId, final Bucket bucket,
-        final MigrationLog migrationLog, final AtomicInteger remainingMetrics) {
+        final MigrationLog migrationLog, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) {
         return new FutureCallback<List<ResultSet>>() {
             @Override
             public void onSuccess(List<ResultSet> resultSets) {
                 try {
                     migrations.finishedTask();
                     remainingMetrics.decrementAndGet();
+                    migratedMetrics.incrementAndGet();
                     migrationLog.write(scheduleId);
                     migrationsMeter.mark();
                     if (log.isDebugEnabled()) {
@@ -362,7 +344,7 @@ public void run() {
                     reportMigrationRates = true;
                 }
 
-                if ((failedMigrations.get() / totalMetrics) > failureThreshold) {
+                if (totalMetrics > 0 && ((double) failedMigrations.get() / totalMetrics) > failureThreshold) {
                     migrations.abort("The failure threshold has been exceeded with " + failedMigrations.get() +
                         " failed migrations");
                 }
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
index 1e9eb5742a4..ac5575d9d7b 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
@@ -47,52 +47,60 @@ public MigrateData(Integer scheduleId, MigrateAggregateMetrics.Bucket bucket, Ra
 
     @Override
     public ListenableFuture<List<ResultSet>> apply(ResultSet resultSet) throws Exception {
-        List<ResultSetFuture> insertFutures = new ArrayList<ResultSetFuture>();
-        List<Row> rows = resultSet.all();
-        Date time = rows.get(0).getDate(1);
-        Date nextTime;
-        Double max = null;
-        Double min = null;
-        Double avg = null;
-        Long writeTime = rows.get(0).getLong(5);
-        Integer ttl = rows.get(0).getInt(4);
-        List statements = new ArrayList(BATCH_SIZE);
-
-        for (Row row : resultSet) {
-            nextTime = row.getDate(1);
-            if (nextTime.equals(time)) {
-                int type = row.getInt(2);
-                switch (type) {
-                case 0:
-                    max = row.getDouble(3);
-                    break;
-                case 1:
-                    min = row.getDouble(3);
-                    break;
-                default:
-                    avg = row.getDouble(3);
-                }
-            } else {
-                if (isDataMissing(avg, max, min)) {
-                    log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
-                        ", time: " + time.getTime() + "}. It will not be migrated.");
+        try {
+            List<ResultSetFuture> insertFutures = new ArrayList<ResultSetFuture>();
+            if (resultSet.isExhausted()) {
+                return Futures.allAsList(insertFutures);
+            }
+            List<Row> rows = resultSet.all();
+            Date time = rows.get(0).getDate(1);
+            Date nextTime;
+            Double max = null;
+            Double min = null;
+            Double avg = null;
+            Long writeTime = rows.get(0).getLong(5);
+            Integer ttl = rows.get(0).getInt(4);
+            List statements = new ArrayList(BATCH_SIZE);
+
+            for (Row row : resultSet) {
+                nextTime = row.getDate(1);
+                if (nextTime.equals(time)) {
+                    int type = row.getInt(2);
+                    switch (type) {
+                    case 0:
+                        max = row.getDouble(3);
+                        break;
+                    case 1:
+                        min = row.getDouble(3);
+                        break;
+                    default:
+                        avg = row.getDouble(3);
+                    }
                 } else {
-                    statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
-                    if (statements.size() == BATCH_SIZE) {
-                        insertFutures.add(writeBatch(statements));
-                        statements.clear();
+                    if (isDataMissing(avg, max, min)) {
+                        log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
+                            ", time: " + time.getTime() + "}. It will not be migrated.");
+                    } else {
+                        statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
+                        if (statements.size() == BATCH_SIZE) {
+                            insertFutures.add(writeBatch(statements));
+                            statements.clear();
+                        }
                     }
-                }
-                time = nextTime;
-                max = row.getDouble(3);
-                min = null;
-                avg = null;
-                ttl = row.getInt(4);
-                writeTime = row.getLong(5);
+                    time = nextTime;
+                    max = row.getDouble(3);
+                    min = null;
+                    avg = null;
+                    ttl = row.getInt(4);
+                    writeTime = row.getLong(5);
+                }
             }
+            return Futures.allAsList(insertFutures);
+        } catch (Exception e) {
+            log.warn("An error occurred while migrating data", e);
+            throw e;
         }
-        return Futures.allAsList(insertFutures);
     }
 
     private boolean isDataMissing(Double avg, Double max, Double min) {
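
For reviewers, the core of the new approach is a throttled, fully asynchronous per-schedule-id migration: acquire a read
permit, issue the SELECT asynchronously, hand the rows to a transform, and record success or failure in a callback. The
following is a minimal, self-contained sketch of that pattern, assuming only the Guava and DataStax Java driver APIs this
patch already uses; the class name, rate values, and table used below are illustrative and are not part of the RHQ code.

import java.util.Set;
import java.util.concurrent.Executors;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;

// Illustrative sketch only; names and limits are hypothetical, not RHQ code.
public class ThrottledMigrationSketch {

    private final Session session;

    // Cap reads per second so the migration cannot overload the storage cluster.
    private final RateLimiter readPermits = RateLimiter.create(200);

    private final ListeningExecutorService threadPool =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));

    public ThrottledMigrationSketch(Session session) {
        this.session = session;
    }

    public void migrate(Set<Integer> scheduleIds) {
        // One prepared statement, bound per schedule id, mirrors the per-id queries above.
        PreparedStatement query = session.prepare(
            "SELECT schedule_id, time, type, value FROM rhq.one_hour_metrics WHERE schedule_id = ?");

        for (Integer scheduleId : scheduleIds) {
            readPermits.acquire(); // block until a read permit is available
            ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
            Futures.addCallback(queryFuture, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet rows) {
                    // Transform the rows and write them to the new table here,
                    // then record the schedule id so a rerun can skip it.
                }

                @Override
                public void onFailure(Throwable t) {
                    // Count the failure so a threshold check can abort the migration.
                }
            }, threadPool);
        }
    }
}

Because acquire() blocks the scheduling loop, the driver is never handed more concurrent reads than the configured rate
allows, while the callbacks run on the shared thread pool rather than on the driver's I/O threads.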