This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

[BZ 185375] simplify queries and TTL calculations
There is no need to include the schedule id, write time, and ttl in the query
results. We already know the schedule id, and we can compute the ttl.
John Sanda authored and Simeon Pinder committed Mar 6, 2015
1 parent 5587ae7 commit 08321b1
Showing 2 changed files with 55 additions and 45 deletions.
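
The TTL arithmetic that replaces the ttl(value) and writetime(value) columns works because each bucket has a fixed retention period: 14 days for 1-hour data, 31 days for 6-hour data, and 365 days for 24-hour data. The remaining TTL for a migrated row is therefore the bucket's retention minus the row's age. A minimal sketch of that calculation, using the same Joda-Time types as the patch (the class and method names here are illustrative, not code from the commit):

import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Seconds;

class TtlSketch {
    // Remaining TTL = bucket retention minus the age of the data point.
    // A non-positive result means the original row has already expired,
    // so the migration skips it rather than re-inserting dead data.
    static int remainingTtlSeconds(Days retention, DateTime rowTime) {
        Seconds elapsed = Seconds.secondsBetween(rowTime, DateTime.now());
        return retention.toStandardSeconds().getSeconds() - elapsed.getSeconds();
    }
}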
@@ -340,34 +340,38 @@ private void migrate1HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
         DateTime startTime = endTime.minus(Days.days(14));
         PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.one_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
         PreparedStatement delete = session.prepare("DELETE FROM rhq.one_hour_metrics WHERE schedule_id = ?");
-        migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
+        migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics,
+            Days.days(14));
     }

     private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(14));
+        DateTime startTime = endTime.minus(Days.days(31));
         PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.six_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
         PreparedStatement delete = session.prepare("DELETE FROM rhq.six_hour_metrics WHERE schedule_id = ?");
-        migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
+        migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics,
+            Days.days(31));
     }

     private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
         DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
-        DateTime startTime = endTime.minus(Days.days(14));
+        DateTime startTime = endTime.minus(Days.days(365));
         PreparedStatement query = session.prepare(
-            "SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
-            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
+            "SELECT time, type, value FROM rhq.twenty_four_hour_metrics " +
+            "WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
         PreparedStatement delete = session.prepare("DELETE FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ?");
-        migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics);
+        migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics,
+            Days.days(365));
     }

     private void migrateData(Set<Integer> scheduleIds, PreparedStatement query, final PreparedStatement delete,
-        Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException {
+        Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics, Days ttl)
+        throws IOException {

         Stopwatch stopwatch = Stopwatch.createStarted();
         log.info("Scheduling data migration tasks for " + bucket + " data");
@@ -377,7 +381,7 @@ private void migrateData(Set<Integer> scheduleIds, PreparedStatement query, fina
         for (final Integer scheduleId : scheduleIds) {
             if (!migratedScheduleIds.contains(scheduleId)) {
                 migrations.addTask();
-                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId);
+                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl);
             }
         }

@@ -388,11 +392,11 @@ private void migrateData(Set<Integer> scheduleIds, PreparedStatement query, fina

     private void migrateData(PreparedStatement query, final PreparedStatement delete, final Bucket bucket,
         AtomicInteger remainingMetrics, AtomicInteger migratedMetrics, MigrationLog migrationLog,
-        final Integer scheduleId) {
+        final Integer scheduleId, Days ttl) {
         readPermitsRef.get().acquire();
         ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
         ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
-            new MigrateData(scheduleId, bucket, writePermitsRef.get(), session), threadPool);
+            new MigrateData(scheduleId, bucket, writePermitsRef.get(), session, ttl.toStandardSeconds()), threadPool);
         ListenableFuture<ResultSet> deleteFuture = Futures.transform(migrationFuture,
             new AsyncFunction<List<ResultSet>, ResultSet>() {
                 @Override
@@ -402,12 +406,12 @@ public ListenableFuture<ResultSet> apply(List<ResultSet> resultSets) throws Exce
                 }
             }, threadPool);
         Futures.addCallback(deleteFuture, migrationFinished(query, delete, scheduleId, bucket, migrationLog,
-            remainingMetrics, migratedMetrics), threadPool);
+            remainingMetrics, migratedMetrics, ttl), threadPool);
     }

     private FutureCallback<ResultSet> migrationFinished(final PreparedStatement query, final PreparedStatement delete,
         final Integer scheduleId, final Bucket bucket, final MigrationLog migrationLog, final AtomicInteger
-        remainingMetrics, final AtomicInteger migratedMetrics) {
+        remainingMetrics, final AtomicInteger migratedMetrics, final Days ttl) {

         return new FutureCallback<ResultSet>() {
             @Override
@@ -438,7 +442,7 @@ public void onFailure(Throwable t) {
                     ThrowableUtil.getRootMessage(t) + ". Migration will be rescheduled");
                 }
                 rateMonitor.requestFailed();
-                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId);
+                migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId, ttl);
             }
         };
     }
@@ -19,6 +19,8 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;

 /**
  * @author John Sanda
@@ -37,12 +39,15 @@ public class MigrateData implements AsyncFunction<ResultSet, List<ResultSet>> {

     private Session session;

+    private Seconds ttl;
+
     public MigrateData(Integer scheduleId, MigrateAggregateMetrics.Bucket bucket, RateLimiter writePermits,
-        Session session) {
+        Session session, Seconds ttl) {
         this.scheduleId = scheduleId;
         this.bucket = bucket;
         this.writePermits = writePermits;
         this.session = session;
+        this.ttl = ttl;
     }

@Override
@@ -53,47 +58,49 @@ public ListenableFuture<List<ResultSet>> apply(ResultSet resultSet) throws Excep
             return Futures.allAsList(insertFutures);
         }
         List<Row> rows = resultSet.all();
-        Date time = rows.get(0).getDate(1);
+        Date time = rows.get(0).getDate(0);
         Date nextTime;
         Double max = null;
         Double min = null;
         Double avg = null;
-        Long writeTime = rows.get(0).getLong(5);
-        Integer ttl = rows.get(0).getInt(4);
+        Seconds elapsedSeconds = Seconds.secondsBetween(new DateTime(time), DateTime.now());
         List<Statement> statements = new ArrayList<Statement>(BATCH_SIZE);

         for (Row row : rows) {
-            nextTime = row.getDate(1);
+            nextTime = row.getDate(0);
             if (nextTime.equals(time)) {
-                int type = row.getInt(2);
+                int type = row.getInt(1);
                 switch (type) {
                     case 0:
-                        max = row.getDouble(3);
+                        max = row.getDouble(2);
                         break;
                     case 1:
-                        min = row.getDouble(3);
+                        min = row.getDouble(2);
                         break;
                     default:
-                        avg = row.getDouble(3);
+                        avg = row.getDouble(2);
                 }
             } else {
-                if (isDataMissing(avg, max, min)) {
-                    log.debug("We only have a partial " + bucket + " metric for {scheduleId: " + scheduleId +
-                        ", time: " + time.getTime() + "}. It will not be migrated.");
-                } else {
-                    statements.add(createInsertStatement(time, avg, max, min, ttl, writeTime));
-                    if (statements.size() == BATCH_SIZE) {
-                        insertFutures.add(writeBatch(statements));
-                        statements.clear();
+                if (elapsedSeconds.isLessThan(ttl)) {
+                    if (isDataMissing(avg, max, min)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("We only have a partial " + bucket + " metric for {scheduleId: " +
+                                scheduleId + ", time: " + time.getTime() + "}. It will not be migrated.");
+                        }
+                    } else {
+                        int newTTL = ttl.getSeconds() - elapsedSeconds.getSeconds();
+                        statements.add(createInsertStatement(time, avg, max, min, newTTL));
+                        if (statements.size() == BATCH_SIZE) {
+                            insertFutures.add(writeBatch(statements));
+                            statements.clear();
+                        }
                     }
                 }

                 time = nextTime;
-                max = row.getDouble(3);
+                max = row.getDouble(2);
                 min = null;
                 avg = null;
-                ttl = row.getInt(4);
-                writeTime = row.getLong(5);
             }
         }
         if (!statements.isEmpty()) {
@@ -120,11 +127,10 @@ private ResultSetFuture writeBatch(List<Statement> statements) {
return session.executeAsync(batch);
}

-    private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, Integer ttl,
-        Long writeTime) {
+    private SimpleStatement createInsertStatement(Date time, Double avg, Double max, Double min, int newTTL) {
         return new SimpleStatement("INSERT INTO rhq.aggregate_metrics(schedule_id, bucket, time, avg, max, min) " +
             "VALUES (" + scheduleId + ", '" + bucket + "', " + time.getTime() + ", " + avg + ", " + max + ", " + min +
-            ") USING TTL " + ttl + " AND TIMESTAMP " + writeTime);
+            ") USING TTL " + newTTL);
     }

}
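
One consequence of the simplified insert is worth spelling out: dropping USING TIMESTAMP means migrated rows carry the migration's own write time rather than the original one, while the recomputed TTL still makes them expire at the same moment the originals would have. A rough sketch of the expiry arithmetic, with invented numbers (not taken from the commit):

// A 1-hour metric written 10 days ago, under the 14-day retention:
int retention = 14 * 24 * 60 * 60;   // 1,209,600 seconds
int elapsed   = 10 * 24 * 60 * 60;   //   864,000 seconds
int newTTL    = retention - elapsed; //   345,600 seconds = 4 days
// The old query's ttl(value) column would have returned ~345,600 for
// this row; computing it client-side yields the same expiry point
// without shipping the extra columns back from Cassandra.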
