Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
[BZ 185375] use async writes and dynamic throttling for index update
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Mar 1, 2015
1 parent f6b2fc9 commit 4d87b27
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 65 deletions.
Expand Up @@ -86,12 +86,22 @@ public int hashCode() {

private static final double FAILURE_THRESHOLD = 0.01;

private static final double RATE_INCREASE_STEP = 100;
private static final double DEFAULT_RATE_INCREASE_STEP = 100;

private static final double RATE_DECREASE_FACTOR = 0.9;

private static final int DEFAULT_RATE_INCREASE_CHECKPOINT = 60;

private static final int FIVE_SECOND_WINDOW_SIZE = 60;

private static final int STABLE_RATE_WINDOW = 90;

private LinkedList<RequestStats> oneSecondStats = new LinkedList<RequestStats>();

private LinkedList<AggregateRequestStats> fiveSecondStats = new LinkedList<AggregateRequestStats>();

private int stableRateTick;

private AtomicInteger requests = new AtomicInteger();

private AtomicInteger failRequests = new AtomicInteger();
Expand All @@ -104,6 +114,10 @@ public int hashCode() {

private AtomicReference<RateLimiter> writePermitsRef;

private double rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;

private int rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;

public RateMonitor(AtomicReference<RateLimiter> readPermitsRef, AtomicReference<RateLimiter> writePermitsRef) {
this.readPermitsRef = readPermitsRef;
this.writePermitsRef = writePermitsRef;
Expand Down Expand Up @@ -134,36 +148,31 @@ public void run() {
aggregateStats();
if (isRateDecreaseNeeded()) {
decreaseRates();
oneSecondStats.clear();
fiveSecondStats.clear();
clearStats();
stableRateTick = 0;
rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;
rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;
} else if (fiveSecondStats.peek().thresholdExceeded) {
increaseWarmup();
oneSecondStats.clear();
} else if (isRateIncreaseNeeded()) {
stableRateTick = 0;
rateIncreaseStep = DEFAULT_RATE_INCREASE_STEP;
rateIncreaseCheckpoint = DEFAULT_RATE_INCREASE_CHECKPOINT;
} else if (isLongTermRateStable()) {
rateIncreaseStep += 100;
rateIncreaseCheckpoint = Math.max(30, rateIncreaseCheckpoint - 15);
stableRateTick = 0;

log.info("Rates are stable. The rate increase step is now " + rateIncreaseStep +
" and the rate increase checkpoint is now " + rateIncreaseCheckpoint);

increaseRates();
oneSecondStats.clear();
fiveSecondStats.clear();
clearStats();
} else if (isShortTermRateStable()) {
increaseRates();
clearStats();
}
}

// if (isRateDecreaseNeeded()) {
// decreaseRates();
// fiveSecondStats.clear();
// } else if (fiveSecondStats.peek().thresholdExceeded) {
// increaseWarmup();
// } else if (isRateIncreaseNeeded()) {
// increaseRates();
// fiveSecondStats.clear();
// }

// if (recentStats.size() > 5) {
// recentStats.removeLast();
// }

// while (fiveSecondStats.size() > 60) {
// fiveSecondStats.removeLast();
// }

Thread.sleep(1000);
} catch (InterruptedException e) {
log.info("Stopping request monitoring due to interrupt", e);
Expand All @@ -173,19 +182,25 @@ public void run() {
}
}

protected void clearStats() {
    // Reset both sampling windows so the next rate decision starts from a
    // clean slate (used after a rate change invalidates accumulated stats).
    oneSecondStats.clear();
    fiveSecondStats.clear();
}

private void aggregateStats() {
double totalRequests = 0;
double totalFailures = 0;

stableRateTick++;

for (RequestStats stats : oneSecondStats) {
totalRequests += stats.requests;
totalFailures += stats.failedRequests;
}
// log.info(((totalFailures / totalRequests) * 100.0) + "% of requests failed in the past 5 seconds");
fiveSecondStats.addFirst(new AggregateRequestStats((totalFailures / totalRequests) > FAILURE_THRESHOLD,
totalFailures));
oneSecondStats.removeLast();
if (fiveSecondStats.size() > 60) {
if (fiveSecondStats.size() > FIVE_SECOND_WINDOW_SIZE) {
fiveSecondStats.removeLast();
}
}
Expand All @@ -211,10 +226,21 @@ private boolean isRateDecreaseNeeded() {
return failures > 2;
}

private boolean isRateIncreaseNeeded() {
if (fiveSecondStats.size() < 60) {
// We want to make sure we have at least a minute's worth of stats in order to
// decide whether or not a rate increase is needed.
/**
 * Reports whether the aggregated five-second window shows zero failed requests.
 * Requires a full window of samples before answering; with fewer than
 * FIVE_SECOND_WINDOW_SIZE entries it conservatively reports {@code false}.
 */
private boolean is5SecondStatsErrorFree() {
    boolean windowFull = fiveSecondStats.size() >= FIVE_SECOND_WINDOW_SIZE;
    if (!windowFull) {
        // Not enough history yet to make a judgement.
        return false;
    }
    for (AggregateRequestStats windowEntry : fiveSecondStats) {
        if (windowEntry.failedRequests > 0) {
            return false;
        }
    }
    return true;
}

private boolean isShortTermRateStable() {
if (fiveSecondStats.size() < rateIncreaseCheckpoint) {
return false;
}

Expand All @@ -223,19 +249,23 @@ private boolean isRateIncreaseNeeded() {
if (stats.failedRequests > 0) {
return false;
}
if (i > 59) {
if (i > rateIncreaseCheckpoint - 1) {
break;
}
++i;
}
return true;
}

/**
 * True exactly when the stability counter has reached STABLE_RATE_WINDOW
 * consecutive aggregation ticks without being reset by a rate adjustment.
 */
private boolean isLongTermRateStable() {
    return STABLE_RATE_WINDOW == stableRateTick;
}

private void decreaseRates() {
double readRate = readPermitsRef.get().getRate();
double newReadRate = readRate * 0.8;
double newReadRate = readRate * RATE_DECREASE_FACTOR;
double writeRate = writePermitsRef.get().getRate();
double newWriteRate = writeRate * 0.8;
double newWriteRate = writeRate * RATE_DECREASE_FACTOR;

log.info("Decreasing request rates:\n" +
readRate + " reads/sec --> " + newReadRate + " reads/sec\n" +
Expand All @@ -248,11 +278,9 @@ private void decreaseRates() {

private void increaseRates() {
double readRate = readPermitsRef.get().getRate();
// double newReadRate = readRate * 1.15;
double newReadRate = readRate + RATE_INCREASE_STEP;
double newReadRate = readRate + rateIncreaseStep;
double writeRate = writePermitsRef.get().getRate();
// double newWriteRate = writeRate * 1.15;
double newWriteRate = writeRate + RATE_INCREASE_STEP;
double newWriteRate = writeRate + rateIncreaseStep;

log.info("Increasing request rates:\n" +
readRate + " reads/sec --> " + newReadRate + " reads/sec\n" +
Expand Down
@@ -1,10 +1,21 @@
package org.rhq.cassandra.schema;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -32,17 +43,47 @@ public class ReplaceRHQ411Index {

private PreparedStatement updateNewIndex;

private ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4,
new SchemaUpdateThreadFactory()));

private AtomicReference<RateLimiter> writePermitsRef = new AtomicReference<RateLimiter>();

private RateMonitor rateMonitor;

public ReplaceRHQ411Index(Session session) {
this.session = session;
}

public void execute(DateRanges dateRanges) {
initPreparedStatements();
log.info("Updating indexes");
Stopwatch stopwatch = Stopwatch.createStarted();
try {
initPreparedStatements();

writePermitsRef.set(RateLimiter.create(Integer.parseInt(System.getProperty(
"rhq.storage.request.write-limit", "20000"))));
// We only care about throttling writes, but RateMonitor expects a RateLimiter
// for both writes and reads.
AtomicReference<RateLimiter> readPermitsRef = new AtomicReference<RateLimiter>(
RateLimiter.create(100));

rateMonitor = new RateMonitor(readPermitsRef, writePermitsRef);
threadPool.submit(rateMonitor);

updateRawIndex(dateRanges.rawStartTime, dateRanges.rawEndTime);
update1HourIndex(dateRanges.oneHourStartTime, dateRanges.oneHourEndTime);
update6HourIndex(dateRanges.sixHourStartTime, dateRanges.sixHourEndTime);
drop411Index();
} finally {
stopwatch.stop();
shutdown();
log.info("Finished updating indexes in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
}

updateRawIndex(dateRanges.rawStartTime, dateRanges.rawEndTime);
update1HourIndex(dateRanges.oneHourStartTime, dateRanges.oneHourEndTime);
update6HourIndex(dateRanges.sixHourStartTime, dateRanges.sixHourEndTime);
drop411Index();
/**
 * Stops background machinery after the index update finishes.
 * The monitor is signalled first, then the executor that runs it
 * (and the write callbacks) is shut down.
 */
private void shutdown() {
    // NOTE(review): presumably RateMonitor.shutdown() makes its run loop
    // exit so the pool's shutdown() can complete — confirm in RateMonitor.
    rateMonitor.shutdown();
    threadPool.shutdown();
}

private void initPreparedStatements() {
Expand Down Expand Up @@ -73,30 +114,66 @@ private void update6HourIndex(DateTime start, DateTime end) {
}

private void updateIndex(String oldBucket, String newBucket, DateTime start, DateTime end, Duration timeSlice) {
DateTime time = start;
BoundStatement statement = find411IndexEntries.bind(oldBucket, start.toDate());
ResultSet resultSet = session.execute(statement);
int count = 0;
int scheduleId = 0;
int partition = 0;

do {
for (Row row : resultSet) {
scheduleId = row.getInt(0);
partition = (scheduleId % NUM_PARTITIONS);
++count;
session.execute(updateNewIndex.bind(newBucket, partition, DateUtils.getUTCTimeSlice(time, timeSlice)
.toDate(), scheduleId));
try {
DateTime time = start;
BoundStatement statement = find411IndexEntries.bind(oldBucket, start.toDate());
ResultSet resultSet = session.execute(statement);
int count = 0;
int scheduleId = 0;
final TaskTracker taskTracker = new TaskTracker();

do {
for (Row row : resultSet) {
scheduleId = row.getInt(0);
++count;
addScheduleIdToIndex(newBucket, timeSlice, time, scheduleId, taskTracker);
}
if (count < PAGE_SIZE) {
time = DateUtils.plusDSTAware(time, timeSlice);
statement = find411IndexEntries.bind(oldBucket, time.toDate());
} else {
statement = find411IndexEntriesAfterScheduleId.bind(oldBucket, time.toDate(), scheduleId);
}
count = 0;
resultSet = session.execute(statement);
} while (!time.isAfter(end));

taskTracker.finishedSchedulingTasks();
taskTracker.waitForTasksToFinish();
} catch (InterruptedException e) {
throw new RuntimeException("The index update did not complete due to an interrupt", e);
} catch (AbortedException e) {
throw new RuntimeException("The index update was aborted", e);
}
}

/**
 * Asynchronously writes one schedule id into the new index table,
 * throttled by the shared write RateLimiter and tracked by taskTracker.
 * The completion callback (indexUpdated) re-invokes this method on failure.
 */
private void addScheduleIdToIndex(String newBucket, Duration timeSlice, DateTime time, int scheduleId,
    final TaskTracker taskTracker) {

    // Register the pending write before scheduling it so the tracker never
    // observes a completion for a task it did not know about.
    taskTracker.addTask();
    int indexPartition = (scheduleId % NUM_PARTITIONS);
    // Blocks until a write permit is available — this is the dynamic throttle.
    writePermitsRef.get().acquire();
    BoundStatement insert = updateNewIndex.bind(newBucket, indexPartition,
        DateUtils.getUTCTimeSlice(time, timeSlice).toDate(), scheduleId);
    ResultSetFuture insertFuture = session.executeAsync(insert);
    Futures.addCallback(insertFuture, indexUpdated(newBucket, timeSlice, time, scheduleId, taskTracker),
        threadPool);
}

private FutureCallback<ResultSet> indexUpdated(final String newBucket, final Duration timeSlice,
final DateTime time, final int scheduleId, final TaskTracker taskTracker) {

return new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet resultSet) {
taskTracker.finishedTask();
}
if (count < PAGE_SIZE) {
time = DateUtils.plusDSTAware(time, timeSlice);
statement = find411IndexEntries.bind(oldBucket, time.toDate());
} else {
statement = find411IndexEntriesAfterScheduleId.bind(oldBucket, time.toDate(), scheduleId);

@Override
public void onFailure(Throwable t) {
rateMonitor.requestFailed();
addScheduleIdToIndex(newBucket, timeSlice, time, scheduleId, taskTracker);
}
count = 0;
resultSet = session.execute(statement);
} while (!time.isAfter(end));
};
}

private void drop411Index() {
Expand Down

0 comments on commit 4d87b27

Please sign in to comment.