Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue perf optimization #65

Merged
merged 3 commits into from May 6, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -4,4 +4,4 @@
<option name="USE_PROJECT_PROFILE" value="true" />
<version value="1.0" />
</settings>
</component>
</component>
@@ -104,11 +104,11 @@ public EventBusDelegate(final String busName) {

@Inject
public DefaultPersistentBus(@Named(QUEUE_NAME) final IDBI dbi, final Clock clock, final PersistentBusConfig config, final MetricRegistry metricRegistry, final DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
super("Bus", config, metricRegistry);
super(config.getTableName(), config, metricRegistry);
this.dbi = (DBI) dbi;
this.clock = clock;
this.config = config;
this.dbBackedQId = "bus-" + config.getTableName();
this.dbBackedQId = config.getTableName();
this.dao = config.getPersistentQueueMode() == PersistentQueueMode.STICKY_EVENTS ?
new DBBackedQueueWithInflightQueue<BusEventModelDao>(clock, dbi, PersistentBusSqlDao.class, config, dbBackedQId, metricRegistry, databaseTransactionNotificationApi) :
new DBBackedQueueWithPolling<BusEventModelDao>(clock, dbi, PersistentBusSqlDao.class, config, dbBackedQId, metricRegistry);
@@ -123,7 +123,7 @@ public Thread newThread(final Runnable r) {
}
};

this.busHandlersProcessingTime = metricRegistry.timer(MetricRegistry.name(DefaultPersistentBus.class, "busHandlersProcessingTime"));
this.busHandlersProcessingTime = metricRegistry.timer(MetricRegistry.name(DefaultPersistentBus.class, dbBackedQId, "busHandlersProcessingTime"));

this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
this.isInitialized = new AtomicBoolean(false);
@@ -87,6 +87,18 @@
@Description("Max number of dispatch threads to use")
public abstract int geMaxDispatchThreads();

@Override
@Config("org.killbill.persistent.bus.${instanceName}.lifecycle.dispatch.nbThreads")
@Default("1")
@Description("Max number of lifecycle dispatch threads to use")
public abstract int geNbLifecycleDispatchThreads();

This comment has been minimized.

Copy link
@pierre

@Override
@Config("org.killbill.persistent.bus.${instanceName}.lifecycle.complete.nbThreads")
@Default("2")
@Description("Max number of lifecycle complete threads to use")
public abstract int geNbLifecycleCompleteThreads();

@Override
@Config("org.killbill.persistent.bus.${instanceName}.queue.capacity")
@Default("30000")
@@ -70,7 +70,6 @@
protected final DBBackedQueue<NotificationEventModelDao> dao;
protected final MetricRegistry metricRegistry;

private final Counter processedNotificationsSinceStart;
private final Map<String, Histogram> perQueueProcessingTime;

// We could event have one per queue is required...
@@ -102,11 +101,10 @@ public void uncaughtException(final Thread t, final Throwable e) {
this.clock = clock;
this.config = config;
this.nbProcessedEvents = new AtomicLong();
this.dao = new DBBackedQueueWithPolling<NotificationEventModelDao>(clock, dbi, NotificationSqlDao.class, config, "notif-" + config.getTableName(), metricRegistry);
this.dao = new DBBackedQueueWithPolling<NotificationEventModelDao>(clock, dbi, NotificationSqlDao.class, config, config.getTableName(), metricRegistry);

this.queues = new TreeMap<String, NotificationQueue>();

this.processedNotificationsSinceStart = metricRegistry.counter(MetricRegistry.name(NotificationQueueDispatcher.class, "processed-notifications-since-start"));
this.perQueueProcessingTime = new HashMap<String, Histogram>();

this.metricRegistry = metricRegistry;
@@ -225,7 +223,6 @@ public void handleNotificationWithMetrics(final NotificationQueueHandler handler
nbProcessedEvents.incrementAndGet();
// Unclear if those stats should include failures
perQueueHistogramProcessingTime.update(System.nanoTime() - beforeProcessing);
processedNotificationsSinceStart.inc();
}
}

@@ -87,6 +87,18 @@
@Description("Number of threads to use")
public abstract int geMaxDispatchThreads();

@Override
@Config("org.killbill.notificationq.${instanceName}.lifecycle.dispatch.nbThreads")
@Default("1")
@Description("Max number of lifecycle dispatch threads to use")
public abstract int geNbLifecycleDispatchThreads();

@Override
@Config("org.killbill.notificationq.${instanceName}.lifecycle.complete.nbThreads")
@Default("2")
@Description("Max number of lifecycle complete threads to use")
public abstract int geNbLifecycleCompleteThreads();

@Override
@Config("org.killbill.notificationq.${instanceName}.queue.capacity")
@Default("100")
@@ -104,12 +104,12 @@ public DBBackedQueue(final Clock clock,
this.clock = clock;
this.prof = new Profiling<Long, RuntimeException>();

this.rawGetEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawGetEntriesTime"));
this.rawInsertEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawInsertEntryTime"));
this.rawClaimEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawClaimEntriesTime"));
this.rawClaimEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawClaimEntryTime"));
this.rawDeleteEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawDeleteEntriesTime"));
this.rawDeleteEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, "rawDeleteEntryTime"));
this.rawGetEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawGetEntriesTime"));
this.rawInsertEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawInsertEntryTime"));
this.rawClaimEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawClaimEntriesTime"));
this.rawClaimEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawClaimEntryTime"));
this.rawDeleteEntriesTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawDeleteEntriesTime"));
this.rawDeleteEntryTime = metricRegistry.timer(MetricRegistry.name(DBBackedQueue.class, dbBackedQId, "rawDeleteEntryTime"));

this.DB_QUEUE_LOG_ID = "DBBackedQueue-" + dbBackedQId;
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.