Skip to content

Commit

Permalink
MULE-8622: Implement Reliability Pattern for SFTP transport
Browse files Browse the repository at this point in the history
  • Loading branch information
marianogonzalez committed May 22, 2015
1 parent b3545a6 commit d179d79
Show file tree
Hide file tree
Showing 12 changed files with 642 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
*/
package org.mule.transport.sftp;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.api.MuleRuntimeException;
import org.mule.util.FileUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* Ensures that the file is moved to the archiveFile folder after a successful
* consumption of the file
*
* @author Magnus Larsson
*/
public class SftpFileArchiveInputStream extends FileInputStream implements ErrorOccurredDecorator
public class SftpFileArchiveInputStream extends FileInputStream implements SftpStream
{
/**
* logger used by this class
Expand All @@ -30,7 +33,10 @@ public class SftpFileArchiveInputStream extends FileInputStream implements Error

private File file;
private File archiveFile;
private final SftpInputStream remoteStream;
private boolean errorOccured = false;
private boolean postProcessOnClose = false;
private final AtomicBoolean closed = new AtomicBoolean(false);

// Log every 10 000 000 bytes read at debug-level
// Good if really large files are transferred and you tend to get nervous by not
Expand All @@ -39,19 +45,18 @@ public class SftpFileArchiveInputStream extends FileInputStream implements Error
private long bytesRead = 0;
private long nextLevelToLogBytesRead = LOG_BYTE_INTERVAL;

public SftpFileArchiveInputStream(File file) throws FileNotFoundException
public SftpFileArchiveInputStream(File file, SftpInputStream remoteStream) throws FileNotFoundException
{
super(file);

this.file = file;
this.archiveFile = null;
this.remoteStream = remoteStream;
}

public SftpFileArchiveInputStream(File file, File archiveFile) throws FileNotFoundException
public SftpFileArchiveInputStream(File file, File archiveFile, SftpInputStream sftpInputStream) throws FileNotFoundException
{
super(file);

this.file = file;
this(file, sftpInputStream);
this.archiveFile = archiveFile;
}

Expand All @@ -76,13 +81,51 @@ public int read(byte[] b) throws IOException
return super.read(b);
}

@Override
public void close() throws IOException
{
if (!closed.compareAndSet(false, true))
{
return;
}

if (logger.isDebugEnabled())
{
logger.debug("Closing the stream for the file " + file);
}
super.close();
remoteStream.close();

if (postProcessOnClose)
{
try
{
postProcess();
}
catch (Exception e)
{
throw new MuleRuntimeException(e);
}
}
}

@Override
public boolean isClosed()
{
return closed.get();
}

@Override
public void postProcess() throws Exception
{
if (!errorOccured)
{
remoteStream.postProcess();
}
else
{
remoteStream.releaseConnection();
}

if (!errorOccured && archiveFile != null)
{
Expand All @@ -95,12 +138,20 @@ public void close() throws IOException
}
}

@Override
public void setErrorOccurred()
{
if (logger.isDebugEnabled()) logger.debug("setErrorOccurred() called");
this.errorOccured = true;
}

@Override
public void performPostProcessingOnClose(boolean postProcessOnClose)
{
this.postProcessOnClose = postProcessOnClose;

}

private void logReadBytes(int newBytesRead)
{
if (!logger.isDebugEnabled()) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,30 @@
*/
package org.mule.transport.sftp;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.api.MuleRuntimeException;
import org.mule.api.endpoint.ImmutableEndpoint;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* <code>SftpInputStream</code> wraps an sftp InputStream.
*/

public class SftpInputStream extends BufferedInputStream implements ErrorOccurredDecorator
public class SftpInputStream extends BufferedInputStream implements SftpStream
{
private final Log logger = LogFactory.getLog(getClass());

private SftpClient client;
private boolean autoDelete = true;
private String fileName;
private boolean postProcessOnClose = false;
private final AtomicBoolean closed = new AtomicBoolean(false);

public String getFileName()
{
Expand Down Expand Up @@ -93,38 +98,80 @@ public int read(byte[] b) throws IOException
return super.read(b);
}

@Override
public void close() throws IOException
{
if (logger.isDebugEnabled())
if (closed.compareAndSet(false, true))
{
logger.debug("Closing the stream for the file " + fileName);
if (logger.isDebugEnabled())
{
logger.debug("Closing the stream for the file " + fileName);
}

try
{
super.close();
}
catch (IOException e)
{
logger.error("Error occurred while closing file " + fileName, e);
throw e;
}

if (postProcessOnClose)
{
try
{
postProcess();
}
catch (Exception e)
{
throw new MuleRuntimeException(e);
}
}
}
try
}

@Override
public boolean isClosed()
{
return closed.get();
}

@Override
public void postProcess() throws Exception
{
if (!client.isConnected())
{
super.close();
return;
}

try
{
close();
if (autoDelete && !errorOccured)
{
client.deleteFile(fileName);
}
}
catch (IOException e)
{
logger.error("Error occured while closing file " + fileName, e);
throw e;
}
finally
{
// We should release the connection from the pool even if some error
// occurs here
try
{
((SftpConnector) endpoint.getConnector()).releaseClient(endpoint, client);
}
catch (Exception e)
{
logger.error(e.getMessage(), e);
}
releaseConnection();
}

}

void releaseConnection()
{
try
{
((SftpConnector) endpoint.getConnector()).releaseClient(endpoint, client);
}
catch (Exception e)
{
logger.error(e.getMessage(), e);
}
}

Expand All @@ -134,6 +181,12 @@ public void setErrorOccurred()
this.errorOccured = true;
}

@Override
public void performPostProcessingOnClose(boolean postProcessOnClose)
{
this.postProcessOnClose = postProcessOnClose;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.mule.transport.AbstractPollingMessageReceiver;
import org.mule.transport.ConnectException;
import org.mule.transport.sftp.notification.SftpNotifier;
import org.mule.util.ValueHolder;
import org.mule.util.lock.LockFactory;

import java.io.InputStream;
Expand Down Expand Up @@ -169,40 +170,82 @@ protected boolean pollOnPrimaryInstanceOnly()

protected void routeFile(final String path) throws Exception
{
final ValueHolder<InputStream> inputStreamReference = new ValueHolder<>();
ExecutionTemplate<MuleEvent> executionTemplate = createExecutionTemplate();
executionTemplate.execute(new ExecutionCallback<MuleEvent>()

try
{
@Override
public MuleEvent process() throws Exception
executionTemplate.execute(new ExecutionCallback<MuleEvent>()
{
// A bit tricky initialization of the notifier in this case since we don't
// have access to the message yet...
SftpNotifier notifier = new SftpNotifier((SftpConnector) connector, createNullMuleMessage(),
endpoint, flowConstruct.getName());
@Override
public MuleEvent process() throws Exception
{
// A bit tricky initialization of the notifier in this case since we don't
// have access to the message yet...
SftpNotifier notifier = new SftpNotifier((SftpConnector) connector, createNullMuleMessage(),
endpoint, flowConstruct.getName());

InputStream inputStream = sftpRRUtil.retrieveFile(path, notifier);
InputStream inputStream = sftpRRUtil.retrieveFile(path, notifier);
inputStreamReference.set(inputStream);

if (logger.isDebugEnabled())
{
logger.debug("Routing file: " + path);
}
if (logger.isDebugEnabled())
{
logger.debug("Routing file: " + path);
}

MuleMessage message = createMuleMessage(inputStream);
MuleMessage message = createMuleMessage(inputStream);

message.setProperty(SftpConnector.PROPERTY_FILENAME, path, PropertyScope.INBOUND);
message.setProperty(SftpConnector.PROPERTY_ORIGINAL_FILENAME, path, PropertyScope.INBOUND);
message.setProperty(SftpConnector.PROPERTY_FILENAME, path, PropertyScope.INBOUND);
message.setProperty(SftpConnector.PROPERTY_ORIGINAL_FILENAME, path, PropertyScope.INBOUND);

// Now we have access to the message, update the notifier with the message
notifier.setMessage(message);
routeMessage(message);
// Now we have access to the message, update the notifier with the message
notifier.setMessage(message);
routeMessage(message);

if (logger.isDebugEnabled())
if (logger.isDebugEnabled())
{
logger.debug("Routed file: " + path);
}
return null;
}
});

SftpStream sftpStream = getSftpStream(inputStreamReference);
if (sftpStream != null)
{
sftpStream.performPostProcessingOnClose(true);
}
}
catch (Exception e)
{
SftpStream sftpStream = getSftpStream(inputStreamReference);
if (sftpStream != null)
{
sftpStream.setErrorOccurred();
}
}
finally
{
SftpStream sftpStream = getSftpStream(inputStreamReference);
if (sftpStream != null)
{
if (sftpStream.isClosed())
{
logger.debug("Routed file: " + path);
sftpStream.postProcess();
}
return null;
}
});
}
}

private SftpStream getSftpStream(ValueHolder<InputStream> inputStreamReference)
{
InputStream inputStream = inputStreamReference.get();
if (inputStream instanceof SftpStream)
{
return (SftpStream) inputStream;
}

return null;
}

/**
Expand Down
Loading

0 comments on commit d179d79

Please sign in to comment.