[BZ 185375] remove key scanning code with hector
I went back and did some testing with the original approach of querying against
all of the schedule ids that are returned from the RDBMS. I was able to achieve
the same performance (if not better) as I did with hector, so I see no reason
to pull in the additional dependency.
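A minimal sketch of the approach described above — the schedule ids are pulled from the RDBMS once and written to a flat file, so there is no need to scan Cassandra row keys with hector. The JDBC URL, table, and column names (rhq_measurement_sched.id) are assumptions for illustration only; the file format the migration actually reads (see loadScheduleIds in the diff below, which parses quoted, comma-grouped numbers) may differ.

    // Hypothetical sketch: export schedule ids from the RDBMS to a flat file.
    // Table/column names and the plain one-id-per-line format are assumptions,
    // not the exact production code.
    import java.io.PrintWriter;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExportScheduleIds {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(args[0]);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT id FROM rhq_measurement_sched");
                 PrintWriter out = new PrintWriter("schedule_ids")) {
                while (rs.next()) {
                    out.println(rs.getInt(1)); // one schedule id per line
                }
            }
        }
    }

The migration then reads that file back instead of iterating Cassandra row keys, so reads against the cluster are limited to the actual per-schedule queries.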

Conflicts:
	modules/common/cassandra-schema/pom.xml
John Sanda committed Mar 1, 2015
1 parent d4d8743 commit 966e923
Showing 3 changed files with 143 additions and 140 deletions.
27 changes: 20 additions & 7 deletions modules/common/cassandra-schema/pom.xml
@@ -41,13 +41,6 @@
<version>${cassandra.driver.version}</version>
</dependency>

<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
<version>1.0-5</version>
<!--<scope>test</scope>-->
</dependency>

<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-vfs</artifactId>
@@ -128,6 +121,26 @@
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.rhq.cassandra.schema.MigrateMetrics</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -2,12 +2,15 @@

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -18,7 +21,7 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.LineReader;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -33,12 +36,6 @@
import org.joda.time.DateTime;
import org.joda.time.Days;

import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.KeyIterator;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

/**
* <p>
* Migrates aggregate metrics from the one_hour_metrics, six_hour_metrics, and twenty_four_hour_metrics tables to the
@@ -99,23 +96,28 @@ public String toString() {

private AtomicInteger remaining24HourMetrics = new AtomicInteger();

private AtomicInteger migrated1HourMetrics = new AtomicInteger();

private AtomicInteger migrated6HourMetrics = new AtomicInteger();

private AtomicInteger migrated24HourMetrics = new AtomicInteger();

private AtomicInteger failedMigrations = new AtomicInteger();

private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5,
new SchemaUpdateThreadFactory()));

private ExecutorService scheduler = Executors.newFixedThreadPool(1);

private TaskTracker migrations = new TaskTracker();
private MetricsRegistry metricsRegistry;

private Meter migrationsMeter;

private MetricsRegistry metricsRegistry;
private TaskTracker migrations = new TaskTracker();

private int totalMetrics;

private double failureThreshold = Double.parseDouble(System.getProperty("rhq.storage.failure-threshold", "0.05"));

@Override
public void setSession(Session session) {
this.session = session;
}
@@ -126,15 +128,9 @@ public void bind(Properties properties) {
dataDir = properties.getProperty("data.dir", System.getProperty("jboss.server.data.dir"));
}

@Override
public String toString() {
return getClass().getSimpleName();
}

@Override
public void execute() {
log.info("Starting data migration");

metricsRegistry = new MetricsRegistry();
migrationsMeter = metricsRegistry.newMeter(MigrateAggregateMetrics.class, "migrations", "migrations",
TimeUnit.MINUTES);
@@ -148,25 +144,28 @@ public void execute() {
return;
}
writePermits = RateLimiter.create(getWriteLimit() * getNumberOfUpNodes(), 10, TimeUnit.SECONDS);
readPermits = RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), 90, TimeUnit.SECONDS);
Keyspace keyspace = createKeyspace();
readPermits = RateLimiter.create(getReadLimit() * getNumberOfUpNodes(), 10, TimeUnit.SECONDS);

log.info("The request limits are " + writePermits.getRate() + " writes per econd and " +
readPermits.getRate() + " reads per second");

Set<Integer> scheduleIds = loadScheduleIds();
remaining1HourMetrics.set(scheduleIds.size());
remaining6HourMetrics.set(scheduleIds.size());
remaining24HourMetrics.set(scheduleIds.size());
totalMetrics = scheduleIds.size() * 3;
MigrationProgressLogger progressLogger = new MigrationProgressLogger();

threadPool.submit(progressLogger);

schedule1HourDataMigrations(keyspace);
schedule6HourDataMigrations(keyspace);
schedule24HourDataMigrations(keyspace);
migrate1HourData(scheduleIds);
migrate6HourData(scheduleIds);
migrate24HourData(scheduleIds);

migrations.finishedSchedulingTasks();
migrations.waitForTasksToFinish();
progressLogger.finished();

if (failedMigrations.get() > 0) {
throw new RuntimeException("There were " + failedMigrations + " failed migration tasks. The upgrade " +
"needs to be run again in order to complete the migration.");
}
dropTables();
} catch (IOException e) {
throw new RuntimeException("There was an unexpected I/O error. The are still " +
migrations.getRemainingTasks() + " outstanding migration tasks. The upgrade must be run again to " +
@@ -182,6 +181,7 @@ public void execute() {
} finally {
stopwatch.stop();
log.info("Finished data migration in " + stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
log.info("There were " + failedMigrations + " failed migrations");
shutdown();
}
}
@@ -191,7 +191,7 @@ private int getWriteLimit() {
}

private int getReadLimit() {
return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "110"));
return Integer.parseInt(System.getProperty("rhq.storage.request.read-limit", "200"));
}

private int getNumberOfUpNodes() {
@@ -207,111 +207,93 @@ private int getNumberOfUpNodes() {
private void shutdown() {
try {
log.info("Shutting down migration thread pools...");
scheduler.shutdown();
threadPool.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
threadPool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}

private Keyspace createKeyspace() {
CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.01");
Map<String, String> credentials = ImmutableMap.of(
"username", "rhqadmin",
"password", "rhqadmin"
);
me.prettyprint.hector.api.Cluster cluster = HFactory.createCluster("rhq", configurator, credentials);
return HFactory.createKeyspace("rhq", cluster);
}
private Set<Integer> loadScheduleIds() {
NumberFormat formatter = new DecimalFormat("\"#,#\"");
Set<Integer> scheduleIds = new HashSet<Integer>();
InputStream stream = getClass().getResourceAsStream("/schedule_ids");
LineReader reader = new LineReader(new InputStreamReader(stream));

private Iterator<Integer> createKeyIterator(Keyspace keyspace, String table) {
KeyIterator<Integer> keyIterator = new KeyIterator<Integer>(keyspace, table, IntegerSerializer.get());
return keyIterator.iterator();
try {
String line = reader.readLine();
while (line != null) {
scheduleIds.add(formatter.parse(line).intValue());
line = reader.readLine();
}
return scheduleIds;
} catch (IOException e) {
throw new RuntimeException("Failed to load schedule ids");
} catch (ParseException e) {
throw new RuntimeException("Failed to load schedule ids");
}
}

private void schedule1HourDataMigrations(Keyspace keyspace) throws IOException {
Iterator<Integer> keyIterator = createKeyIterator(keyspace, "one_hour_metrics");
private void migrate1HourData(Set<Integer> scheduleIds) throws IOException {
DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
DateTime startTime = endTime.minus(Days.days(14));
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.one_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());

scheduleMigrations(keyIterator, query, Bucket.ONE_HOUR, remaining1HourMetrics);
migrateData(scheduleIds, query, Bucket.ONE_HOUR, remaining1HourMetrics, migrated1HourMetrics);
}

private void schedule6HourDataMigrations(Keyspace keyspace) throws IOException {
Iterator<Integer> keyIterator = createKeyIterator(keyspace, "six_hour_metrics");
private void migrate6HourData(Set<Integer> scheduleIds) throws IOException {
DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
DateTime startTime = endTime.minus(Days.days(31));
DateTime startTime = endTime.minus(Days.days(14));
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.six_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());

scheduleMigrations(keyIterator, query, Bucket.SIX_HOUR, remaining6HourMetrics);
migrateData(scheduleIds, query, Bucket.SIX_HOUR, remaining6HourMetrics, migrated6HourMetrics);
}

private void schedule24HourDataMigrations(Keyspace keyspace) throws IOException {
Iterator<Integer> keyIterator = createKeyIterator(keyspace, "twenty_four_hour_metrics");
private void migrate24HourData(Set<Integer> scheduleIds) throws IOException {
DateTime endTime = DateUtils.get1HourTimeSlice(DateTime.now());
DateTime startTime = endTime.minus(Days.days(365));
DateTime startTime = endTime.minus(Days.days(14));
PreparedStatement query = session.prepare(
"SELECT schedule_id, time, type, value, ttl(value), writetime(value) FROM rhq.twenty_four_hour_metrics " +
"WHERE schedule_id = ? AND time >= " + startTime.getMillis() + " AND time <= " + endTime.getMillis());

scheduleMigrations(keyIterator, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics);
migrateData(scheduleIds, query, Bucket.TWENTY_FOUR_HOUR, remaining24HourMetrics, migrated24HourMetrics);
}

private void scheduleMigrations(Iterator<Integer> keyIterator, final PreparedStatement query, final Bucket bucket,
final AtomicInteger remainingMetrics) throws IOException {
private void migrateData(final Set<Integer> scheduleIds, final PreparedStatement query, final Bucket bucket,
final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) throws IOException {

Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Scanning for schedule ids with " + bucket + " data");
log.info("Scheduling data migration tasks for " + bucket + " data");
final MigrationLog migrationLog = new MigrationLog(new File(dataDir, bucket + "_migration.log"));
final Set<Integer> migratedScheduleIds = migrationLog.read();
int count = 0;

while (keyIterator.hasNext()) {
final Integer scheduleId = keyIterator.next();
count++;
remainingMetrics.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Scheduling " + bucket + " data migration for schedule id " + scheduleId);
for (final Integer scheduleId : scheduleIds) {
if (!migratedScheduleIds.contains(scheduleId)) {
migrations.addTask();
readPermits.acquire();
ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
new MigrateData(scheduleId, bucket, writePermits, session), threadPool);
Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
remainingMetrics, migratedMetrics), threadPool);
}
migrations.addTask();
scheduler.submit(new Runnable() {
@Override
public void run() {
try {
if (!migratedScheduleIds.contains(scheduleId)) {
readPermits.acquire();
ResultSetFuture queryFuture = session.executeAsync(query.bind(scheduleId));
ListenableFuture<List<ResultSet>> migrationFuture = Futures.transform(queryFuture,
new MigrateData(scheduleId, bucket, writePermits, session), threadPool);
Futures.addCallback(migrationFuture, migrationFinished(scheduleId, bucket, migrationLog,
remainingMetrics), threadPool);
}
} catch (Exception e) {
log.warn("There was an error migrating data", e);
}
}
});
}
totalMetrics += count;

stopwatch.stop();
log.info("Finished scanning for schedule ids with " + bucket + " data in " +
log.info("Finished scheduling migration tasks for " + bucket + " data in " +
stopwatch.elapsed(TimeUnit.SECONDS) + " sec");
log.info("There are a total of " + count + " schedule ids with " + bucket + " data to migrate");
}

private FutureCallback<List<ResultSet>> migrationFinished(final Integer scheduleId, final Bucket bucket,
final MigrationLog migrationLog, final AtomicInteger remainingMetrics) {
final MigrationLog migrationLog, final AtomicInteger remainingMetrics, final AtomicInteger migratedMetrics) {
return new FutureCallback<List<ResultSet>>() {
@Override
public void onSuccess(List<ResultSet> resultSets) {
try {
migrations.finishedTask();
remainingMetrics.decrementAndGet();
migratedMetrics.incrementAndGet();
migrationLog.write(scheduleId);
migrationsMeter.mark();
if (log.isDebugEnabled()) {
@@ -362,7 +344,7 @@ public void run() {
reportMigrationRates = true;
}

if ((failedMigrations.get() / totalMetrics) > failureThreshold) {
if (totalMetrics > 0 && ((double) failedMigrations.get() / totalMetrics) > failureThreshold) {
migrations.abort("The failure threshold has been exceeded with " + failedMigrations.get() +
" failed migrations");
}
