Skip to content

Commit

Permalink
#21 Add support for event notifications
Browse files Browse the repository at this point in the history
Add Event publishing for connection acquire and lease times
  • Loading branch information
vladmihalcea committed Jul 6, 2015
1 parent 7eb2f12 commit 95f4827
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 4 deletions.
Expand Up @@ -9,7 +9,10 @@
import com.vladmihalcea.flexypool.connection.ConnectionProxyFactory;
import com.vladmihalcea.flexypool.connection.ConnectionRequestContext;
import com.vladmihalcea.flexypool.connection.Credentials;
import com.vladmihalcea.flexypool.event.ConnectionAcquireTimeThresholdExceededEvent;
import com.vladmihalcea.flexypool.event.ConnectionLeaseTimeThresholdExceededEvent;
import com.vladmihalcea.flexypool.event.EventListenerResolver;
import com.vladmihalcea.flexypool.event.EventPublisher;
import com.vladmihalcea.flexypool.exception.AcquireTimeoutException;
import com.vladmihalcea.flexypool.exception.CantAcquireConnectionException;
import com.vladmihalcea.flexypool.lifecycle.LifeCycleCallback;
Expand Down Expand Up @@ -122,6 +125,8 @@ private Configuration<D> configuration(D dataSource) {
Boolean jmxEnabled = propertyLoader.isJmxEnabled();
Boolean jmxAutoStart = propertyLoader.isJmxAutoStart();
EventListenerResolver eventListenerResolver = propertyLoader.getEventListenerResolver();
Long connectionAcquireTimeThresholdMillis = propertyLoader.getConnectionAcquireTimeThresholdMillis();
Long connectionLeaseTimeThresholdMillis = propertyLoader.getConnectionLeaseTimeThresholdMillis();

if (poolAdapterFactory == null) {
poolAdapterFactory = (PoolAdapterFactory<D>) DataSourcePoolAdapter.FACTORY;
Expand All @@ -148,6 +153,12 @@ private Configuration<D> configuration(D dataSource) {
if (eventListenerResolver != null) {
configurationBuilder.setEventListenerResolver(eventListenerResolver);
}
if (connectionAcquireTimeThresholdMillis != null) {
configurationBuilder.setConnectionAcquireTimeThresholdMillis(connectionAcquireTimeThresholdMillis);
}
if (connectionLeaseTimeThresholdMillis != null) {
configurationBuilder.setConnectionLeaseTimeThresholdMillis(connectionLeaseTimeThresholdMillis);
}
return configurationBuilder.build();
}

Expand All @@ -166,6 +177,7 @@ public FlexyPoolDataSourceConfiguration<D> getFlexyPoolDataSourceConfiguration()
public static final String CONCURRENT_CONNECTION_REQUESTS_HISTOGRAM = "concurrentConnectionRequestsHistogram";
public static final String CONNECTION_LEASE_MILLIS = "connectionLeaseMillis";

private final String uniqueName;
private final PoolAdapter<T> poolAdapter;
private final T targetDataSource;
private final Metrics metrics;
Expand All @@ -180,6 +192,11 @@ public FlexyPoolDataSourceConfiguration<D> getFlexyPoolDataSourceConfiguration()
private AtomicLong concurrentConnectionCount = new AtomicLong();
private AtomicLong concurrentConnectionRequestCount = new AtomicLong();

private final EventPublisher eventPublisher;

private final long connectionAcquireTimeThresholdMillis;
private final long connectionLeaseTimeThresholdMillis;

/**
* Initialize <code>FlexyPoolDataSource</code> from {@link Configuration} and the array of {@link ConnectionAcquiringStrategyFactory}
*
Expand Down Expand Up @@ -216,6 +233,7 @@ public FlexyPoolDataSource(T targetDataSource) {
*/
private FlexyPoolDataSource(final Configuration<T> configuration,
List<ConnectionAcquiringStrategyFactory<? extends ConnectionAcquiringStrategy, T>> connectionAcquiringStrategyFactories) {
this.uniqueName = configuration.getUniqueName();
this.poolAdapter = configuration.getPoolAdapter();
this.targetDataSource = poolAdapter.getTargetDataSource();
this.metrics = configuration.getMetrics();
Expand All @@ -231,6 +249,9 @@ private FlexyPoolDataSource(final Configuration<T> configuration,
connectionAcquiringStrategyFactory : connectionAcquiringStrategyFactories) {
connectionAcquiringStrategies.add(connectionAcquiringStrategyFactory.newInstance(configuration));
}
eventPublisher = configuration.getEventPublisher();
connectionAcquireTimeThresholdMillis = configuration.getConnectionAcquireTimeThresholdMillis();
connectionLeaseTimeThresholdMillis = configuration.getConnectionLeaseTimeThresholdMillis();
}

/**
Expand Down Expand Up @@ -293,8 +314,16 @@ private Connection getConnection(ConnectionRequestContext context) throws SQLExc
}
} finally {
long endNanos = System.nanoTime();
connectionAcquireTotalTimer.update(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos), TimeUnit.MILLISECONDS);
long acquireDurationMillis = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
connectionAcquireTotalTimer.update(acquireDurationMillis, TimeUnit.MILLISECONDS);
concurrentConnectionRequestCountHistogram.update(concurrentConnectionRequestCount.decrementAndGet());
if(acquireDurationMillis > connectionAcquireTimeThresholdMillis) {
eventPublisher.publish(new ConnectionAcquireTimeThresholdExceededEvent(
uniqueName, connectionAcquireTimeThresholdMillis, acquireDurationMillis
));
LOGGER.info("Connection acquired in {} millis, while threshold is set to {} in {} FlexyPoolDataSource",
acquireDurationMillis, connectionAcquireTimeThresholdMillis, uniqueName);
}
}
}

Expand All @@ -312,7 +341,15 @@ public void acquireConnection() {
@Override
public void releaseConnection(long leaseDurationNanos) {
concurrentConnectionCountHistogram.update(concurrentConnectionCount.decrementAndGet());
connectionLeaseTimer.update(TimeUnit.NANOSECONDS.toMillis(leaseDurationNanos), TimeUnit.MILLISECONDS);
long leaseDurationMillis = TimeUnit.NANOSECONDS.toMillis(leaseDurationNanos);
connectionLeaseTimer.update(leaseDurationMillis, TimeUnit.MILLISECONDS);
if(leaseDurationMillis > connectionLeaseTimeThresholdMillis) {
eventPublisher.publish(new ConnectionLeaseTimeThresholdExceededEvent(
uniqueName, connectionLeaseTimeThresholdMillis, leaseDurationMillis
));
LOGGER.info("Connection leased for {} millis, while threshold is set to {} in {} FlexyPoolDataSource",
leaseDurationMillis, connectionLeaseTimeThresholdMillis, uniqueName);
}
}

/**
Expand Down
Expand Up @@ -23,6 +23,10 @@ public abstract class ConfigurationProperties<T extends DataSource, M, P> {

private long metricLogReporterMillis;

private long connectionAcquireTimeThresholdMillis = Long.MAX_VALUE;

private long connectionLeaseTimeThresholdMillis = Long.MAX_VALUE;

public ConfigurationProperties(String uniqueName, EventPublisher eventPublisher) {
this.uniqueName = uniqueName;
this.eventPublisher = eventPublisher;
Expand Down Expand Up @@ -99,6 +103,38 @@ protected void setMetricLogReporterMillis(long metricLogReporterMillis) {
this.metricLogReporterMillis = metricLogReporterMillis;
}

/**
* Get the connection acquire time threshold millis
* @return connection acquire time threshold millis
*/
public long getConnectionAcquireTimeThresholdMillis() {
return connectionAcquireTimeThresholdMillis;
}

/**
* Set the connection acquire time threshold millis
* @param connectionAcquireTimeThresholdMillis connection acquire time threshold millis
*/
public void setConnectionAcquireTimeThresholdMillis(long connectionAcquireTimeThresholdMillis) {
this.connectionAcquireTimeThresholdMillis = connectionAcquireTimeThresholdMillis;
}

/**
* Get the connection lease time threshold millis
* @return connection lease time threshold millis
*/
public long getConnectionLeaseTimeThresholdMillis() {
return connectionLeaseTimeThresholdMillis;
}

/**
* Set the connection lease time threshold millis
* @param connectionLeaseTimeThresholdMillis connection lease time threshold millis
*/
public void setConnectionLeaseTimeThresholdMillis(long connectionLeaseTimeThresholdMillis) {
this.connectionLeaseTimeThresholdMillis = connectionLeaseTimeThresholdMillis;
}

/**
* Get the target data source
*
Expand Down
Expand Up @@ -41,6 +41,8 @@ public static class Builder<T extends DataSource> {
private boolean jmxAutoStart = false;
private long metricLogReporterMillis = DEFAULT_METRIC_LOG_REPORTER_MILLIS;
private EventListenerResolver eventListenerResolver;
private long connectionAcquireTimeThresholdMillis = Long.MAX_VALUE;
private long connectionLeaseTimeThresholdMillis = Long.MAX_VALUE;

/**
* Construct the builder with the mandatory associations.
Expand Down Expand Up @@ -120,6 +122,30 @@ public Builder<T> setEventListenerResolver(EventListenerResolver eventListenerRe
return this;
}

/**
* Set the connection acquire time threshold millis
* @param connectionAcquireTimeThresholdMillis connection acquire time threshold millis
* @return this {@link com.vladmihalcea.flexypool.config.Configuration.Builder}
*/
public Builder<T> setConnectionAcquireTimeThresholdMillis(Long connectionAcquireTimeThresholdMillis) {
if (connectionAcquireTimeThresholdMillis != null) {
this.connectionAcquireTimeThresholdMillis = connectionAcquireTimeThresholdMillis;
}
return this;
}

/**
* Set the connection lease time threshold millis
* @param connectionLeaseTimeThresholdMillis connection lease time threshold millis
* @return this {@link com.vladmihalcea.flexypool.config.Configuration.Builder}
*/
public Builder<T> setConnectionLeaseTimeThresholdMillis(Long connectionLeaseTimeThresholdMillis) {
if (connectionLeaseTimeThresholdMillis != null) {
this.connectionLeaseTimeThresholdMillis = connectionLeaseTimeThresholdMillis;
}
return this;
}

/**
* Build the configuration object.
*
Expand All @@ -131,6 +157,8 @@ public Configuration<T> build() {
configuration.setJmxEnabled(jmxEnabled);
configuration.setJmxAutoStart(jmxAutoStart);
configuration.setMetricLogReporterMillis(metricLogReporterMillis);
configuration.setConnectionAcquireTimeThresholdMillis(connectionAcquireTimeThresholdMillis);
configuration.setConnectionLeaseTimeThresholdMillis(connectionLeaseTimeThresholdMillis);
configuration.metrics = metricsFactory.newInstance(configuration);
configuration.poolAdapter = poolAdapterFactory.newInstance(configuration);
configuration.connectionProxyFactory = connectionProxyFactory;
Expand Down
Expand Up @@ -54,7 +54,9 @@ public enum PropertyKey {
POOL_METRICS_REPORTER_JMX_ENABLE("flexy.pool.metrics.reporter.jmx.enable"),
POOL_METRICS_REPORTER_JMX_AUTO_START("flexy.pool.metrics.reporter.jmx.auto.start"),
POOL_STRATEGIES_FACTORY_RESOLVER("flexy.pool.strategies.factory.resolver"),
POOL_EVENT_LISTENER_RESOLVER("flexy.pool.event.listener.resolver");
POOL_EVENT_LISTENER_RESOLVER("flexy.pool.event.listener.resolver"),
POOL_TIME_THRESHOLD_CONNECTION_ACQUIRE("flexy.pool.time.threshold.connection.acquire"),
POOL_TIME_THRESHOLD_CONNECTION_LEASE("flexy.pool.time.threshold.connection.lease"),;

private final String key;

Expand Down Expand Up @@ -255,6 +257,22 @@ public EventListenerResolver getEventListenerResolver() {
return instantiateClass(PropertyKey.POOL_EVENT_LISTENER_RESOLVER);
}

/**
* Get the connection acquire time threshold millis
* @return connection acquire time threshold millis
*/
public Long getConnectionAcquireTimeThresholdMillis() {
return longProperty(PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_ACQUIRE);
}

/**
* Get the connection lease time threshold millis
* @return connection lease time threshold millis
*/
public Long getConnectionLeaseTimeThresholdMillis() {
return longProperty(PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_LEASE);
}

/**
* Instantiate class associated to the given property key
*
Expand Down Expand Up @@ -296,6 +314,21 @@ private Integer integerProperty(PropertyKey propertyKey) {
return value;
}

/**
* Get Long property value
*
* @param propertyKey property key
* @return Long property value
*/
private Long longProperty(PropertyKey propertyKey) {
Long value = null;
String property = properties.getProperty(propertyKey.getKey());
if (property != null) {
value = Long.valueOf(property);
}
return value;
}

/**
* Get Boolean property value
*
Expand Down
@@ -0,0 +1,19 @@
package com.vladmihalcea.flexypool.event;

/**
* <code>ConnectionAcquireTimeThresholdExceededEvent</code> - Event generated when a connection acquire has exceeded the given time threshold
*
* @author Vlad Mihalcea
*/
public class ConnectionAcquireTimeThresholdExceededEvent extends TimeThresholdExceededEvent {

private static final long serialVersionUID = -2107982228572130887L;

/**
* {@inheritDoc}
*/
public ConnectionAcquireTimeThresholdExceededEvent(String uniqueName,
long timeThresholdMillis, long actualTimeMillis) {
super(uniqueName, timeThresholdMillis, actualTimeMillis);
}
}
@@ -0,0 +1,19 @@
package com.vladmihalcea.flexypool.event;

/**
* <code>ConnectionLeaseTimeThresholdExceededEvent</code> - Event generated when a connection lease has exceeded the given time threshold
*
* @author Vlad Mihalcea
*/
public class ConnectionLeaseTimeThresholdExceededEvent extends TimeThresholdExceededEvent {

private static final long serialVersionUID = -2107982228572130887L;

/**
* {@inheritDoc}
*/
public ConnectionLeaseTimeThresholdExceededEvent(String uniqueName,
long timeThresholdMillis, long actualTimeMillis) {
super(uniqueName, timeThresholdMillis, actualTimeMillis);
}
}
@@ -0,0 +1,35 @@
package com.vladmihalcea.flexypool.event;

/**
* <code>TimeThresholdExceededEvent</code> - Event generated when a time threshold is exceeded
*
* @author Vlad Mihalcea
*/
public abstract class TimeThresholdExceededEvent extends Event {

private static final long serialVersionUID = 8983594872506186227L;

private final long timeThresholdMillis;

private final long actualTimeMillis;

/**
* Init constructor
* @param uniqueName FlexyPool unique name
* @param timeThresholdMillis time threshold millis
* @param actualTimeMillis actual time millis
*/
public TimeThresholdExceededEvent(String uniqueName, long timeThresholdMillis, long actualTimeMillis) {
super(uniqueName);
this.timeThresholdMillis = timeThresholdMillis;
this.actualTimeMillis = actualTimeMillis;
}

public long getTimeThresholdMillis() {
return timeThresholdMillis;
}

public long getActualTimeMillis() {
return actualTimeMillis;
}
}
Expand Up @@ -91,7 +91,8 @@ public static Method getMethod(Object target, String methodName, Class... parame
*/
public static boolean hasMethod(Class<?> targetClass, String methodName, Class... parameterTypes) {
try {
return targetClass.getMethod(methodName, parameterTypes) != null;
targetClass.getMethod(methodName, parameterTypes);
return true;
} catch (NoSuchMethodException e) {
return false;
}
Expand Down
Expand Up @@ -334,6 +334,8 @@ public void testDefaultConstructorWithExistingJNDIDataSource() throws SQLExcepti
Properties properties = new Properties();
properties.put(PropertyLoader.PropertyKey.DATA_SOURCE_UNIQUE_NAME.getKey(), "jdbc/DS");
properties.put(PropertyLoader.PropertyKey.DATA_SOURCE_CLASS_NAME.getKey(), MockDataSource.class.getName());
properties.put(PropertyLoader.PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_ACQUIRE.getKey(), "-1");
properties.put(PropertyLoader.PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_LEASE.getKey(), "-1");
PropertiesTestUtils.setProperties(properties);
FlexyPoolDataSource<DataSource> flexyPoolDataSource = new FlexyPoolDataSource<DataSource>();
DataSource dataSource = ((PoolAdapter) ReflectionTestUtils.getField(flexyPoolDataSource, "poolAdapter")).getTargetDataSource();
Expand Down
Expand Up @@ -150,6 +150,8 @@ public void testLoadPropertiesWhenFileExistsAndContainsAllConfigs() {
properties.put(PropertyLoader.PropertyKey.POOL_CONNECTION_PROXY_FACTORY.getKey(), MockConnectionProxyFactory.class.getName());
properties.put(PropertyLoader.PropertyKey.POOL_STRATEGIES_FACTORY_RESOLVER.getKey(), MockConnectionAcquiringStrategyFactoryResolver.class.getName());
properties.put(PropertyLoader.PropertyKey.POOL_EVENT_LISTENER_RESOLVER.getKey(), MockEventListenerResolver.class.getName());
properties.put(PropertyLoader.PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_ACQUIRE.getKey(), "120");
properties.put(PropertyLoader.PropertyKey.POOL_TIME_THRESHOLD_CONNECTION_LEASE.getKey(), "240");
PropertiesTestUtils.setProperties(properties);
PropertyLoader propertyLoader = new PropertyLoader();
assertNotNull(propertyLoader.getDataSource());
Expand All @@ -167,6 +169,8 @@ public void testLoadPropertiesWhenFileExistsAndContainsAllConfigs() {
assertNotNull(propertyLoader.getPoolAdapterFactory());
assertEquals(1, propertyLoader.getConnectionAcquiringStrategyFactories().size());
assertEquals(MockEventListenerResolver.MockEventListener.class, propertyLoader.getEventListenerResolver().resolveListeners().get(0).getClass());
assertEquals(Long.valueOf(120), propertyLoader.getConnectionAcquireTimeThresholdMillis());
assertEquals(Long.valueOf(240), propertyLoader.getConnectionLeaseTimeThresholdMillis());
} catch (IOException e) {
fail("Can't save/load properties");
}
Expand Down

0 comments on commit 95f4827

Please sign in to comment.