diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
index a5af97970eb..2d79a69acd9 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/MigrateAggregateMetrics.java
@@ -112,7 +112,10 @@ public String toString() {
 
     private MetricsRegistry metricsRegistry;
 
-    @Override
+    private int totalMetrics;
+
+    private double failureThreshold = Double.parseDouble(System.getProperty("rhq.storage.failure-threshold", "0.05"));
+
     public void setSession(Session session) {
         this.session = session;
     }
@@ -179,6 +182,7 @@ public void execute() {
         } finally {
             stopwatch.stop();
             log.info("Finished data migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
+            shutdown();
         }
     }
 
@@ -187,7 +191,7 @@ private int getWriteLimit() {
     }
 
     private int getReadLimit() {
-        return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "100"));
+        return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "110"));
     }
 
     private int getNumberOfUpNodes() {
@@ -200,6 +204,17 @@ private int getNumberOfUpNodes() {
         return count;
     }
 
+    private void shutdown() {
+        try {
+            log.info("Shutting down migration thread pools...");
+            scheduler.shutdown();
+            threadPool.shutdown();
+            scheduler.awaitTermination(5, TimeUnit.SECONDS);
+            threadPool.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
+    }
+
     private Keyspace createKeyspace() {
         CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.01");
         Map credentials = ImmutableMap.of(
@@ -264,7 +279,6 @@ private void scheduleMigrations(Iterator keyIterator, final PreparedSta
                 log.debug("Scheduling " + bucket + " data migration for schedule id " + scheduleId);
             }
             migrations.addTask();
-//            migrationsQeue.offer(new MigrationArgs(scheduleId, bucket, query, migratedScheduleIds, migrationLog));
             scheduler.submit(new Runnable() {
                 @Override
                 public void run() {
@@ -283,7 +297,7 @@ public void run() {
                 }
             });
         }
-
+        totalMetrics += count;
         stopwatch.stop();
         log.info("Finished scanning for schedule ids with " + bucket + " data in " +
             stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
@@ -347,6 +361,12 @@ public void run() {
                     } else {
                         reportMigrationRates = true;
                     }
+
+                    if ((failedMigrations.get() / totalMetrics) > failureThreshold) {
+                        migrations.abort("The failure threshold has been exceeded with " + failedMigrations.get() +
+                            " failed migrations");
+                    }
+
                     Thread.sleep(30000);
                 }
             } catch (InterruptedException e) {
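The new watchdog aborts the migration once the ratio of failed migrations to `totalMetrics` exceeds `rhq.storage.failure-threshold` (default 0.05). One caveat with the check as written: if `failedMigrations` is an `AtomicInteger`, `failedMigrations.get() / totalMetrics` is integer division in Java, so the ratio stays at 0 until every metric has failed, and it throws `ArithmeticException` while `totalMetrics` is still 0. A minimal sketch of the same check computed in floating point, reusing the field names from the patch (the wrapper class and `thresholdExceeded()` method are hypothetical, added only for illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: mirrors the fields added by the patch, assuming
// failedMigrations is an AtomicInteger maintained by the migration tasks
// and totalMetrics is accumulated during the schedule-id scan.
public class FailureThresholdCheck {

    private final AtomicInteger failedMigrations = new AtomicInteger();

    private int totalMetrics;

    // Same property and default as the patch: abort after 5% failures.
    private final double failureThreshold =
        Double.parseDouble(System.getProperty("rhq.storage.failure-threshold", "0.05"));

    boolean thresholdExceeded() {
        // Nothing scanned yet; avoid dividing by zero.
        if (totalMetrics == 0) {
            return false;
        }
        // Cast to double so the comparison is a true ratio; with integer
        // division the expression would be 0 until failures reached 100%.
        double failureRate = (double) failedMigrations.get() / totalMetrics;
        return failureRate > failureThreshold;
    }
}
```

Both tunables introduced or changed here can be overridden at startup with system properties, for example `-Drhq.storage.failure-threshold=0.10` or `-Drhq.storage.request.read-limit=100` to restore the previous read limit.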