Skip to content

Commit

Permalink
GH-3271: Close session on error in stream source
Browse files Browse the repository at this point in the history
Fixes #3271

When exception happens at `.withPayload(session.readRaw(remotePath))`
in the `AbstractRemoteFileStreamingMessageSource` we don't close session.
The resource leaking happens in the caching session factory

* Add `session.close();` into the `catch (IOException e) {`
in the `AbstractRemoteFileStreamingMessageSource.doReceive()`
to clean up resources properly

**Cherry-pick to 5.2.x, 5.1.x & 4.3.x**
  • Loading branch information
artembilan authored and garyrussell committed May 11, 2020
1 parent af750e7 commit 37a0c40
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ protected Object doReceive() {
this.fileInfoJson ? file.toJson() : file);
}
catch (IOException e) {
session.close();
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@
package org.springframework.integration.file.remote;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;

/**
* @author Lukas Gemela
Expand Down Expand Up @@ -69,6 +73,32 @@ public void filterOutFilesNotAcceptedByFilter() throws IOException {
assertThat(testRemoteFileStreamingMessageSource.doReceive()).isNull();
}

@Test
@SuppressWarnings("unchecked")
public void sessionReturnedToCacheProperlyOnDoReceive() throws IOException {
Session<String> session = mock(Session.class);
when(session.readRaw(anyString())).thenThrow(IOException.class);
when(session.list("remoteDirectory")).thenReturn(new String[] { "file1" });

SessionFactory<String> sessionFactory = mock(SessionFactory.class);
when(sessionFactory.getSession()).thenReturn(session);

CachingSessionFactory<String> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
RemoteFileTemplate<String> remoteFileTemplate = new RemoteFileTemplate<>(cachingSessionFactory);

TestRemoteFileStreamingMessageSource testRemoteFileStreamingMessageSource =
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, null);

testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
testRemoteFileStreamingMessageSource.start();

assertThatExceptionOfType(UncheckedIOException.class)
.isThrownBy(testRemoteFileStreamingMessageSource::doReceive);

assertThat(cachingSessionFactory.getSession()).isNotNull();
}

static class TestRemoteFileStreamingMessageSource extends AbstractRemoteFileStreamingMessageSource<String> {

TestRemoteFileStreamingMessageSource(RemoteFileTemplate<String> template, Comparator<String> comparator) {
Expand Down

0 comments on commit 37a0c40

Please sign in to comment.