diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java index 32e5a033f87..b029df5b941 100644 --- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java +++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java @@ -340,34 +340,38 @@ private void migrate1HourData(Set scheduleIds) throws IOException { DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now()); DateTime startTime = endTime.minus(Days.days(14)); PreparedStatement query = session.prepare( - "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " + - "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); + "SELECT time, type, value FROM rhq.one_hour_metrics " + + "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); PreparedStatement delete = session.prepare("DELETE FROM rhq.one_hour_metrics WHERE schedule_id = ?"); - migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics); + migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics, + Days.days(14)); } private void migrate6HourData(Set scheduleIds) throws IOException { DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now()); - DateTime startTime = endTime.minus(Days.days(14)); + DateTime startTime = endTime.minus(Days.days(31)); PreparedStatement query = session.prepare( - "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " + - "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); + "SELECT time, type, value FROM rhq.six_hour_metrics " + + "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); PreparedStatement delete = session.prepare("DELETE FROM rhq.six_hour_metrics WHERE schedule_id = ?"); - migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics); + migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics, + Days.days(31)); } private void migrate24HourData(Set scheduleIds) throws IOException { DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now()); - DateTime startTime = endTime.minus(Days.days(14)); + DateTime startTime = endTime.minus(Days.days(365)); PreparedStatement query = session.prepare( - "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " + - "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); + "SELECT time, type, value FROM rhq.twenty_four_hour_metrics " + + "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); PreparedStatement delete = session.prepare("DELETE FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ?"); - migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics); + migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics, + Days.days(365)); } private void migrateData(Set scheduleIds, PreparedStatement query, final PreparedStatement delete, - Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException { + Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics, Days ttl) + throws IOException { Stopwatch stopwatch = Stopwatch.createStarted(); log.info("Scheduling data migration tasks for " + bucket + " data"); @@ -377,7 +381,7 @@ private void migrateData(Set scheduleIds, PreparedStatement query, fina for (final Integer scheduleId : scheduleIds) { if (!migratedScheduleIds.contains(scheduleId)) { migrations.addTask(); - migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId); + migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl); } } @@ -388,11 +392,11 @@ private void migrateData(Set scheduleIds, PreparedStatement query, fina private void migrateData(PreparedStatement query, final PreparedStatement delete, final Bucket bucket, AtomicInteger remainingMetrics, AtomicInteger migratedMetrics, MigrationLog migrationLog, - final Integer scheduleId) { + final Integer scheduleId, Days ttl) { readPermitsRef.get().acquire(); ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId)); ListenableFuture> migrationFuture = Futures.transform(queryFuture, - new MigrateData(scheduleId, bucket, writePermitsRef.get(), session), threadPool); + new MigrateData(scheduleId, bucket, writePermitsRef.get(), session, ttl.toStandardSeconds()), threadPool); ListenableFuture deleteFuture = Futures.transform(migrationFuture, new AsyncFunction, ResultSet>() { @Override @@ -402,12 +406,12 @@ public ListenableFuture apply(List resultSets) throws Exce } }, threadPool); Futures.addCallback(deleteFuture, migrationFinished(query, delete, scheduleId, bucket, migrationLog, - remainingMetrics, migratedMetrics), threadPool); + remainingMetrics, migratedMetrics, ttl), threadPool); } private FutureCallback migrationFinished(final PreparedStatement query, final PreparedStatement delete, final Integer scheduleId, final Bucket bucket, final MigrationLog migrationLog, final AtomicInteger - remainingMetrics, final AtomicInteger migratedMetrics) { + remainingMetrics, final AtomicInteger migratedMetrics, final Days ttl) { return new FutureCallback() { @Override @@ -438,7 +442,7 @@ public void onFailure(Throwable t) { ThrowableUtil.getRootMessage(t) + ". Migration will be rescheduled"); } rateMonitor.requestFailed(); - migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId); + migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl); } }; } diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java index da2c541a1cd..b868471ac31 100644 --- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java +++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java @@ -19,6 +19,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.joda.time.DateTime; +import org.joda.time.Seconds; /** * @author John Sanda @@ -37,12 +39,15 @@ public class MigrateData implements AsyncFunction> { private Session session; + private Seconds ttl; + public MigrateData(Integer scheduleId, MigrateAggregateMetrics.Bucket bucket, RateLimiter writePermits, - Session session) { + Session session, Seconds ttl) { this.scheduleId = scheduleId; this.bucket = bucket; this.writePermits = writePermits; this.session = session; + this.ttl = ttl; } @Override @@ -53,47 +58,49 @@ public ListenableFuture> apply(ResultSet resultSet) throws Excep return Futures.allAsList(insertFutures); } List rows = resultSet.all(); - Date time = rows.get(0).getDate(1); + Date time = rows.get(0).getDate(0); Date nextTime; Double max = null; Double min = null; Double avg = null; - Long writeTime = rows.get(0).getLong(5); - Integer ttl = rows.get(0).getInt(4); + Seconds elapsedSeconds = Seconds.secondsBetween(DateTime.now(), new DateTime(time)); List statements = new ArrayList(BATCH_SIZE); for (Row row : rows) { - nextTime = row.getDate(1); + nextTime = row.getDate(0); if (nextTime.equals(time)) { - int type = row.getInt(2); + int type = row.getInt(1); switch (type) { case 0: - max = row.getDouble(3); + max = row.getDouble(2); break; case 1: - min = row.getDouble(3); + min = row.getDouble(2); break; default: - avg = row.getDouble(3); + avg = row.getDouble(2); } } else { - if (isDataMissing(avg, max, min)) { - log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId + - ", time: " + time.getTime() + "}. It will not be migrated."); - } else { - statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime)); - if (statements.size() == BATCH_SIZE) { - insertFutures.add(writeBatch(statements)); - statements.clear(); + if (elapsedSeconds.isLessThan(ttl)) { + if (isDataMissing(avg, max, min)) { + if (log.isDebugEnabled()) { + log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + + scheduleId + ", time: " + time.getTime() + "}. It will not be migrated."); + } + } else { + int newTTL = ttl.getSeconds() - elapsedSeconds.getSeconds(); + statements.add(createInsertStatement(time, avg, max, min, newTTL)); + if (statements.size() == BATCH_SIZE) { + insertFutures.add(writeBatch(statements)); + statements.clear(); + } } - } - time = nextTime; - max = row.getDouble(3); - min = null; - avg = null; - ttl = row.getInt(4); - writeTime = row.getLong(5); + time = nextTime; + max = row.getDouble(2); + min = null; + avg = null; + } } } if (!statements.isEmpty()) { @@ -120,11 +127,10 @@ private ResultSetFuture writeBatch(List statements) { return session.executeAsync(batch); } - private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl, - Long writeTime) { + private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, int newTTL) { return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) " + "VALUES (" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " + min + - ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime); + ") USING TTL " + newTTL); } }