From 4d87b270289ce8c402626fc7281c2415f71ee11b Mon Sep 17 00:00:00 2001
From: John Sanda
Date: Fri, 27 Feb 2015 16:53:31 -0500
Subject: [PATCH] [BZ 185375] use async writes and dynamic throttling for index update

---
 .../org/rhq/cassandra/schema/RateMonitor.java      | 104 +++++++++-----
 .../cassandra/schema/ReplaceRHQ411Index.java       | 131 ++++++++++++++----
 2 files changed, 170 insertions(+), 65 deletions(-)
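Note (illustrative only, not part of the commit message or the diff below): the patch throttles the
asynchronous index writes with a Guava RateLimiter that RateMonitor re-tunes while the update runs.
A minimal sketch of that pattern follows, assuming a shared AtomicReference<RateLimiter>; the class
name ThrottlingSketch and the literal rates are placeholders and do not appear in the patch itself.

    import java.util.concurrent.atomic.AtomicReference;

    import com.google.common.util.concurrent.RateLimiter;

    public class ThrottlingSketch {

        public static void main(String[] args) {
            // Writers and the monitor share the limiter through an AtomicReference,
            // mirroring writePermitsRef in ReplaceRHQ411Index and RateMonitor below.
            AtomicReference<RateLimiter> writePermits =
                new AtomicReference<RateLimiter>(RateLimiter.create(20000));

            // Writer side: block for a permit before issuing each async statement.
            writePermits.get().acquire();
            // session.executeAsync(statement) would go here in the real code.

            // Monitor side: cut the rate multiplicatively after failures and
            // grow it additively while requests stay error free.
            double rate = writePermits.get().getRate();
            writePermits.get().setRate(rate * 0.9);   // back off on failures
            writePermits.get().setRate(rate + 100);   // or step up when stable
        }
    }
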
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/RateMonitor.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/RateMonitor.java
index 6d4fd5888a5..e2d0deaec12 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/RateMonitor.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/RateMonitor.java
@@ -86,12 +86,22 @@ public int hashCode() {
 
     private static final double FAILURE_THRESHOLD = 0.01;
 
-    private static final double RATE_INCREASE_STEP = 100;
+    private static final double DEFAULT_RATE_INCREASE_STEP = 100;
+
+    private static final double RATE_DECREASE_FACTOR = 0.9;
+
+    private static final int DEFAULT_RATE_INCREASE_CHECKPOINT = 60;
+
+    private static final int FIVE_SECOND_WINDOW_SIZE = 60;
+
+    private static final int STABLE_RATE_WINDOW = 90;
 
     private LinkedList<RequestStats> oneSecondStats = new LinkedList<RequestStats>();
 
     private LinkedList<AggregateRequestStats> fiveSecondStats = new LinkedList<AggregateRequestStats>();
 
+    private int stableRateTick;
+
     private AtomicInteger requests = new AtomicInteger();
 
     private AtomicInteger failRequests = new AtomicInteger();
@@ -104,6 +114,10 @@ public int hashCode() {
 
     private AtomicReference<RateLimiter> writePermitsRef;
 
+    private double rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;
+
+    private int rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;
+
     public RateMonitor(AtomicReference<RateLimiter> readPermitsRef, AtomicReference<RateLimiter> writePermitsRef) {
         this.readPermitsRef = readPermitsRef;
         this.writePermitsRef = writePermitsRef;
@@ -134,36 +148,31 @@ public void run() {
                     aggregateStats();
                     if (isRateDecreaseNeeded()) {
                         decreaseRates();
-                        oneSecondStats.clear();
-                        fiveSecondStats.clear();
+                        clearStats();
+                        stableRateTick = 0;
+                        rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;
+                        rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;
                     } else if (fiveSecondStats.peek().thresholdExceeded) {
                         increaseWarmup();
                         oneSecondStats.clear();
-                    } else if (isRateIncreaseNeeded()) {
+                        stableRateTick = 0;
+                        rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;
+                        rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;
+                    } else if (isLongTermRateStable()) {
+                        rateIncreaseStep += 100;
+                        rateIncreaseCheckpoint = Math.max(30, rateIncreaseCheckpoint - 15);
+                        stableRateTick = 0;
+
+                        log.info("Rates are stable. The rate increase step is now " + rateIncreaseStep +
+                            " and the rate increase checkpoint is now " + rateIncreaseCheckpoint);
+
                         increaseRates();
-                        oneSecondStats.clear();
-                        fiveSecondStats.clear();
+                        clearStats();
+                    } else if (isShortTermRateStable()) {
+                        increaseRates();
+                        clearStats();
                     }
                 }
-
-//                    if (isRateDecreaseNeeded()) {
-//                        decreaseRates();
-//                        fiveSecondStats.clear();
-//                    } else if (fiveSecondStats.peek().thresholdExceeded) {
-//                        increaseWarmup();
-//                    } else if (isRateIncreaseNeeded()) {
-//                        increaseRates();
-//                        fiveSecondStats.clear();
-//                    }
-
-//                    if (recentStats.size() > 5) {
-//                        recentStats.removeLast();
-//                    }
-
-//                    while (fiveSecondStats.size() > 60) {
-//                        fiveSecondStats.removeLast();
-//                    }
-
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 log.info("Stopping request monitoring due to interrupt", e);
@@ -173,19 +182,25 @@ public void run() {
         }
     }
 
+    protected void clearStats() {
+        oneSecondStats.clear();
+        fiveSecondStats.clear();
+    }
+
     private void aggregateStats() {
         double totalRequests = 0;
         double totalFailures = 0;
 
+        stableRateTick++;
+
     for (RequestStats stats : oneSecondStats) {
             totalRequests += stats.requests;
             totalFailures += stats.failedRequests;
         }
-//        log.info(((totalFailures / totalRequests) * 100.0) + "% of requests failed in the past 5 seconds");
         fiveSecondStats.addFirst(new AggregateRequestStats((totalFailures / totalRequests) > FAILURE_THRESHOLD,
             totalFailures));
         oneSecondStats.removeLast();
-        if (fiveSecondStats.size() > 60) {
+        if (fiveSecondStats.size() > FIVE_SECOND_WINDOW_SIZE) {
             fiveSecondStats.removeLast();
         }
     }
@@ -211,10 +226,21 @@ private boolean isRateDecreaseNeeded() {
         return failures > 2;
     }
 
-    private boolean isRateIncreaseNeeded() {
-        if (fiveSecondStats.size() < 60) {
-            // We want to make sure we have at least a minute's worth of stats in order to
-            // decide if we should whether or not a rate increase is needed.
+    private boolean is5SecondStatsErrorFree() {
+        if (fiveSecondStats.size() < FIVE_SECOND_WINDOW_SIZE){
+            return false;
+        }
+
+        for (AggregateRequestStats stats : fiveSecondStats) {
+            if (stats.failedRequests > 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isShortTermRateStable() {
+        if (fiveSecondStats.size() < rateIncreaseCheckpoint) {
             return false;
         }
 
@@ -223,7 +249,7 @@ private boolean isRateIncreaseNeeded() {
             if (stats.failedRequests > 0) {
                 return false;
             }
-            if (i > 59) {
+            if (i > rateIncreaseCheckpoint - 1) {
                 break;
             }
             ++i;
@@ -231,11 +257,15 @@ private boolean isRateIncreaseNeeded() {
         return true;
     }
 
+    private boolean isLongTermRateStable() {
+        return stableRateTick == STABLE_RATE_WINDOW;
+    }
+
     private void decreaseRates() {
         double readRate = readPermitsRef.get().getRate();
-        double newReadRate = readRate * 0.8;
+        double newReadRate = readRate * RATE_DECREASE_FACTOR;
         double writeRate = writePermitsRef.get().getRate();
-        double newWriteRate = writeRate * 0.8;
+        double newWriteRate = writeRate * RATE_DECREASE_FACTOR;
 
         log.info("Decreasing request rates:\n" +
             readRate + " reads/sec --> " + newReadRate + " reads/sec\n" +
@@ -248,11 +278,9 @@ private void decreaseRates() {
 
     private void increaseRates() {
         double readRate = readPermitsRef.get().getRate();
-//        double newReadRate = readRate * 1.15;
-        double newReadRate = readRate + RATE_INCREASE_STEP;
+        double newReadRate = readRate + rateIncreaseStep;
         double writeRate = writePermitsRef.get().getRate();
-//        double newWriteRate = writeRate * 1.15;
-        double newWriteRate = writeRate + RATE_INCREASE_STEP;
+        double newWriteRate = writeRate + rateIncreaseStep;
 
         log.info("Increasing request rates:\n" +
             readRate + " reads/sec --> " + newReadRate + " reads/sec\n" +
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceRHQ411Index.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceRHQ411Index.java
index 04d560cf88f..466920271c9 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceRHQ411Index.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/ReplaceRHQ411Index.java
@@ -1,10 +1,21 @@
 package org.rhq.cassandra.schema;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,17 +43,47 @@ public class ReplaceRHQ411Index {
 
     private PreparedStatement updateNewIndex;
 
+    private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4,
+        new SchemaUpdateThreadFactory()));
+
+    private AtomicReference<RateLimiter> writePermitsRef = new AtomicReference<RateLimiter>();
+
+    private RateMonitor rateMonitor;
+
     public ReplaceRHQ411Index(Session session) {
         this.session = session;
     }
 
     public void execute(DateRanges dateRanges) {
-        initPreparedStatements();
+        log.info("Updating indexes");
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            initPreparedStatements();
+
+            writePermitsRef.set(RateLimiter.create(Integer.parseInt(System.getProperty(
+                "rhq.storage.request.write-limit", "20000"))));
+            // We only care about throttling writes, but RateMonitor expects a RateLimiter
+            // for both writes and reads.
+            AtomicReference<RateLimiter> readPermitsRef = new AtomicReference<RateLimiter>(
+                RateLimiter.create(100));
+
+            rateMonitor = new RateMonitor(readPermitsRef, writePermitsRef);
+            threadPool.submit(rateMonitor);
+
+            updateRawIndex(dateRanges.rawStartTime, dateRanges.rawEndTime);
+            update1HourIndex(dateRanges.oneHourStartTime, dateRanges.oneHourEndTime);
+            update6HourIndex(dateRanges.sixHourStartTime, dateRanges.sixHourEndTime);
+            drop411Index();
+        } finally {
+            stopwatch.stop();
+            shutdown();
+            log.info("Finished updating indexes in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+        }
+    }
 
-        updateRawIndex(dateRanges.rawStartTime, dateRanges.rawEndTime);
-        update1HourIndex(dateRanges.oneHourStartTime, dateRanges.oneHourEndTime);
-        update6HourIndex(dateRanges.sixHourStartTime, dateRanges.sixHourEndTime);
-        drop411Index();
+    private void shutdown() {
+        rateMonitor.shutdown();
+        threadPool.shutdown();
     }
 
     private void initPreparedStatements() {
@@ -73,30 +114,66 @@ private void update6HourIndex(DateTime start, DateTime end) {
     }
 
     private void updateIndex(String oldBucket, String newBucket, DateTime start, DateTime end, Duration timeSlice) {
-        DateTime time = start;
-        BoundStatement statement = find411IndexEntries.bind(oldBucket, start.toDate());
-        ResultSet resultSet = session.execute(statement);
-        int count = 0;
-        int scheduleId = 0;
-        int partition = 0;
-
-        do {
-            for (Row row : resultSet) {
-                scheduleId = row.getInt(0);
-                partition = (scheduleId % NUM_PARTITIONS);
-                ++count;
-                session.execute(updateNewIndex.bind(newBucket, partition, DateUtils.getUTCTimeSlice(time, timeSlice)
-                    .toDate(), scheduleId));
+        try {
+            DateTime time = start;
+            BoundStatement statement = find411IndexEntries.bind(oldBucket, start.toDate());
+            ResultSet resultSet = session.execute(statement);
+            int count = 0;
+            int scheduleId = 0;
+            final TaskTracker taskTracker = new TaskTracker();
+
+            do {
+                for (Row row : resultSet) {
+                    scheduleId = row.getInt(0);
+                    ++count;
+                    addScheduleIdToIndex(newBucket, timeSlice, time, scheduleId, taskTracker);
+                }
+                if (count < PAGE_SIZE) {
+                    time = DateUtils.plusDSTAware(time, timeSlice);
+                    statement = find411IndexEntries.bind(oldBucket, time.toDate());
+                } else {
+                    statement = find411IndexEntriesAfterScheduleId.bind(oldBucket, time.toDate(), scheduleId);
+                }
+                count = 0;
+                resultSet = session.execute(statement);
+            } while (!time.isAfter(end));
+
+            taskTracker.finishedSchedulingTasks();
+            taskTracker.waitForTasksToFinish();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("The index update did not complete due to an interrupt", e);
+        } catch (AbortedException e) {
+            throw new RuntimeException("The index update was aborted", e);
+        }
+    }
+
+    private void addScheduleIdToIndex(String newBucket, Duration timeSlice, DateTime time, int scheduleId,
+        final TaskTracker taskTracker) {
+
+        taskTracker.addTask();
+        int partition = (scheduleId % NUM_PARTITIONS);
+        writePermitsRef.get().acquire();
+        ResultSetFuture insertFuture = session.executeAsync(updateNewIndex.bind(newBucket, partition,
+            DateUtils.getUTCTimeSlice(time, timeSlice).toDate(), scheduleId));
+        Futures.addCallback(insertFuture, indexUpdated(newBucket, timeSlice, time, scheduleId, taskTracker),
+            threadPool);
+    }
+
+    private FutureCallback<ResultSet> indexUpdated(final String newBucket, final Duration timeSlice,
+        final DateTime time, final int scheduleId, final TaskTracker taskTracker) {
+
+        return new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet resultSet) {
+                taskTracker.finishedTask();
             }
-            if (count < PAGE_SIZE) {
-                time = DateUtils.plusDSTAware(time, timeSlice);
-                statement = find411IndexEntries.bind(oldBucket, time.toDate());
-            } else {
-                statement = find411IndexEntriesAfterScheduleId.bind(oldBucket, time.toDate(), scheduleId);
+
+            @Override
+            public void onFailure(Throwable t) {
+                rateMonitor.requestFailed();
+                addScheduleIdToIndex(newBucket, timeSlice, time, scheduleId, taskTracker);
             }
-            count = 0;
-            resultSet = session.execute(statement);
-        } while (!time.isAfter(end));
+        };
     }
 
     private void drop411Index() {