Skip to content

Commit

Permalink
Fix Java heap space error by reading line by line
Browse files Browse the repository at this point in the history
  • Loading branch information
ramindu90 committed Oct 2, 2019
1 parent d80102f commit 6a748af
Showing 1 changed file with 16 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@
import org.wso2.carbon.messaging.TextCarbonMessage;
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;

/**
* A Client Connector implementation for file systems using the Apache VFS library for file operations
Expand Down Expand Up @@ -88,6 +89,7 @@ public boolean send(CarbonMessage carbonMessage, CarbonCallback carbonCallback,
ByteBuffer byteBuffer;
InputStream inputStream = null;
OutputStream outputStream = null;
Scanner scanner = null;
try {
FileSystemManager fsManager = VFS.getManager();
FileObject path = fsManager.resolveFile(fileURI, opts);
Expand Down Expand Up @@ -178,11 +180,18 @@ public boolean send(CarbonMessage carbonMessage, CarbonCallback carbonCallback,
Thread.sleep(readWaitTimeout);
} while (fileContentLastModifiedTime < path.getContent().getLastModifiedTime());
inputStream = path.getContent().getInputStream();
BinaryCarbonMessage message = new BinaryCarbonMessage(ByteBuffer.
wrap(toByteArray(inputStream)), true);
message.setProperty(org.wso2.carbon.messaging.Constants.DIRECTION,
org.wso2.carbon.messaging.Constants.DIRECTION_RESPONSE);
carbonMessageProcessor.receive(message, carbonCallback);
scanner = new Scanner(inputStream, "UTF-8");
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
BinaryCarbonMessage message = new BinaryCarbonMessage(ByteBuffer.
wrap(line.getBytes(StandardCharsets.UTF_8)), true);
message.setProperty(org.wso2.carbon.messaging.Constants.DIRECTION,
org.wso2.carbon.messaging.Constants.DIRECTION_RESPONSE);
carbonMessageProcessor.receive(message, carbonCallback);
}
if (scanner.ioException() != null) {
throw scanner.ioException();
}
} else {
throw new ClientConnectorException(
"Failed to read file: " + path.getName().getURI() + " not found");
Expand All @@ -202,6 +211,7 @@ public boolean send(CarbonMessage carbonMessage, CarbonCallback carbonCallback,
} catch (Exception e) {
throw new ClientConnectorException("Exception occurred while processing file: " + e.getMessage(), e);
} finally {
closeQuietly(scanner);
closeQuietly(inputStream);
closeQuietly(outputStream);
}
Expand All @@ -214,29 +224,6 @@ public String getProtocol() {
return Constants.PROTOCOL_FILE;
}

/**
* Obtain a byte[] from an input stream
*
* @param input The input stream that the data should be obtained from
* @return byte[] The byte array of data obtained from the input stream
* @throws IOException
*/
private static byte[] toByteArray(InputStream input) throws IOException {
long count = 0L;
byte[] buffer = new byte[4096];
int n1;
ByteArrayOutputStream output = new ByteArrayOutputStream();
for (; -1 != (n1 = input.read(buffer)); count += (long) n1) {
output.write(buffer, 0, n1);
}
if (logger.isDebugEnabled()) {
logger.debug(count + " bytes read");
}
byte[] bytes = output.toByteArray();
closeQuietly(output);
return bytes;
}

/**
* Closes streams quietly
* @param closeable The stream that should be closed
Expand Down

0 comments on commit 6a748af

Please sign in to comment.