Skip to content

Commit

Permalink
Fix CachedSessionFactory Race
Browse files Browse the repository at this point in the history
Close the pool so that any sessions returned after the factory is
`destroy()`ed are closed.

* Call `removeAllIdleItems()` in `close()`.

* Close sessions in `SftpStreamingMessageSourceTests`.

**cherry-pick to all supported branches**

# Conflicts:
#	spring-integration-core/src/test/java/org/springframework/integration/util/SimplePoolTests.java

# Conflicts:
#	spring-integration-core/src/test/java/org/springframework/integration/util/SimplePoolTests.java
#	spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java
#	spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

* Remove `default close()` from `Pool` interface
* Cast to `SimplePool` in the `CSF` to call the new `close()`
  • Loading branch information
garyrussell authored and artembilan committed Jul 8, 2020
1 parent a24f24f commit c95e6dc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
Expand Up @@ -60,6 +60,8 @@ public class SimplePool<T> implements Pool<T> {

private final PoolItemCallback<T> callback;

private volatile boolean closed;

/**
* Creates a SimplePool with a specific limit.
* @param poolSize The maximum number of items the pool supports.
Expand Down Expand Up @@ -154,6 +156,7 @@ public void setWaitTimeout(long waitTimeout) {
* @throws MessagingException if no items become available in time.
*/
public T getItem() {
Assert.state(!this.closed, "Pool has been closed");
boolean permitted = false;
try {
try {
Expand Down Expand Up @@ -210,7 +213,7 @@ public synchronized void releaseItem(T item) {
Assert.isTrue(this.allocated.contains(item),
"You can only release items that were obtained from the pool");
if (this.inUse.contains(item)) {
if (this.poolSize.get() > this.targetPoolSize.get()) {
if (this.poolSize.get() > this.targetPoolSize.get() || this.closed) {
this.poolSize.decrementAndGet();
if (item != null) {
doRemoveItem(item);
Expand Down Expand Up @@ -250,6 +253,11 @@ private void doRemoveItem(T item) {
this.callback.removedFromPool(item);
}

public synchronized void close() {
this.closed = true;
removeAllIdleItems();
}

/**
* User of the pool provide an implementation of this interface; called during
* various pool operations.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,20 @@

package org.springframework.integration.util;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;


import org.junit.Test;

import org.springframework.integration.test.util.TestUtils;
Expand Down Expand Up @@ -125,13 +128,18 @@ public void testOverCommitandResize() {
assertEquals(2, pool.getAllocatedCount());
}

@Test(expected = IllegalArgumentException.class)
@Test
public void testForeignObject() {
final Set<String> strings = new HashSet<String>();
final AtomicBoolean stale = new AtomicBoolean();
SimplePool<String> pool = stringPool(2, strings, stale);
pool.getItem();
pool.releaseItem("Hello, world!");
try {
pool.releaseItem("Hello, world!");
}
catch (Exception e) {
assertThat(e, instanceOf(IllegalArgumentException.class));
}
}

@Test
Expand All @@ -150,8 +158,27 @@ public void testDoubleReturn() {
}


@Test
public void testClose() {
SimplePool<String> pool = stringPool(10, new HashSet<>(), new AtomicBoolean());
String item1 = pool.getItem();
String item2 = pool.getItem();
pool.releaseItem(item2);
assertEquals(2, pool.getAllocatedCount());
pool.close();
pool.releaseItem(item1);
assertEquals(0, pool.getAllocatedCount());
try {
pool.getItem();
}
catch (Exception e) {
assertThat(e, instanceOf(IllegalStateException.class));
}
}

private SimplePool<String> stringPool(int size, final Set<String> strings,
final AtomicBoolean stale) {

SimplePool<String> pool = new SimplePool<String>(size, new SimplePool.PoolItemCallback<String>() {
private int i;
public String createForPool() {
Expand All @@ -168,6 +195,7 @@ public boolean isStale(String item) {
public void removedFromPool(String item) {
strings.remove(item);
}

});
return pool;
}
Expand Down
Expand Up @@ -128,7 +128,7 @@ public Session<F> getSession() {
*/
@Override
public void destroy() {
this.pool.removeAllIdleItems();
this.pool.close();
}

/**
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.ip.IpHeaders;
Expand All @@ -40,7 +41,7 @@
* @since 2.2
*
*/
public class CachingClientConnectionFactory extends AbstractClientConnectionFactory {
public class CachingClientConnectionFactory extends AbstractClientConnectionFactory implements DisposableBean {

private final AbstractClientConnectionFactory targetConnectionFactory;

Expand Down Expand Up @@ -403,6 +404,15 @@ public void stop(Runnable callback) {
this.targetConnectionFactory.stop(callback);
}

@Override
public void destroy() throws Exception {
if (this.pool instanceof SimplePool) {
((SimplePool) this.pool).close();
}
}



private final class CachedConnection extends TcpConnectionInterceptorSupport {

private final AtomicBoolean released = new AtomicBoolean();
Expand Down

0 comments on commit c95e6dc

Please sign in to comment.