Skip to content

Commit

Permalink
util: introduce DBRouter
Browse files Browse the repository at this point in the history
Encapsulate the logic to route between RO and RW DB instance.
That way, we always honor the perThreadDirtyDBFlag.

Signed-off-by: Pierre-Alexandre Meyer <pierre@mouraf.org>
  • Loading branch information
pierre committed Apr 25, 2018
1 parent fc08c9e commit da16bff
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 118 deletions.
Expand Up @@ -51,7 +51,7 @@ public void beforeMethod() throws Exception {
super.beforeMethod();

// Create the tenant
final DefaultTenantDao tenantDao = new DefaultTenantDao(dbi, roDbi, clock, cacheControllerDispatcher, new DefaultNonEntityDao(dbi), Mockito.mock(InternalCallContextFactory.class), securityConfig);
final DefaultTenantDao tenantDao = new DefaultTenantDao(dbi, roDbi, clock, cacheControllerDispatcher, new DefaultNonEntityDao(dbi, roDbi), Mockito.mock(InternalCallContextFactory.class), securityConfig);
tenant = new DefaultTenant(UUID.randomUUID(), null, null, UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString());
tenantDao.create(new TenantModelDao(tenant), internalCallContext);
Expand Down
Expand Up @@ -27,43 +27,42 @@
import org.joda.time.LocalDate;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.util.entity.dao.DBRouter;
import org.skife.jdbi.v2.IDBI;

import static org.killbill.billing.util.glue.IDBISetup.MAIN_RO_IDBI_NAMED;

public class DefaultRolledUpUsageDao implements RolledUpUsageDao {

private final RolledUpUsageSqlDao rolledUpUsageSqlDao;
private final RolledUpUsageSqlDao roRolledUpUsageSqlDao;
private final DBRouter<RolledUpUsageSqlDao> dbRouter;

@Inject
public DefaultRolledUpUsageDao(final IDBI dbi, @Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
this.rolledUpUsageSqlDao = dbi.onDemand(RolledUpUsageSqlDao.class);
this.roRolledUpUsageSqlDao = roDbi.onDemand(RolledUpUsageSqlDao.class);
this.dbRouter = new DBRouter<RolledUpUsageSqlDao>(dbi, roDbi, RolledUpUsageSqlDao.class);
}

@Override
public void record(final Iterable<RolledUpUsageModelDao> usages, final InternalCallContext context) {
rolledUpUsageSqlDao.create(usages, context);
dbRouter.onDemand(false).create(usages, context);
}

@Override
public Boolean recordsWithTrackingIdExist(final UUID subscriptionId, final String trackingId, final InternalTenantContext context) {
return rolledUpUsageSqlDao.recordsWithTrackingIdExist(subscriptionId, trackingId, context) != null;
return dbRouter.onDemand(false).recordsWithTrackingIdExist(subscriptionId, trackingId, context) != null;
}

@Override
public List<RolledUpUsageModelDao> getUsageForSubscription(final UUID subscriptionId, final LocalDate startDate, final LocalDate endDate, final String unitType, final InternalTenantContext context) {
return roRolledUpUsageSqlDao.getUsageForSubscription(subscriptionId, startDate.toDate(), endDate.toDate(), unitType, context);
return dbRouter.onDemand(true).getUsageForSubscription(subscriptionId, startDate.toDate(), endDate.toDate(), unitType, context);
}

@Override
public List<RolledUpUsageModelDao> getAllUsageForSubscription(final UUID subscriptionId, final LocalDate startDate, final LocalDate endDate, final InternalTenantContext context) {
return roRolledUpUsageSqlDao.getAllUsageForSubscription(subscriptionId, startDate.toDate(), endDate.toDate(), context);
return dbRouter.onDemand(true).getAllUsageForSubscription(subscriptionId, startDate.toDate(), endDate.toDate(), context);
}

@Override
public List<RolledUpUsageModelDao> getRawUsageForAccount(final LocalDate startDate, final LocalDate endDate, final InternalTenantContext context) {
return roRolledUpUsageSqlDao.getRawUsageForAccount(startDate.toDate(), endDate.toDate(), context);
return dbRouter.onDemand(true).getRawUsageForAccount(startDate.toDate(), endDate.toDate(), context);
}
}

Large diffs are not rendered by default.

Expand Up @@ -22,6 +22,7 @@
import javax.inject.Inject;
import javax.inject.Named;

import org.killbill.billing.util.entity.dao.DBRouter;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback;
Expand All @@ -31,18 +32,16 @@

public class DefaultBroadcastDao implements BroadcastDao {

private final IDBI dbi;
private final IDBI roDbi;
private final DBRouter<BroadcastSqlDao> dbRouter;

@Inject
public DefaultBroadcastDao(final IDBI dbi, @Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
this.dbi = dbi;
this.roDbi = roDbi;
this.dbRouter = new DBRouter<BroadcastSqlDao>(dbi, roDbi, BroadcastSqlDao.class);
}

@Override
public void create(final BroadcastModelDao broadcastModelDao) {
dbi.inTransaction(new TransactionCallback<Void>() {
dbRouter.inTransaction(false, new TransactionCallback<Void>() {
@Override
public Void inTransaction(final Handle handle, final TransactionStatus status) throws Exception {
final BroadcastSqlDao sqlDao = handle.attach(BroadcastSqlDao.class);
Expand All @@ -54,7 +53,7 @@ public Void inTransaction(final Handle handle, final TransactionStatus status) t

@Override
public List<BroadcastModelDao> getLatestEntriesFrom(final Long recordId) {
return roDbi.inTransaction(new TransactionCallback<List<BroadcastModelDao>>() {
return dbRouter.inTransaction(true, new TransactionCallback<List<BroadcastModelDao>>() {
@Override
public List<BroadcastModelDao> inTransaction(final Handle handle, final TransactionStatus status) throws Exception {
final BroadcastSqlDao sqlDao = handle.attach(BroadcastSqlDao.class);
Expand All @@ -65,7 +64,7 @@ public List<BroadcastModelDao> inTransaction(final Handle handle, final Transact

@Override
public BroadcastModelDao getLatestEntry() {
return roDbi.inTransaction(new TransactionCallback<BroadcastModelDao>() {
return dbRouter.inTransaction(true, new TransactionCallback<BroadcastModelDao>() {
@Override
public BroadcastModelDao inTransaction(final Handle handle, final TransactionStatus status) throws Exception {
final BroadcastSqlDao sqlDao = handle.attach(BroadcastSqlDao.class);
Expand Down
Expand Up @@ -28,19 +28,20 @@
import org.killbill.billing.util.audit.dao.AuditLogModelDao;
import org.killbill.billing.util.cache.Cachable.CacheType;
import org.killbill.billing.util.dao.AuditSqlDao;
import org.killbill.billing.util.entity.dao.DBRouter;
import org.skife.jdbi.v2.IDBI;

import static org.killbill.billing.util.glue.IDBISetup.MAIN_RO_IDBI_NAMED;

@Singleton
public class AuditLogCacheLoader extends BaseCacheLoader<String, List<AuditLogModelDao>> {

private final AuditSqlDao roAuditSqlDao;
private final DBRouter<AuditSqlDao> dbRouter;

@Inject
public AuditLogCacheLoader(@Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
public AuditLogCacheLoader(final IDBI dbi, @Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
super();
this.roAuditSqlDao = roDbi.onDemand(AuditSqlDao.class);
this.dbRouter = new DBRouter<AuditSqlDao>(dbi, roDbi, AuditSqlDao.class);
}

@Override
Expand All @@ -55,6 +56,6 @@ public List<AuditLogModelDao> compute(final String key, final CacheLoaderArgumen
final Long targetRecordId = (Long) args[1];
final InternalTenantContext internalTenantContext = (InternalTenantContext) args[2];

return roAuditSqlDao.getAuditLogsForTargetRecordId(tableName, targetRecordId, internalTenantContext);
return dbRouter.onDemand(true).getAuditLogsForTargetRecordId(tableName, targetRecordId, internalTenantContext);
}
}
Expand Up @@ -29,6 +29,7 @@
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.cache.CacheLoaderArgument;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.entity.dao.DBRouter;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.Profiling.WithProfilingCallback;
import org.killbill.commons.profiling.ProfilingFeature.ProfilingFeatureType;
Expand All @@ -42,13 +43,13 @@

public class DefaultNonEntityDao implements NonEntityDao {

private final NonEntitySqlDao roNonEntitySqlDao;
private final DBRouter<NonEntitySqlDao> dbRouter;
private final WithCaching<String, Long> withCachingObjectId;
private final WithCaching<String, UUID> withCachingRecordId;

@Inject
public DefaultNonEntityDao(@Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
this.roNonEntitySqlDao = roDbi.onDemand(NonEntitySqlDao.class);
public DefaultNonEntityDao(final IDBI dbi, @Named(MAIN_RO_IDBI_NAMED) final IDBI roDbi) {
this.dbRouter = new DBRouter<NonEntitySqlDao>(dbi, roDbi, NonEntitySqlDao.class);
this.withCachingObjectId = new WithCaching<String, Long>();
this.withCachingRecordId = new WithCaching<String, UUID>();
}
Expand All @@ -69,7 +70,7 @@ public Long retrieveRecordIdFromObjectInTransaction(@Nullable final UUID objectI
return withCachingObjectId.withCaching(new OperationRetrieval<Long>() {
@Override
public Long doRetrieve(final ObjectType objectType) {
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? roNonEntitySqlDao : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? dbRouter.onDemand(true) : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
return inTransactionNonEntitySqlDao.getRecordIdFromObject(objectId.toString(), tableName.getTableName());
}
}, objectId.toString(), objectType, tableName, cache);
Expand All @@ -89,7 +90,7 @@ public Long retrieveAccountRecordIdFromObjectInTransaction(@Nullable final UUID
return withCachingObjectId.withCaching(new OperationRetrieval<Long>() {
@Override
public Long doRetrieve(final ObjectType objectType) {
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? roNonEntitySqlDao : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? dbRouter.onDemand(true) : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);

switch (tableName) {
case TENANT:
Expand Down Expand Up @@ -121,7 +122,7 @@ public Long retrieveTenantRecordIdFromObjectInTransaction(@Nullable final UUID o
return withCachingObjectId.withCaching(new OperationRetrieval<Long>() {
@Override
public Long doRetrieve(final ObjectType objectType) {
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? roNonEntitySqlDao : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? dbRouter.onDemand(true) : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);

switch (tableName) {
case TENANT:
Expand Down Expand Up @@ -153,7 +154,7 @@ public UUID retrieveIdFromObjectInTransaction(final Long recordId, final ObjectT
return withCachingRecordId.withCaching(new OperationRetrieval<UUID>() {
@Override
public UUID doRetrieve(final ObjectType objectType) {
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? roNonEntitySqlDao : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
final NonEntitySqlDao inTransactionNonEntitySqlDao = handle == null ? dbRouter.onDemand(true) : SqlObjectBuilder.attach(handle, NonEntitySqlDao.class);
return inTransactionNonEntitySqlDao.getIdFromObject(recordId, tableName.getTableName());
}
}, String.valueOf(recordId), objectType, tableName, cache);
Expand All @@ -167,7 +168,7 @@ public Long retrieveLastHistoryRecordIdFromTransaction(@Nullable final Long targ

@Override
public Long retrieveHistoryTargetRecordId(@Nullable final Long recordId, final TableName tableName) {
return roNonEntitySqlDao.getHistoryTargetRecordId(recordId, tableName.getTableName());
return dbRouter.onDemand(true).getHistoryTargetRecordId(recordId, tableName.getTableName());
}

private interface OperationRetrieval<TypeOut> {
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.billing.util.entity.dao;

import org.skife.jdbi.v2.IDBI;

public class DBRouter<C> extends DBRouterUntyped {

private final C onDemand;
private final C roOnDemand;

public DBRouter(final IDBI dbi, final IDBI roDbi, final Class<C> sqlObjectType) {
super(dbi, roDbi);
this.onDemand = dbi.onDemand(sqlObjectType);
this.roOnDemand = roDbi.onDemand(sqlObjectType);
}

public C onDemand(final boolean requestedRO) {
if (shouldUseRODBI(requestedRO)) {
return roOnDemand;
} else {
return onDemand;
}
}
}
@@ -0,0 +1,79 @@
/*
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.billing.util.entity.dao;

import org.killbill.billing.util.glue.KillbillApiAopModule;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBRouterUntyped {

private static final Logger logger = LoggerFactory.getLogger(DBRouterUntyped.class);

protected final IDBI dbi;
protected final IDBI roDbi;

public DBRouterUntyped(final IDBI dbi, final IDBI roDbi) {
this.dbi = dbi;
this.roDbi = roDbi;
}

public Handle getHandle(final boolean requestedRO) {
if (shouldUseRODBI(requestedRO)) {
return roDbi.open();
} else {
return dbi.open();
}
}

public <T> T onDemand(final boolean requestedRO, final Class<T> sqlObjectType) {
if (shouldUseRODBI(requestedRO)) {
return roDbi.onDemand(sqlObjectType);
} else {
return dbi.onDemand(sqlObjectType);
}
}

public <T> T inTransaction(final boolean requestedRO, final TransactionCallback<T> callback) {
if (shouldUseRODBI(requestedRO)) {
return roDbi.inTransaction(callback);
} else {
return dbi.inTransaction(callback);
}
}

boolean shouldUseRODBI(final boolean requestedRO) {
if (!requestedRO) {
KillbillApiAopModule.setDirtyDBFlag();
logger.debug("Dirty flag set, using RW DBI");
return false;
} else {
if (KillbillApiAopModule.getDirtyDBFlag()) {
// Redirect to the rw instance, to work-around any replication delay
logger.debug("RO DBI handle requested, but dirty flag set, using RW DBI");
return false;
} else {
logger.debug("Using RO DBI");
return true;
}
}
}
}

0 comments on commit da16bff

Please sign in to comment.