
Commit

[BZ 185375] retry data migrations on failures
John Sanda committed Mar 1, 2015
1 parent c5ae07f commit f6b2fc9
Showing 2 changed files with 189 additions and 85 deletions.
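In outline, the change makes a failed per-schedule migration reschedule itself from the failure callback, instead of only being counted and logged. A minimal sketch of that retry-via-callback pattern with Guava futures follows; the class and method names (RetryExample, submitWithRetry) are illustrative and not part of the patch:

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class RetryExample {

    private final ListeningExecutorService threadPool =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

    // Submit a task and re-submit it whenever it fails, mirroring the onFailure
    // branch in the patch that calls migrateData(...) again for the failed schedule id.
    void submitWithRetry(final Callable<Void> task) {
        ListenableFuture<Void> future = threadPool.submit(task);
        Futures.addCallback(future, new FutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                // success path: update counters, write the migration log, etc.
            }

            @Override
            public void onFailure(Throwable t) {
                // retry: schedule the same work again
                submitWithRetry(task);
            }
        }, threadPool);
    }
}

In the patch itself the retry is driven by migrationFinished(...)'s onFailure, which calls migrateData(query, delete, ...) again for the same schedule id.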
@@ -25,6 +25,7 @@
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.io.LineReader;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -97,7 +98,9 @@ public String toString() {
}
}

public static final int DEFAULT_WARM_UP = 10;
public static final int DEFAULT_WARM_UP = 20;

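// Each additional up storage node raises the base request limit by 30% (see getWriteLimit/getReadLimit below).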
private static final double RATE_INCREASE_PER_NODE = 0.3;

private Session session;

@@ -164,13 +167,6 @@ public void execute() {
log.info("The relational database connection factory is not set. No data migration necessary");
return;
}
writePermitsRef.set(RateLimiter.create(getWriteLimit() * getNumberOfUpNodes(), DEFAULT_WARM_UP,
TimeUnit.SECONDS));
readPermitsRef.set(RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), DEFAULT_WARM_UP,
TimeUnit.SECONDS));

log.info("The request limits are " + writePermitsRef.get().getRate() + " writes/sec and " +
readPermitsRef.get().getRate() + " reads/sec");

AstyanaxContext<Keyspace> context = createContext();
context.start();
@@ -184,6 +180,19 @@ public void execute() {
"twenty_four_hour_metrics", IntegerSerializer.get(), CompositeSerializer.get()),
Bucket.TWENTY_FOUR_HOUR);

Stopwatch contextStopWatch = Stopwatch.createStarted();
context.shutdown();
contextStopWatch.stop();
log.info("Shut down astyanax in " + contextStopWatch.elapsed(TimeUnit.MILLISECONDS) + " ms");

writePermitsRef.set(RateLimiter.create(getWriteLimit(getNumberOfUpNodes()), DEFAULT_WARM_UP,
TimeUnit.SECONDS));
readPermitsRef.set(RateLimiter.create(getReadLimit(getNumberOfUpNodes()), DEFAULT_WARM_UP,
TimeUnit.SECONDS));

log.info("The request limits are " + writePermitsRef.get().getRate() + " writes/sec and " +
readPermitsRef.get().getRate() + " reads/sec");

remaining1HourMetrics.set(scheduleIdsWith1HourData.size());
remaining6HourMetrics.set(scheduleIdsWith6HourData.size());
remaining24HourMetrics.set(scheduleIdsWith24HourData.size());
@@ -239,12 +248,16 @@ private AstyanaxContext<Keyspace> createContext() {
.buildKeyspace(ThriftFamilyFactory.getInstance());
}

private int getWriteLimit() {
return Integer.parseInt(System.getProperty("rhq.storage.request.limit", "20000"));
private double getWriteLimit(int numNodes) {
int baseLimit = Integer.parseInt(System.getProperty("rhq.storage.request.write-limit", "20000"));
double increase = baseLimit * RATE_INCREASE_PER_NODE;
return baseLimit + (increase * (numNodes - 1));
}

private int getReadLimit() {
return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "200"));
private double getReadLimit(int numNodes) {
int baseLimit = Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "200"));
double increase = baseLimit * RATE_INCREASE_PER_NODE;
return baseLimit + (increase * (numNodes - 1));
}
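// Illustrative arithmetic: with the default base limits above and a 3-node cluster,
//   writes: 20000 + (20000 * 0.3) * (3 - 1) = 32000 permits/sec
//   reads:    200 + (  200 * 0.3) * (3 - 1) =   320 permits/sec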

private int getNumberOfUpNodes() {
@@ -325,7 +338,8 @@ private void migrate1HourData(Set<Integer> scheduleIds) throws IOException {
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
migrateData(scheduleIds, query, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
PreparedStatement delete = session.prepare("DELETE FROM rhq.one_hour_metrics WHERE schedule_id = ?");
migrateData(scheduleIds, query, delete, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
}

private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
@@ -334,7 +348,8 @@ private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
migrateData(scheduleIds, query, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
PreparedStatement delete = session.prepare("DELETE FROM rhq.six_hour_metrics WHERE schedule_id = ?");
migrateData(scheduleIds, query, delete, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
}

private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
@@ -343,11 +358,12 @@ private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());
migrateData(scheduleIds, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics);
PreparedStatement delete = session.prepare("DELETE FROM rhq.twenty_four_hour_metrics WHERE schedule_id = ?");
migrateData(scheduleIds, query, delete, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics);
}

private void migrateData(final Set<Integer> scheduleIds, final PreparedStatement query, final Bucket bucket,
final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException {
private void migrateData(Set<Integer> scheduleIds, PreparedStatement query, final PreparedStatement delete,
Bucket bucket, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException {

Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Scheduling data migration tasks for " + bucket + " data");
@@ -357,12 +373,7 @@ private void migrateData(final Set<Integer> scheduleIds, final PreparedStatement
for (final Integer scheduleId : scheduleIds) {
if (!migratedScheduleIds.contains(scheduleId)) {
migrations.addTask();
readPermitsRef.get().acquire();
ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
new MigrateData(scheduleId, bucket, writePermitsRef.get(), session), threadPool);
Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
remainingMetrics, migratedMetrics), threadPool);
migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId);
}
}

@@ -371,11 +382,32 @@ private void migrateData(final Set<Integer> scheduleIds, final PreparedStatement
stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
}

private FutureCallback<List<ResultSet>> migrationFinished(final Integer scheduleId, final Bucket bucket,
final MigrationLog migrationLog, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) {
return new FutureCallback<List<ResultSet>>() {
private void migrateData(PreparedStatement query, final PreparedStatement delete, Bucket bucket,
AtomicInteger remainingMetrics, AtomicInteger migratedMetrics, MigrationLog migrationLog,
final Integer scheduleId) {
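// Per-schedule pipeline: acquire a read permit and query the old table, transform the
// rows with MigrateData (which writes them out under the write rate limit), then delete
// the source partition. The callback registered below reschedules this chain on failure.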
readPermitsRef.get().acquire();
ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
new MigrateData(scheduleId, bucket, writePermitsRef.get(), session), threadPool);
ListenableFuture<ResultSet> deleteFuture = Futures.transform(migrationFuture,
new AsyncFunction<List<ResultSet>, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(List<ResultSet> resultSets) throws Exception {
writePermitsRef.get().acquire();
return session.executeAsync(delete.bind(scheduleId));
}
}, threadPool);
Futures.addCallback(deleteFuture, migrationFinished(query, delete, scheduleId, bucket, migrationLog,
remainingMetrics, migratedMetrics), threadPool);
}

private FutureCallback<ResultSet> migrationFinished(final PreparedStatement query, final PreparedStatement delete,
final Integer scheduleId, final Bucket bucket, final MigrationLog migrationLog, final AtomicInteger
remainingMetrics, final AtomicInteger migratedMetrics) {

return new FutureCallback<ResultSet>() {
@Override
public void onSuccess(List<ResultSet> resultSets) {
public void onSuccess(ResultSet deleteResultSet) {
try {
migrations.finishedTask();
remainingMetrics.decrementAndGet();
@@ -394,12 +426,11 @@ public void onSuccess(List<ResultSet> resultSets) {

@Override
public void onFailure(Throwable t) {
log.info("Failed to migrate " + bucket + " data for schedule id " + scheduleId);
migrations.finishedTask();
log.info("Failed to migrate " + bucket + " data for schedule id " + scheduleId +
". Migration will be rescheduled");
rateMonitor.requestFailed();
migrationsMeter.mark();
remainingMetrics.decrementAndGet();
failedMigrations.incrementAndGet();
migrateData(query, delete, bucket, remainingMetrics, migratedMetrics, migrationLog, scheduleId);
}
};
}
@@ -422,6 +453,7 @@ public void run() {
Bucket.ONE_HOUR + ": " + remaining1HourMetrics + "\n" +
Bucket.SIX_HOUR + ": " + remaining6HourMetrics + "\n" +
Bucket.TWENTY_FOUR_HOUR + ": " + remaining24HourMetrics + "\n");
log.info("Failed migrations: " + failedMigrations);
if (reportMigrationRates) {
log.info("Metrics migration rates:\n" +
"1 min rate: " + migrationsMeter.oneMinuteRate() + "\n" +
