From 08321b1e4a3d3ac96b42964b77f29d1f8523fde3 Mon Sep 17 00:00:00 2001
From: John Sanda
Date: Mon, 2 Mar 2015 11:58:12 -0500
Subject: [PATCH] [BZ 185375] simplify queries and TTL calculations

There is no need to include the schedule id, write time, and ttl in the
query results. We already know the schedule id, and we can compute the
ttl.
---
 .../schema/MigrateAggregateMetrics.java       | 40 +++++++------
 .../org/rhq/cassandra/schema/MigrateData.java | 60 ++++++++++---------
 2 files changed, 55 insertions(+), 45 deletions(-)

diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index 32e5a033f87..b029df5b941 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -340,34 +340,38 @@ private void migrate1HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
         DateTime startTime = endTime.minus(Days.days(14));
         PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.one_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
         PreparedStatement delete = session.prepare("DELETE FROM rhq.one_hour_metrics WHERE schedule_id = ?");
-        migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
+        migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics,
+            Days.days(14));
     }
 
     private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(14));
+        DateTime startTime = endTime.minus(Days.days(31));
         PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.six_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
         PreparedStatement delete = session.prepare("DELETE FROM rhq.six_hour_metrics WHERE schedule_id = ?");
-        migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
+        migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics,
+            Days.days(31));
     }
 
     private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(14));
+        DateTime startTime = endTime.minus(Days.days(365));
        PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.twenty_four_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis()); PreparedStatement delete = session.prepare("DELETE FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ?"); - migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics); + migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics, + Days.days(365)); } private void migrateData(Set scheduleIds, PreparedStatement query, final PreparedStatement delete, - Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException { + Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics, Days ttl) + throws IOException { Stopwatch stopwatch = Stopwatch.createStarted(); log.info("Scheduling data migration tasks for " + bucket + " data"); @@ -377,7 +381,7 @@ private void migrateData(Set scheduleIds, PreparedStatement query, fina for (final Integer scheduleId : scheduleIds) { if (!migratedScheduleIds.contains(scheduleId)) { migrations.addTask(); - migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId); + migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl); } } @@ -388,11 +392,11 @@ private void migrateData(Set scheduleIds, PreparedStatement query, fina private void migrateData(PreparedStatement query, final PreparedStatement delete, final Bucket bucket, AtomicInteger remainingMetrics, AtomicInteger migratedMetrics, MigrationLog migrationLog, - final Integer scheduleId) { + final Integer scheduleId, Days ttl) { readPermitsRef.get().acquire(); ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId)); ListenableFuture> migrationFuture = Futures.transform(queryFuture, - new MigrateData(scheduleId, bucket, writePermitsRef.get(), session), threadPool); + new MigrateData(scheduleId, bucket, writePermitsRef.get(), session, ttl.toStandardSeconds()), threadPool); ListenableFuture deleteFuture = Futures.transform(migrationFuture, new AsyncFunction, ResultSet>() { @Override @@ -402,12 +406,12 @@ public ListenableFuture apply(List resultSets) throws Exce } }, threadPool); Futures.addCallback(deleteFuture, migrationFinished(query, delete, scheduleId, bucket, migrationLog, - remainingMetrics, migratedMetrics), threadPool); + remainingMetrics, migratedMetrics, ttl), threadPool); } private FutureCallback migrationFinished(final PreparedStatement query, final PreparedStatement delete, final Integer scheduleId, final Bucket bucket, final MigrationLog migrationLog, final AtomicInteger - remainingMetrics, final AtomicInteger migratedMetrics) { + remainingMetrics, final AtomicInteger migratedMetrics, final Days ttl) { return new FutureCallback() { @Override @@ -438,7 +442,7 @@ public void onFailure(Throwable t) { ThrowableUtil.getRootMessage(t) + ". 
                 }
                 rateMonitor.requestFailed();
-                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId);
+                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl);
             }
         };
     }

diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
index da2c541a1cd..b868471ac31 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateData.java
@@ -19,6 +19,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
 
 /**
  * @author John Sanda
@@ -37,12 +39,15 @@ public class MigrateData implements AsyncFunction<ResultSet, List<ResultSet>> {
 
     private Session session;
 
+    private Seconds ttl;
+
     public MigrateData(Integer scheduleId, MigrateAggregateMetrics.Bucket bucket, RateLimiter writePermits,
-        Session session) {
+        Session session, Seconds ttl) {
         this.scheduleId = scheduleId;
         this.bucket = bucket;
         this.writePermits = writePermits;
         this.session = session;
+        this.ttl = ttl;
     }
 
     @Override
@@ -53,47 +58,49 @@ public ListenableFuture<List<ResultSet>> apply(ResultSet resultSet) throws Excep
             return Futures.allAsList(insertFutures);
         }
         List<Row> rows = resultSet.all();
-        Date time = rows.get(0).getDate(1);
+        Date time = rows.get(0).getDate(0);
         Date nextTime;
         Double max = null;
         Double min = null;
         Double avg = null;
-        Long writeTime = rows.get(0).getLong(5);
-        Integer ttl = rows.get(0).getInt(4);
+        Seconds elapsedSeconds = Seconds.secondsBetween(DateTime.now(), new DateTime(time));
         List<Statement> statements = new ArrayList<Statement>(BATCH_SIZE);
 
         for (Row row : rows) {
-            nextTime = row.getDate(1);
+            nextTime = row.getDate(0);
             if (nextTime.equals(time)) {
-                int type = row.getInt(2);
+                int type = row.getInt(1);
                 switch (type) {
                 case 0:
-                    max = row.getDouble(3);
+                    max = row.getDouble(2);
                     break;
                 case 1:
-                    min = row.getDouble(3);
+                    min = row.getDouble(2);
                     break;
                 default:
-                    avg = row.getDouble(3);
+                    avg = row.getDouble(2);
                 }
             } else {
-                if (isDataMissing(avg, max, min)) {
-                    log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
-                        ", time: " + time.getTime() + "}. It will not be migrated.");
-                } else {
-                    statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
-                    if (statements.size() == BATCH_SIZE) {
-                        insertFutures.add(writeBatch(statements));
-                        statements.clear();
+                if (elapsedSeconds.isLessThan(ttl)) {
+                    if (isDataMissing(avg, max, min)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("We only have a partial " + bucket + " metric for {scheduleId: " +
+                                scheduleId + ", time: " + time.getTime() + "}. It will not be migrated.");
+                        }
+                    } else {
+                        int newTTL = ttl.getSeconds() - elapsedSeconds.getSeconds();
+                        statements.add(createInsertStatement(time, avg, max, min, newTTL));
+                        if (statements.size() == BATCH_SIZE) {
+                            insertFutures.add(writeBatch(statements));
+                            statements.clear();
+                        }
                     }
-                }
-                time = nextTime;
-                max = row.getDouble(3);
-                min = null;
-                avg = null;
-                ttl = row.getInt(4);
-                writeTime = row.getLong(5);
+                    time = nextTime;
+                    max = row.getDouble(2);
+                    min = null;
+                    avg = null;
+                }
             }
         }
 
         if (!statements.isEmpty()) {
@@ -120,11 +127,10 @@ private ResultSetFuture writeBatch(List<Statement> statements) {
         return session.executeAsync(batch);
     }
 
-    private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl,
-        Long writeTime) {
+    private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, int newTTL) {
         return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) " +
             "VALUES (" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " + min +
-            ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
+            ") USING TTL " + newTTL);
     }
 
 }
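
Reviewer note: the Days value threaded through migrateData() doubles as the TTL base. The remaining TTL for a migrated row is the bucket's retention (14 days for one-hour data, 31 days for six-hour data, 365 days for twenty-four-hour data) minus the row's age, and rows older than the retention are skipped. Below is a minimal sketch of that arithmetic, assuming age is measured from the row timestamp forward to now so that it is positive for historical rows; the TtlMath class and remainingTtl() method are illustrative only, not part of this patch:

    import java.util.Date;

    import org.joda.time.DateTime;
    import org.joda.time.Days;
    import org.joda.time.Seconds;

    public class TtlMath {

        // Returns the TTL in seconds to apply when re-inserting a row, or -1
        // when the row has outlived its bucket's retention and should be
        // dropped rather than migrated.
        public static int remainingTtl(Days retention, Date rowTime) {
            Seconds ttl = retention.toStandardSeconds();
            // Age of the row: seconds from the row timestamp up to now.
            Seconds age = Seconds.secondsBetween(new DateTime(rowTime), DateTime.now());
            if (age.isLessThan(ttl)) {
                return ttl.getSeconds() - age.getSeconds();
            }
            return -1;
        }

        public static void main(String[] args) {
            // A one-hour row written 3 days ago keeps 14 - 3 = 11 days of TTL,
            // roughly 950400 seconds.
            Date rowTime = DateTime.now().minusDays(3).toDate();
            System.out.println(remainingTtl(Days.days(14), rowTime));
        }
    }

For comparison, MigrateData computes elapsedSeconds as Seconds.secondsBetween(DateTime.now(), new DateTime(time)), i.e. with the endpoints in the opposite order, which yields a negative value for past timestamps; the sketch uses row-time-first ordering so that the subtraction ttl - age stays meaningful.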