Skip to content
Permalink
Browse files

Merge pull request #66 from killbill/queue-perf-enhancements

queue: Decouple the reaper threshold with the reaper scheduling period
  • Loading branch information...
sbrossie committed May 14, 2019
2 parents 8814836 + cce0ceb commit 084f527b40962f6dec929ae8a186d95adc5395c4
@@ -27,19 +27,26 @@
import org.killbill.commons.concurrent.Executors;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.Reaper;
import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultReaper implements Reaper {

static final long ONE_MINUTES_IN_MSEC = 60000;
static final long THREE_MINUTES_IN_MSEC = 3 * ONE_MINUTES_IN_MSEC;
static final long FIVE_MINUTES_IN_MSEC = 5 * ONE_MINUTES_IN_MSEC;

static final String REAPER_SCHEDULE_PROP = "org.killbill.queue.reap.schedule";


private final DBBackedQueue<?> dao;
private final PersistentQueueConfig config;
private final Clock clock;
private final AtomicBoolean isStarted;
private final String threadScheduledExecutorName;
private ScheduledFuture<?> reapEntriesHandle;

private final long FIVE_MINUTES = 300000;
private ScheduledFuture<?> reapEntriesHandle;

private static final Logger log = LoggerFactory.getLogger(DefaultReaper.class);

@@ -59,22 +66,26 @@ public void start() {
return;
}

log.info("{}: Starting...", threadScheduledExecutorName);

final long pendingPeriod = getReapThreshold();
final long reapThresholdMillis = getReapThreshold();
final long schedulePeriodMillis = getSchedulePeriod();

log.info("{}: Starting... reapThresholdMillis={}, schedulePeriodMillis={}",
threadScheduledExecutorName, reapThresholdMillis, schedulePeriodMillis);

final Runnable reapEntries = new Runnable() {
@Override
public void run() {
dao.reapEntries(getReapingDate());
}

private Date getReapingDate() {
return clock.getUTCNow().minusMillis((int) pendingPeriod).toDate();
return clock.getUTCNow().minusMillis((int) reapThresholdMillis).toDate();
}
};

scheduler = Executors.newSingleThreadScheduledExecutor(threadScheduledExecutorName);
reapEntriesHandle = scheduler.scheduleWithFixedDelay(reapEntries, pendingPeriod, pendingPeriod, TimeUnit.MILLISECONDS);
reapEntriesHandle = scheduler.scheduleWithFixedDelay(reapEntries, schedulePeriodMillis, schedulePeriodMillis, TimeUnit.MILLISECONDS);
}

@Override
@@ -104,12 +115,12 @@ public boolean isStarted() {
return isStarted.get();
}

private long getReapThreshold() {
long getReapThreshold() {
final long threshold;
// if Claim time is greater than reap threshold
if (config.getClaimedTime().getMillis() >= config.getReapThreshold().getMillis()) {
// override reap threshold using claim time + 5 minutes
threshold = config.getClaimedTime().getMillis() + FIVE_MINUTES;
threshold = config.getClaimedTime().getMillis() + FIVE_MINUTES_IN_MSEC;
log.warn("{}: Reap threshold was mis-configured. Claim time [{}] is greater than reap threshold [{}]",
threadScheduledExecutorName, config.getClaimedTime().toString(), config.getReapThreshold().toString());

@@ -119,4 +130,15 @@ private long getReapThreshold() {

return threshold;
}

long getSchedulePeriod() {
// Undocumented (across bus/notificationQ)
final String reapSchedule = System.getProperty(REAPER_SCHEDULE_PROP);
if (reapSchedule != null) {
final TimeSpan tmp = new TimeSpan(reapSchedule);
return tmp.getMillis();
} else {
return THREE_MINUTES_IN_MSEC;
}
}
}
@@ -0,0 +1,49 @@
/*
* Copyright 2014-2019 Groupon, Inc
* Copyright 2014-2019 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.queue;

import org.killbill.bus.BusReaper;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestReaperConfig {

@Test(groups = "fast")
public void testDefaultReaperSchedule() {

final DefaultReaper reaper = new BusReaper(null, null, null);

final long defaultScheduleMilliSec = reaper.getSchedulePeriod();
Assert.assertEquals(defaultScheduleMilliSec, DefaultReaper.THREE_MINUTES_IN_MSEC);
}

@Test(groups = "fast")
public void testDefaultOverloadedReaperSchedule() {

final DefaultReaper reaper = new BusReaper(null, null, null);

try {
System.setProperty(DefaultReaper.REAPER_SCHEDULE_PROP, "7m"); // 420000 mSec
final long defaultScheduleMilliSec = reaper.getSchedulePeriod();
Assert.assertEquals(defaultScheduleMilliSec, 420000);
} finally {
System.clearProperty(DefaultReaper.REAPER_SCHEDULE_PROP);
}
}

}

0 comments on commit 084f527

Please sign in to comment.
You can’t perform that action at this time.