Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
[BZ 185375] add failure detection
Browse files Browse the repository at this point in the history
There is a now a failure threshold that if exceeded will cause the migration to
be aborted. The rationale is that the read/write rates will likely have to be
adjusted based on each environment. If we are reading too fast for example,
and start generating lots of failiures, it makes sense to go ahead and
terminate the migration and restart with a lower read rate.
  • Loading branch information
John Sanda committed Mar 1, 2015
1 parent 4d07cd6 commit d4d8743
Showing 1 changed file with 24 additions and 4 deletions.
Expand Up @@ -112,7 +112,10 @@ public String toString() {

private MetricsRegistry metricsRegistry;

@Override
private int totalMetrics;

private double failureThreshold = Double.parseDouble(System.getProperty("rhq.storage.failure-threshold", "0.05"));

public void setSession(Session session) {
this.session = session;
}
Expand Down Expand Up @@ -179,6 +182,7 @@ public void execute() {
} finally {
stopwatch.stop();
log.info("Finished data migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
shutdown();
}
}

Expand All @@ -187,7 +191,7 @@ private int getWriteLimit() {
}

private int getReadLimit() {
return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "100"));
return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "110"));
}

private int getNumberOfUpNodes() {
Expand All @@ -200,6 +204,17 @@ private int getNumberOfUpNodes() {
return count;
}

private void shutdown() {
try {
log.info("Shutting down migration thread pools...");
scheduler.shutdown();
threadPool.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
threadPool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}

private Keyspace createKeyspace() {
CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.01");
Map<String, String> credentials = ImmutableMap.of(
Expand Down Expand Up @@ -264,7 +279,6 @@ private void scheduleMigrations(Iterator<Integer> keyIterator, final PreparedSta
log.debug("Scheduling " + bucket + " data migration for schedule id " + scheduleId);
}
migrations.addTask();
// migrationsQeue.offer(new MigrationArgs(scheduleId, bucket, query, migratedScheduleIds, migrationLog));
scheduler.submit(new Runnable() {
@Override
public void run() {
Expand All @@ -283,7 +297,7 @@ public void run() {
}
});
}

totalMetrics += count;
stopwatch.stop();
log.info("Finished scanning for schedule ids with " + bucket + " data in " +
stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
Expand Down Expand Up @@ -347,6 +361,12 @@ public void run() {
} else {
reportMigrationRates = true;
}

if ((failedMigrations.get() / totalMetrics) > failureThreshold) {
migrations.abort("The failure threshold has been exceeded with " + failedMigrations.get() +
" failed migrations");
}

Thread.sleep(30000);
}
} catch (InterruptedException e) {
Expand Down

0 comments on commit d4d8743

Please sign in to comment.