diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbortedException.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbortedException.java
new file mode 100644
index 00000000000..a1d632001b8
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbortedException.java
@@ -0,0 +1,23 @@
+package org.rhq.cassandra.schema;
+
+/**
+ * Thrown when task processing tracked by {@link TaskTracker} has been aborted.
+ *
+ * @author John Sanda
+ */
+public class AbortedException extends Exception {
+
+ public AbortedException() {
+ super();
+ }
+
+ public AbortedException(String message) {
+ super(message);
+ }
+
+ public AbortedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AbortedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index 4ba42b2bbac..a5af97970eb 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -2,21 +2,13 @@
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,24 +16,28 @@
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Batch;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
-import com.google.common.io.LineReader;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricsRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.KeyIterator;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.factory.HFactory;
/**
*
@@ -67,7 +63,7 @@ public class MigrateAggregateMetrics implements Step {
private static final Log log = LogFactory.getLog(MigrateAggregateMetrics.class);
- private static enum Bucket {
+ public static enum Bucket {
ONE_HOUR("one_hour"),
@@ -89,28 +85,32 @@ public String toString() {
private Session session;
- private DBConnectionFactory dbConnectionFactory;
-
- private PreparedStatement find1HourData;
+ private String dataDir;
- private PreparedStatement find6HourData;
+ private DBConnectionFactory dbConnectionFactory;
- private PreparedStatement find24HourData;
+ private RateLimiter readPermits;
- private int jobBatchSize = 8;
+ private RateLimiter writePermits;
- private int concurrentJobLimit = 5;
+ private AtomicInteger remaining1HourMetrics = new AtomicInteger();
- private RateLimiter writePermits;
+ private AtomicInteger remaining6HourMetrics = new AtomicInteger();
- private Semaphore readPermits = new Semaphore(jobBatchSize * concurrentJobLimit);
+ private AtomicInteger remaining24HourMetrics = new AtomicInteger();
private AtomicInteger failedMigrations = new AtomicInteger();
- private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+ private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5,
new SchemaUpdateThreadFactory()));
- private String dataDir;
+ private ExecutorService scheduler = Executors.newFixedThreadPool(1);
+
+ private TaskTracker migrations = new TaskTracker();
+
+ private Meter migrationsMeter;
+
+ private MetricsRegistry metricsRegistry;
@Override
public void setSession(Session session) {
@@ -130,37 +130,64 @@ public String toString() {
@Override
public void execute() {
- // dbConnectionFactory can be null in test environments which is fine because we start tests with a brand
- // new schema and cluster. In this case, we do not need to do anything since it is not an upgrade scenario.
- if (dbConnectionFactory == null) {
- log.info("The relational database connection factory is not set. No data migration necessary");
- } else {
- writePermits = RateLimiter.create(calculatePermits(), 30, TimeUnit.SECONDS);
+ log.info("Starting data migration");
- Stopwatch stopwatch = new Stopwatch().start();
- initPreparedStatements();
- Set<Integer> scheduleIds = loadScheduleIds();
+ metricsRegistry = new MetricsRegistry();
+ migrationsMeter = metricsRegistry.newMeter(MigrateAggregateMetrics.class, "migrations", "migrations",
+ TimeUnit.MINUTES);
- log.info("Migrating aggregate metrics for " + scheduleIds.size() + " schedule ids");
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ // dbConnectionFactory can be null in test environments which is fine because we start tests with a brand
+ // new schema and cluster. In this case, we do not need to do anything since it is not an upgrade scenario.
+ if (dbConnectionFactory == null) {
+ log.info("The relational database connection factory is not set. No data migration necessary");
+ return;
+ }
+ writePermits = RateLimiter.create(getWriteLimit() * getNumberOfUpNodes(), 10, TimeUnit.SECONDS);
+ readPermits = RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), 90, TimeUnit.SECONDS);
+ Keyspace keyspace = createKeyspace();
+ MigrationProgressLogger progressLogger = new MigrationProgressLogger();
- migrate(scheduleIds, find1HourData, Bucket.ONE_HOUR);
- migrate(scheduleIds, find6HourData, Bucket.SIX_HOUR);
- migrate(scheduleIds, find24HourData, Bucket.TWENTY_FOUR_HOUR);
+ threadPool.submit(progressLogger);
- stopwatch.stop();
- log.info("Finished aggregate metrics migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " seconds");
+ schedule1HourDataMigrations(keyspace);
+ schedule6HourDataMigrations(keyspace);
+ schedule24HourDataMigrations(keyspace);
+
+ migrations.finishedSchedulingTasks();
+ migrations.waitForTasksToFinish();
+ progressLogger.finished();
if (failedMigrations.get() > 0) {
- throw new RuntimeException("There were " + failedMigrations.get() + " failed migrations. The " +
- "upgrade will have to be run again to complete the migration.");
+ throw new RuntimeException("There were " + failedMigrations + " failed migration tasks. The upgrade " +
+ "needs to be run again in order to complete the migration.");
}
+ dropTables();
+ } catch (IOException e) {
+ throw new RuntimeException("There was an unexpected I/O error. The are still " +
+ migrations.getRemainingTasks() + " outstanding migration tasks. The upgrade must be run again to " +
+ "complete the migration.", e);
+ } catch (AbortedException e) {
+ throw new RuntimeException("The migration was aborted. There are are still " +
+ migrations.getRemainingTasks() +" outstanding migration tasks. The upgrade must be run again to " +
+ "complete the migration.", e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("The migration was interrupted. There are are still " +
+ migrations.getRemainingTasks() +" outstanding migration tasks. The upgrade must be run again to " +
+ "complete the migration.", e);
+ } finally {
+ stopwatch.stop();
+ log.info("Finished data migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
}
- dropTables();
}
- private int calculatePermits() {
- int requestLimit = Integer.parseInt(System.getProperty("rhq.storage.request.limit", "20000"));
- return requestLimit * getNumberOfUpNodes();
+ private int getWriteLimit() {
+ return Integer.parseInt(System.getProperty("rhq.storage.request.limit", "20000"));
+ }
+
+ private int getReadLimit() {
+ return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "100"));
}
private int getNumberOfUpNodes() {
@@ -173,485 +200,131 @@ private int getNumberOfUpNodes() {
return count;
}
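
For context on the limits above: the write limit (rhq.storage.request.limit, default 20000) is multiplied by the number of up nodes and handed to Guava's RateLimiter with a warmup period. A minimal, self-contained sketch of that throttling pattern, with an invented node count:

    import java.util.concurrent.TimeUnit;
    import com.google.common.util.concurrent.RateLimiter;

    public class WriteThrottleSketch {
        public static void main(String[] args) {
            int writeLimit = 20000; // default for rhq.storage.request.limit
            int upNodes = 2;        // hypothetical cluster size
            // Ramps up to writeLimit * upNodes permits per second over a 10 second
            // warmup, mirroring the RateLimiter.create(...) call in execute().
            RateLimiter writePermits = RateLimiter.create(writeLimit * upNodes, 10, TimeUnit.SECONDS);
            for (int i = 0; i < 5; i++) {
                writePermits.acquire(); // blocks until a permit is available
                System.out.println("write " + i + " permitted");
            }
        }
    }
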
- private void migrate(Set<Integer> scheduleIds, PreparedStatement query, final Bucket bucket) {
- log.info("Migrating " + bucket + " data for " + scheduleIds.size() + " schedules");
-
- CountDownLatch latch = new CountDownLatch(scheduleIds.size());
- MigrationProgressLogger progressLogger = new MigrationProgressLogger(bucket, latch);
- File logFile = new File(dataDir, bucket + "_migration.log");
- MigrationLog migrationLog = null;
- try {
- migrationLog = new MigrationLog(logFile);
- Set<Integer> migratedScheduleIds = migrationLog.read();
- threadPool.submit(progressLogger);
- List<Integer> scheduleIdsBatch = new ArrayList<Integer>(jobBatchSize);
- for (Integer scheduleId : scheduleIds) {
- scheduleIdsBatch.add(scheduleId);
- if (scheduleIdsBatch.size() == jobBatchSize) {
- submitBatch(scheduleIdsBatch, migratedScheduleIds, latch, bucket, query, migrationLog);
- scheduleIdsBatch = new ArrayList<Integer>(jobBatchSize);
- }
- }
- if (!scheduleIds.isEmpty()) {
- submitBatch(scheduleIdsBatch, migratedScheduleIds, latch, bucket, query, migrationLog);
- }
- latch.await();
- log.info("Finished migrating " + bucket + " data");
- } catch (InterruptedException e) {
- threadPool.shutdownNow();
- throw new RuntimeException("Migration of " + bucket + " data did not complete due to an interrupt. The " +
- "upgrade will have to be run again to finish the migration", e);
- } catch (IOException e) {
- throw new RuntimeException("Migration of " + bucket + " data did not complete due to an I/O error. The " +
- "upgrade will have to be run again to finish the migration", e);
- } finally {
- progressLogger.finished();
- try {
- migrationLog.close();
- } catch (IOException e) {
- log.warn("There was an error closing " + logFile.getAbsolutePath(), e);
- }
- }
- }
-
- private void submitBatch(final List<Integer> scheduleIds, final Set<Integer> migratedScheduleIds,
- final CountDownLatch latch, final MigrateAggregateMetrics.Bucket bucket, final PreparedStatement query,
- final MigrationLog migrationLog) throws InterruptedException {
- readPermits.acquire(jobBatchSize);
- threadPool.submit(processBatch(scheduleIds, migratedScheduleIds, latch, bucket, query,
- migrationLog));
+ private Keyspace createKeyspace() {
+ CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.0.1");
+ Map<String, String> credentials = ImmutableMap.of(
+ "username", "rhqadmin",
+ "password", "rhqadmin"
+ );
+ me.prettyprint.hector.api.Cluster cluster = HFactory.createCluster("rhq", configurator, credentials);
+ return HFactory.createKeyspace("rhq", cluster);
}
- private Runnable processBatch(final List<Integer> scheduleIds, final Set<Integer> migratedScheduleIds,
- final CountDownLatch latch, final MigrateAggregateMetrics.Bucket bucket, final PreparedStatement query,
- final MigrationLog migrationLog) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- for (Integer scheduleId : scheduleIds) {
- if (migratedScheduleIds.contains(scheduleId)) {
- if (log.isDebugEnabled()) {
- log.debug(bucket + " data for schedule id " + scheduleId + " has already been " +
- "migrated. It will be skipped.");
- }
- readPermits.release();
- latch.countDown();
- } else {
- ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
- ListenableFuture<Integer> migrationFuture = Futures.transform(queryFuture,
- new MigrateData(scheduleId, bucket), threadPool);
- Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, latch, migrationLog));
- }
- }
- } catch (Exception e) {
- log.error("There was an unexpected error processing data", e);
- }
- }
- };
+ private Iterator<Integer> createKeyIterator(Keyspace keyspace, String table) {
+ KeyIterator<Integer> keyIterator = new KeyIterator<Integer>(keyspace, table, IntegerSerializer.get());
+ return keyIterator.iterator();
}
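
createKeyIterator() relies on Hector's KeyIterator to page through the row keys of the old tables. A standalone sketch of that scan (auth credentials omitted for brevity; host and names as configured in createKeyspace() above):

    import me.prettyprint.cassandra.serializers.IntegerSerializer;
    import me.prettyprint.cassandra.service.CassandraHostConfigurator;
    import me.prettyprint.cassandra.service.KeyIterator;
    import me.prettyprint.hector.api.Keyspace;
    import me.prettyprint.hector.api.factory.HFactory;

    public class KeyScanSketch {
        public static void main(String[] args) {
            CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.0.1");
            me.prettyprint.hector.api.Cluster cluster = HFactory.createCluster("rhq", configurator);
            Keyspace keyspace = HFactory.createKeyspace("rhq", cluster);
            // KeyIterator pages through partition keys; IntegerSerializer decodes
            // each key as a schedule id.
            for (Integer scheduleId : new KeyIterator<Integer>(keyspace, "one_hour_metrics", IntegerSerializer.get())) {
                System.out.println("schedule id: " + scheduleId);
            }
        }
    }
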
- private void initPreparedStatements() {
- find1HourData = session.prepare(
+ private void schedule1HourDataMigrations(Keyspace keyspace) throws IOException {
+ Iterator keyIterator = createKeyIterator(keyspace, "one_hour_metrics");
+ DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
+ DateTime startTime = endTime.minus(Days.days(14));
+ PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
- "WHERE schedule_id = ?");
+ "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+
+ scheduleMigrations(keyIterator, query, Bucket.ONE_HOUR, remaining1HourMetrics);
+ }
- find6HourData = session.prepare(
+ private void schedule6HourDataMigrations(Keyspace keyspace) throws IOException {
+ Iterator keyIterator = createKeyIterator(keyspace, "six_hour_metrics");
+ DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
+ DateTime startTime = endTime.minus(Days.days(31));
+ PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
- "WHERE schedule_id = ?");
+ "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
- find24HourData = session.prepare(
- "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
- "WHERE schedule_id = ?");
+ scheduleMigrations(keyIterator, query, Bucket.SIX_HOUR, remaining6HourMetrics);
}
- private Set<Integer> loadScheduleIds() {
-// Connection connection = null;
-// Statement statement = null;
-// java.sql.ResultSet resultSet = null;
-// try {
-// connection = dbConnectionFactory.newConnection();
-// statement = connection.createStatement();
-// resultSet = statement.executeQuery(
-// "SELECT s.id FROM rhq_measurement_sched s INNER JOIN rhq_measurement_def d on s.definition = d.id " +
-// "WHERE d.data_type = 0");
-// Set<Integer> scheduleIds = new HashSet<Integer>();
-//
-// while (resultSet.next()) {
-// scheduleIds.add(resultSet.getInt(1));
-// }
-//
-// return scheduleIds;
-// } catch (SQLException e) {
-// throw new RuntimeException("Cannot migrate aggregate metrics. There was an error loading schedule ids", e);
-// } finally {
-// if (resultSet != null) {
-// try {
-// resultSet.close();
-// } catch (SQLException e) {
-// log.info("There was an error closing the SQL result set", e);
-// }
-// }
-// if (statement != null) {
-// try {
-// statement.close();
-// } catch (SQLException e) {
-// log.info("There was an error closing the SQL statement", e);
-// }
-// }
-// if (connection != null) {
-// try {
-// connection.close();
-// } catch (SQLException e) {
-// log.info("There was an error closing the SQL connection", e);
-// }
-// }
-// }
- NumberFormat formatter = new DecimalFormat("\"#,#\"");
- Set<Integer> scheduleIds = new HashSet<Integer>();
- InputStream stream = getClass().getResourceAsStream("/schedule_ids");
- LineReader reader = new LineReader(new InputStreamReader(stream));
-
- try {
- String line = reader.readLine();
- while (line != null) {
- scheduleIds.add(formatter.parse(line).intValue());
- line = reader.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to load schedule ids");
- } catch (ParseException e) {
- throw new RuntimeException("Failed to load schedule ids");
- }
+ private void schedule24HourDataMigrations(Keyspace keyspace) throws IOException {
+ Iterator keyIterator = createKeyIterator(keyspace, "twenty_four_hour_metrics");
+ DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
+ DateTime startTime = endTime.minus(Days.days(365));
+ PreparedStatement query = session.prepare(
+ "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
+ "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
- return scheduleIds;
+ scheduleMigrations(keyIterator, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics);
}
- private List<Integer> migrate1hrData() {
- List<Integer> scheduleIds = new ArrayList<Integer>(100000);
- List<Integer> batch = new ArrayList<Integer>(jobBatchSize);
-
- NumberFormat formatter = new DecimalFormat("\"#,#\"");
- InputStream stream = getClass().getResourceAsStream("/schedule_ids");
- LineReader reader = new LineReader(new InputStreamReader(stream));
+ private void scheduleMigrations(Iterator<Integer> keyIterator, final PreparedStatement query, final Bucket bucket,
+ final AtomicInteger remainingMetrics) throws IOException {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Scanning for schedule ids with " + bucket + " data");
+ final MigrationLog migrationLog = new MigrationLog(new File(dataDir, bucket + "_migration.log"));
+ final Set<Integer> migratedScheduleIds = migrationLog.read();
+ int count = 0;
- try {
- String line = reader.readLine();
- while (line != null) {
- Integer scheduleId = formatter.parse(line).intValue();
- scheduleIds.add(scheduleId);
- batch.add(scheduleId);
- line = reader.readLine();
+ while (keyIterator.hasNext()) {
+ final Integer scheduleId = keyIterator.next();
+ count++;
+ remainingMetrics.incrementAndGet();
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling " + bucket + " data migration for schedule id " + scheduleId);
}
- } catch (IOException e) {
- throw new RuntimeException("Failed to load schedule ids");
- } catch (ParseException e) {
- throw new RuntimeException("Failed to load schedule ids");
+ migrations.addTask();
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!migratedScheduleIds.contains(scheduleId)) {
+ readPermits.acquire();
+ ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
+ ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
+ new MigrateData(scheduleId, bucket, writePermits, session), threadPool);
+ Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
+ remainingMetrics), threadPool);
+ }
+ } catch (Exception e) {
+ log.warn("There was an error migrating data", e);
+ }
+ }
+ });
}
- return scheduleIds;
+ stopwatch.stop();
+ log.info("Finished scanning for schedule ids with " + bucket + " data in " +
+ stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
+ log.info("There are a total of " + count + " schedule ids with " + bucket + " data to migrate");
}
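
Each submitted task applies backpressure before touching the cluster: readPermits.acquire() blocks the single scheduler thread, which in turn slows down the key scan. Condensed, the per-schedule chain built above looks like this (schedule id invented; the other names are fields and methods of this class, so this is a sketch rather than standalone code):

    readPermits.acquire();                               // throttle the read
    ResultSetFuture queryFuture = session.executeAsync(query.bind(12345));
    ListenableFuture<List<ResultSet>> writes =           // MigrateData issues the writes
        Futures.transform(queryFuture, new MigrateData(12345, bucket, writePermits, session), threadPool);
    Futures.addCallback(writes,                          // callback updates tracker, log, and meter
        migrationFinished(12345, bucket, migrationLog, remainingMetrics), threadPool);
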
- private void dropTables() {
-// ResultSet resultSet = session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies " +
-// "WHERE keyspace_name = 'rhq'");
-// for (Row row : resultSet) {
-// String table = row.getString(0);
-// if (table.equals("one_hour_metrics") || table.equals("six_hour_metrics") ||
-// table.equals("twenty_four_hour_metrics")) {
-// log.info("Dropping table " + table);
-// session.execute("DROP table rhq." + table);
-// }
-// }
- }
-
- private FutureCallback<Integer> migrationFinished(final Integer scheduleId, final Bucket bucket,
- final CountDownLatch latch, final MigrationLog migrationLog) {
- return new FutureCallback<Integer>() {
+ private FutureCallback<List<ResultSet>> migrationFinished(final Integer scheduleId, final Bucket bucket,
+ final MigrationLog migrationLog, final AtomicInteger remainingMetrics) {
+ return new FutureCallback<List<ResultSet>>() {
@Override
- public void onSuccess(Integer metricsWritten) {
- latch.countDown();
- readPermits.release();
- log.debug("Finished migration for schedule id " + scheduleId);
+ public void onSuccess(List<ResultSet> resultSets) {
try {
+ migrations.finishedTask();
+ remainingMetrics.decrementAndGet();
migrationLog.write(scheduleId);
+ migrationsMeter.mark();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished migrating " + bucket + " data for schedule id " + scheduleId);
+ }
} catch (IOException e) {
- log.warn("Failed to update migration log for bucket " + bucket + " and schedule id " + scheduleId);
+ log.warn("Failed to log successful migration of " + bucket + " data for schedule id " +
+ scheduleId, e);
}
}
@Override
public void onFailure(Throwable t) {
- latch.countDown();
- readPermits.release();
- log.debug("Finished migration for schedule id " + scheduleId);
+ log.info("Failed to migrate " + bucket + " data for schedule id " + scheduleId);
+ migrations.finishedTask();
+ migrationsMeter.mark();
+ remainingMetrics.decrementAndGet();
failedMigrations.incrementAndGet();
}
};
}
- private class MigrateData implements Function<ResultSet, Integer>, FutureCallback<ResultSet> {
-
- private Integer scheduleId;
-
- private Bucket bucket;
-
- private boolean writeFailed;
-
- private AtomicInteger metricsMigrated = new AtomicInteger();
-
- public MigrateData(Integer scheduleId, Bucket bucket) {
- this.scheduleId = scheduleId;
- this.bucket = bucket;
- }
-
- @Override
- public Integer apply(ResultSet resultSet) {
- List<Row> rows = resultSet.all();
- if (rows.isEmpty()) {
- log.debug("No " + bucket + " data to migrate for schedule id " + scheduleId);
- return 0;
- }
- Date time = rows.get(0).getDate(1);
- Date nextTime;
- Double max = null;
- Double min = null;
- Double avg = null;
- Long writeTime = rows.get(0).getLong(5);
- Integer ttl = rows.get(0).getInt(4);
- int batchSize = 30;
- List<Statement> statements = new ArrayList<Statement>(batchSize);
-
- for (Row row : rows) {
- if (writeFailed) {
- throw new RuntimeException("Migration of " + bucket + " data for schedule id " + scheduleId + " failed");
- }
- nextTime = row.getDate(1);
- if (nextTime.equals(time)) {
- int type = row.getInt(2);
- switch (type) {
- case 0 :
- max = row.getDouble(3);
- break;
- case 1:
- min = row.getDouble(3);
- break;
- default:
- avg = row.getDouble(3);
- }
- } else {
- if (isDataMissing(avg, max, min)) {
- log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
- ", time: " + time.getTime() + "}. It will not be migrated.");
- } else {
- statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
- if (statements.size() == batchSize) {
- Futures.addCallback(writeBatch(statements), this);
- statements.clear();
- }
- }
-
- time = nextTime;
- max = row.getDouble(3);
- min = null;
- avg = null;
- ttl = row.getInt(4);
- writeTime = row.getLong(5);
- }
- }
-
- if (!statements.isEmpty()) {
- Futures.addCallback(writeBatch(statements), this);
- }
-
- if (writeFailed) {
- throw new RuntimeException("Migration of " + bucket + " data for schedule id " + scheduleId + " failed");
- }
-
- return metricsMigrated.get();
- }
-
- @Override
- public void onSuccess(ResultSet result) {
- metricsMigrated.incrementAndGet();
- }
-
- @Override
- public void onFailure(Throwable t) {
- writeFailed = true;
- // TODO only log a warning once
- // If we have a failure, chances are that we will get them in bunches. Since we
- // want to stop on the first failed write, it would be nice to only log the
- // first failure in order to avoid spamming the log.
- log.warn("Migration of " + bucket + " data for schedule id " + scheduleId + " failed", t);
- }
-
- private boolean isDataMissing(Double avg, Double max, Double min) {
- if (avg == null || Double.isNaN(avg)) return true;
- if (max == null || Double.isNaN(max)) return true;
- if (min == null || Double.isNaN(min)) return true;
-
- return false;
- }
-
- private ResultSetFuture writeBatch(List<Statement> statements) {
- Batch batch = QueryBuilder.batch(statements.toArray(new Statement[statements.size()]));
- writePermits.acquire();
- return session.executeAsync(batch);
- }
-
- private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl,
- Long writeTime) {
- return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) VALUES " +
- "(" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " +
- min + ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
- }
- }
-
- private class MetricsWriter implements Callable<Integer>, FutureCallback<ResultSet> {
-
- private Integer scheduleId;
-
- private Bucket bucket;
-
- private ResultSet resultSet;
-
- private boolean writeFailed;
-
- private AtomicInteger metricsMigrated = new AtomicInteger();
-
- public MetricsWriter(Integer scheduleId, Bucket bucket, ResultSet resultSet) {
- this.scheduleId = scheduleId;
- this.bucket = bucket;
- this.resultSet = resultSet;
- }
-
- @Override
- public Integer call() throws Exception {
- List<Row> rows = resultSet.all();
- if (rows.isEmpty()) {
- log.debug("No " + bucket + " data to migrate for schedule id " + scheduleId);
- return 0;
- }
- Date time = rows.get(0).getDate(1);
- Date nextTime;
- Double max = null;
- Double min = null;
- Double avg = null;
- Long writeTime = rows.get(0).getLong(5);
- Integer ttl = rows.get(0).getInt(4);
- int batchSize = 30;
- List<Statement> statements = new ArrayList<Statement>(batchSize);
-
- for (Row row : rows) {
- if (writeFailed) {
- throw new Exception("Migration of " + bucket + " data for schedule id " + scheduleId + " failed");
- }
- nextTime = row.getDate(1);
- if (nextTime.equals(time)) {
- int type = row.getInt(2);
- switch (type) {
- case 0 :
- max = row.getDouble(3);
- break;
- case 1:
- min = row.getDouble(3);
- break;
- default:
- avg = row.getDouble(3);
- }
- } else {
- if (isDataMissing(avg, max, min)) {
- if (log.isDebugEnabled()) {
- log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
- ", time: " + time.getTime() + "}. It will not be migrated.");
- }
- } else {
- statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
- if (statements.size() == batchSize) {
- Futures.addCallback(writeBatch(statements), this);
- statements.clear();
- }
- }
-
- time = nextTime;
- max = row.getDouble(3);
- min = null;
- avg = null;
- ttl = row.getInt(4);
- writeTime = row.getLong(5);
- }
- }
-
- if (!statements.isEmpty()) {
- Futures.addCallback(writeBatch(statements), this);
- }
-
- if (writeFailed) {
- throw new Exception("Migration of " + bucket + " data for schedule id " + scheduleId + " failed");
- }
-
- return metricsMigrated.get();
- }
-
- private boolean isDataMissing(Double avg, Double max, Double min) {
- if (avg == null || Double.isNaN(avg)) return true;
- if (max == null || Double.isNaN(max)) return true;
- if (min == null || Double.isNaN(min)) return true;
-
- return false;
- }
-
- @Override
- public void onSuccess(ResultSet resultSet) {
- metricsMigrated.incrementAndGet();
- }
-
- @Override
- public void onFailure(Throwable t) {
- writeFailed = true;
- // TODO only log a warning once
- // If we have a failure, chances are that we will get them in bunches. Since we
- // want to stop on the first failed write, it would be nice to only log the
- // first failure in order to avoid spamming the log.
- log.warn("Migration of " + bucket + " data for schedule id " + scheduleId + " failed", t);
- }
-
- private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl,
- Long writeTime) {
- return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) VALUES " +
- "(" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " +
- min + ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
- }
-
- private ResultSetFuture writeBatch(List<Statement> statements) {
- Batch batch = QueryBuilder.batch(statements.toArray(new Statement[statements.size()]));
- writePermits.acquire();
- return session.executeAsync(batch);
- }
-
- private ResultSetFuture writeMetrics(Date time, Double avg, Double max, Double min, Integer ttl,
- Long writeTime) {
- writePermits.acquire();
- return session.executeAsync(
- "INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) VALUES " +
- "(" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " +
- min + ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
- }
- }
-
private class MigrationProgressLogger implements Runnable {
- private Bucket bucket;
-
- private CountDownLatch latch;
-
private boolean finished;
- public MigrationProgressLogger(Bucket bucket, CountDownLatch latch) {
- this.bucket = bucket;
- this.latch = latch;
- }
+ private boolean reportMigrationRates;
public void finished() {
finished = true;
@@ -660,9 +333,20 @@ public void finished() {
@Override
public void run() {
try {
- while (!finished && latch.getCount() > 0) {
- log.info("There are " + latch.getCount() + " remaining schedules for the " + bucket +
- " data migration");
+ while (!finished) {
+ log.info("Remaining metrics to migrate\n" +
+ Bucket.ONE_HOUR + ": " + remaining1HourMetrics + "\n" +
+ Bucket.SIX_HOUR + ": " + remaining6HourMetrics + "\n" +
+ Bucket.TWENTY_FOUR_HOUR + ": " + remaining24HourMetrics + "\n");
+ if (reportMigrationRates) {
+ log.info("Metrics migration rates:\n" +
+ "1 min rate: " + migrationsMeter.oneMinuteRate() + "\n" +
+ "5 min rate: " + migrationsMeter.fiveMinuteRate() + " \n" +
+ "15 min rate: " + migrationsMeter.fifteenMinuteRate() + "\n");
+ reportMigrationRates = false;
+ } else {
+ reportMigrationRates = true;
+ }
Thread.sleep(30000);
}
} catch (InterruptedException e) {
@@ -670,4 +354,17 @@ public void run() {
}
}
+ private void dropTables() {
+// ResultSet resultSet = session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies " +
+// "WHERE keyspace_name = 'rhq'");
+// for (Row row : resultSet) {
+// String table = row.getString(0);
+// if (table.equals("one_hour_metrics") || table.equals("six_hour_metrics") ||
+// table.equals("twenty_four_hour_metrics")) {
+// log.info("Dropping table " + table);
+// session.execute("DROP table rhq." + table);
+// }
+// }
+ }
+
}
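
The progress logger alternates between remaining counts and migration rates. A minimal sketch of the Yammer metrics-core 2.x Meter API it relies on (registry and meter names as used in this patch):

    import java.util.concurrent.TimeUnit;
    import com.yammer.metrics.core.Meter;
    import com.yammer.metrics.core.MetricsRegistry;

    public class MeterSketch {
        public static void main(String[] args) {
            MetricsRegistry registry = new MetricsRegistry();
            // Counts "migrations" events; rates are reported per minute.
            Meter meter = registry.newMeter(MeterSketch.class, "migrations", "migrations", TimeUnit.MINUTES);
            meter.mark(); // record one completed migration
            System.out.println("1 min rate: " + meter.oneMinuteRate());
            registry.shutdown();
        }
    }
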
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
new file mode 100644
index 00000000000..1e9eb5742a4
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
@@ -0,0 +1,119 @@
+package org.rhq.cassandra.schema;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Batch;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Collapses the per-type rows (max, min, avg) returned for a schedule id from one of the old aggregate
+ * metrics tables and writes them as single rows into the new rhq.aggregate_metrics table.
+ *
+ * @author John Sanda
+ */
+public class MigrateData implements AsyncFunction<ResultSet, List<ResultSet>> {
+
+ private static final Log log = LogFactory.getLog(MigrateData.class);
+
+ private static final int BATCH_SIZE = 45;
+
+ private Integer scheduleId;
+
+ private MigrateAggregateMetrics.Bucket bucket;
+
+ private RateLimiter writePermits;
+
+ private Session session;
+
+ public MigrateData(Integer scheduleId, MigrateAggregateMetrics.Bucket bucket, RateLimiter writePermits,
+ Session session) {
+ this.scheduleId = scheduleId;
+ this.bucket = bucket;
+ this.writePermits = writePermits;
+ this.session = session;
+ }
+
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(ResultSet resultSet) throws Exception {
+ List<ResultSetFuture> insertFutures = new ArrayList<ResultSetFuture>();
+ List<Row> rows = resultSet.all();
+ if (rows.isEmpty()) {
+ log.debug("No " + bucket + " data to migrate for schedule id " + scheduleId);
+ return Futures.allAsList(insertFutures);
+ }
+ Date time = rows.get(0).getDate(1);
+ Date nextTime;
+ Double max = null;
+ Double min = null;
+ Double avg = null;
+ Long writeTime = rows.get(0).getLong(5);
+ Integer ttl = rows.get(0).getInt(4);
+ List<Statement> statements = new ArrayList<Statement>(BATCH_SIZE);
+
+ for (Row row : rows) {
+ nextTime = row.getDate(1);
+ if (nextTime.equals(time)) {
+ int type = row.getInt(2);
+ switch (type) {
+ case 0:
+ max = row.getDouble(3);
+ break;
+ case 1:
+ min = row.getDouble(3);
+ break;
+ default:
+ avg = row.getDouble(3);
+ }
+ } else {
+ if (isDataMissing(avg, max, min)) {
+ log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
+ ", time: " + time.getTime() + "}. It will not be migrated.");
+ } else {
+ statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
+ if (statements.size() == BATCH_SIZE) {
+ insertFutures.add(writeBatch(statements));
+ statements.clear();
+ }
+ }
+
+ time = nextTime;
+ max = row.getDouble(3);
+ min = null;
+ avg = null;
+ ttl = row.getInt(4);
+ writeTime = row.getLong(5);
+ }
+ }
+ if (!statements.isEmpty()) {
+ insertFutures.add(writeBatch(statements));
+ }
+ return Futures.allAsList(insertFutures);
+ }
+
+ private boolean isDataMissing(Double avg, Double max, Double min) {
+ if (avg == null || Double.isNaN(avg)) return true;
+ if (max == null || Double.isNaN(max)) return true;
+ if (min == null || Double.isNaN(min)) return true;
+
+ return false;
+ }
+
+ private ResultSetFuture writeBatch(List<Statement> statements) {
+ Batch batch = QueryBuilder.batch(statements.toArray(new Statement[statements.size()]));
+ writePermits.acquire();
+ return session.executeAsync(batch);
+ }
+
+ private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl,
+ Long writeTime) {
+ return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) " +
+ "VALUES (" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " + min +
+ ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
+ }
+
+}
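
To make the collapse in apply() concrete, a hypothetical example with invented values. The old tables store one row per aggregate type (0 = max, 1 = min, 2 = avg); apply() buffers the three values until the timestamp changes, then emits one insert:

    // Rows read from rhq.one_hour_metrics for schedule id 100 at time t:
    //   (100, t, 0, 5.0)   type 0 -> max
    //   (100, t, 1, 1.0)   type 1 -> min
    //   (100, t, 2, 3.0)   type 2 -> avg
    // Collapsed into a single row in the new table:
    //   INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min)
    //   VALUES (100, 'one_hour', t, 3.0, 5.0, 1.0) USING TTL <ttl> AND TIMESTAMP <writetime>
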
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceIndex.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceIndex.java
index 3c1a202b8ef..673801205a9 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceIndex.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceIndex.java
@@ -8,11 +8,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
import org.joda.time.Days;
-import org.joda.time.Duration;
import org.joda.time.Hours;
-import org.joda.time.Period;
/**
* For RHQ 4.9 - 4.11 installations this migrates data from the metrics_index table into the new metrics_idx table. For
@@ -46,10 +43,10 @@ public void execute() {
DateRanges dateRanges = new DateRanges();
dateRanges.rawEndTime = DateTime.now().hourOfDay().roundFloorCopy();
dateRanges.rawStartTime = dateRanges.rawEndTime.minusDays(3);
- dateRanges.oneHourStartTime = getTimeSlice(dateRanges.rawStartTime, Hours.SIX.toStandardDuration());
- dateRanges.oneHourEndTime = getTimeSlice(dateRanges.rawEndTime, Hours.SIX.toStandardDuration());
- dateRanges.sixHourStartTime = getTimeSlice(dateRanges.rawStartTime, Days.ONE.toStandardDuration());
- dateRanges.sixHourEndTime = getTimeSlice(dateRanges.rawEndTime, Days.ONE.toStandardDuration());
+ dateRanges.oneHourStartTime = DateUtils.getTimeSlice(dateRanges.rawStartTime, Hours.SIX.toStandardDuration());
+ dateRanges.oneHourEndTime = DateUtils.getTimeSlice(dateRanges.rawEndTime, Hours.SIX.toStandardDuration());
+ dateRanges.sixHourStartTime = DateUtils.getTimeSlice(dateRanges.rawStartTime, Days.ONE.toStandardDuration());
+ dateRanges.sixHourEndTime = DateUtils.getTimeSlice(dateRanges.rawEndTime, Days.ONE.toStandardDuration());
if (cacheIndexExists()) {
log.info("Preparing to replace metrics_cache_index");
@@ -60,60 +57,6 @@ public void execute() {
}
}
- protected static DateTime getTimeSlice(DateTime dt, Duration duration) {
- Period p = duration.toPeriod();
-
- if (p.getYears() != 0) {
- return dt.yearOfEra().roundFloorCopy().minusYears(dt.getYearOfEra() % p.getYears());
- } else if (p.getMonths() != 0) {
- return dt.monthOfYear().roundFloorCopy().minusMonths((dt.getMonthOfYear() - 1) % p.getMonths());
- } else if (p.getWeeks() != 0) {
- return dt.weekOfWeekyear().roundFloorCopy().minusWeeks((dt.getWeekOfWeekyear() - 1) % p.getWeeks());
- } else if (p.getDays() != 0) {
- return dt.dayOfMonth().roundFloorCopy().minusDays((dt.getDayOfMonth() - 1) % p.getDays());
- } else if (p.getHours() != 0) {
- return dt.hourOfDay().roundFloorCopy().minusHours(dt.getHourOfDay() % p.getHours());
- } else if (p.getMinutes() != 0) {
- return dt.minuteOfHour().roundFloorCopy().minusMinutes(dt.getMinuteOfHour() % p.getMinutes());
- } else if (p.getSeconds() != 0) {
- return dt.secondOfMinute().roundFloorCopy().minusSeconds(dt.getSecondOfMinute() % p.getSeconds());
- }
- return dt.millisOfSecond().roundCeilingCopy().minusMillis(dt.getMillisOfSecond() % p.getMillis());
- }
-
- protected static DateTime getUTCTimeSlice(DateTime dateTime, Duration duration) {
- return getTimeSlice(new DateTime(dateTime.getMillis(), DateTimeZone.UTC), duration);
- }
-
- protected static DateTime plusDSTAware(DateTime dateTime, Duration duration) {
- //(BZ 1161806) Added code to adjust to the shifts in time due to
- // changes from DST to non-DST and the reverse.
- //
- // 1) When switching from DST to non-DST, the time after the
- // duration increment needs to be adjusted by a positive
- // one hour
- //
- // 2) When switching from non-DST to DST, the time after the
- // duration increment needs to be adjusted by a negative
- // one hour
- //
- // Note: this does not work if the duration is exactly one
- // hour because it will create an infinite loop when switching
- // from non-DST to DST times.
-
- if (duration.toPeriod().getHours() <= 1) {
- dateTime = dateTime.plus(duration);
- } else {
- DateTimeZone zone = dateTime.getZone();
- int beforeOffset = zone.getOffset(dateTime.getMillis());
- dateTime = dateTime.plus(duration);
- int afterOffset = zone.getOffset(dateTime.getMillis());
- dateTime = dateTime.plus(beforeOffset - afterOffset);
- }
-
- return dateTime;
- }
-
private boolean cacheIndexExists() {
ResultSet resultSet = session.execute("SELECT columnfamily_name FROM system.schema_columnfamilies " +
"WHERE keyspace_name = 'rhq' AND columnfamily_name = 'metrics_cache_index'");
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TaskTracker.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TaskTracker.java
new file mode 100644
index 00000000000..31460b9017f
--- /dev/null
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TaskTracker.java
@@ -0,0 +1,124 @@
+package org.rhq.cassandra.schema;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author John Sanda
+ */
+class TaskTracker {
+
+ private volatile int remainingTasks;
+
+ private volatile boolean schedulingFinished;
+
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private CountDownLatch allTasksFinished = new CountDownLatch(1);
+
+ private volatile boolean aborted;
+
+ private String errorMessage;
+
+ /**
+ * Increases the count of remaining tasks.
+ */
+ public void addTask() {
+ try {
+ lock.writeLock().lock();
+ remainingTasks++;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * This method is intended primarily for debugging purposes, to log progress. While other methods in this class
+ * obtain a read or write lock, this method intentionally does not. There is no need to impose the locking overhead
+ * since this method only reads a single volatile variable.
+ *
+ * @return The number of remaining or outstanding tasks to be completed
+ */
+ public int getRemainingTasks() {
+ return remainingTasks;
+ }
+
+ /**
+ * Should be called by the producer when it has finished scheduling tasks. Moreover, the producer must invoke this
+ * method before it invokes {@link #waitForTasksToFinish()}. Failure to do so will cause the producer to block
+ * indefinitely.
+ */
+ public void finishedSchedulingTasks() {
+ try {
+ lock.writeLock().lock();
+ schedulingFinished = true;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Should be invoked by a consumer when it completes a task.
+ */
+ public void finishedTask() {
+ try {
+ lock.writeLock().lock();
+ remainingTasks--;
+ if (schedulingFinished && remainingTasks == 0) {
+ allTasksFinished.countDown();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Should be invoked by the producer only after it has invoked {@link #finishedSchedulingTasks()}.
+ * If this method gets invoked first, the producer will block indefinitely. This method will block until all tasks
+ * have completed. If all tasks have already completed, this method returns immediately.
+ *
+ * @throws InterruptedException If the producer thread is interrupted while waiting
+ * @throws AbortedException If task processing has been aborted, which is done by calling
+ * {@link #abort(String)}
+ */
+ public void waitForTasksToFinish() throws InterruptedException, AbortedException {
+ try {
+ lock.readLock().lock();
+ if (aborted) {
+ throw new AbortedException(errorMessage);
+ }
+ if (remainingTasks == 0) {
+ return;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ allTasksFinished.await();
+ try {
+ lock.readLock().lock();
+ if (aborted) {
+ throw new AbortedException(errorMessage);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Should be invoked by a consumer to abort processing of any future tasks.
+ *
+ * @param msg An error message that will be included in the {@link AbortedException} thrown by
+ * {@link #waitForTasksToFinish()}
+ */
+ public void abort(String msg) {
+ try {
+ lock.writeLock().lock();
+ errorMessage = msg;
+ aborted = true;
+ allTasksFinished.countDown();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+}
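
Putting the javadocs above together, a minimal producer/consumer sketch (same package as TaskTracker, since the class is package-private; the task body stands in for real work):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TaskTrackerSketch {
        public static void main(String[] args) throws Exception {
            final TaskTracker tracker = new TaskTracker();
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 100; i++) {
                tracker.addTask(); // producer: count each task before submitting it
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        // consumer: do one unit of work, then mark it finished; on an
                        // unrecoverable error call tracker.abort("reason") instead
                        tracker.finishedTask();
                    }
                });
            }
            tracker.finishedSchedulingTasks(); // must precede waitForTasksToFinish()
            tracker.waitForTasksToFinish();    // throws AbortedException after abort()
            pool.shutdown();
        }
    }
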