Skip to content

Commit

Permalink
issue #143: added safeguarding and mitigations against CMEs
Browse files Browse the repository at this point in the history
  • Loading branch information
S1artie committed Jul 12, 2017
1 parent 69de9df commit 432a8fe
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
Expand Up @@ -19,7 +19,7 @@
* @author Rene Schneider - initial API and implementation
*
*/
public class SetListEntry implements Serializable {
public class SetListEntry implements Serializable, Cloneable {

/**
* Serialization.
Expand Down Expand Up @@ -198,4 +198,14 @@ public String toString() {
return tempBuffer.toString();
}

@Override
public SetListEntry clone() {
SetListEntry tempClone = new SetListEntry();
tempClone.id = this.id;
tempClone.type = this.type;
tempClone.attributes.putAll(this.attributes);

return tempClone;
}

}
Expand Up @@ -14,6 +14,7 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -85,6 +86,13 @@ public class Endpoint {
*/
private static final int DISCONNECT_WAIT_TIME = 10000;

/**
* Maximum number of times to retry serialization of an outgoing message in case of
* ConcurrentModificationExceptions. Those should not occur, but if they do, this mitigation is in place to attempt
* to limit the impact they have.
*/
private static final int SERIALIZATION_RETRY_COUNT = 10;

/**
* A map of message processors.
*/
Expand Down Expand Up @@ -369,12 +377,39 @@ public void run() {
}

if (tempMessageObject != null && socket.isConnected()) {
Kryo tempKryo = instantiateKryo();
ByteArrayOutputStream tempStream = new ByteArrayOutputStream();
DeflaterOutputStream tempDeflateStream = new DeflaterOutputStream(tempStream);
Output tempKryoOutput = new Output(tempDeflateStream);
tempKryo.writeClassAndObject(tempKryoOutput, tempMessageObject);
tempKryoOutput.close();
int tempSerializationAttempts = 0;
ByteArrayOutputStream tempStream;
while (true) {
Kryo tempKryo = instantiateKryo();
tempStream = new ByteArrayOutputStream();
DeflaterOutputStream tempDeflateStream = new DeflaterOutputStream(tempStream);
Output tempKryoOutput = new Output(tempDeflateStream);
try {
tempKryo.writeClassAndObject(tempKryoOutput, tempMessageObject);
break;
} catch (ConcurrentModificationException exc) {
// Handle ConcurrentModificationExceptions by first trying to mitigate them via retry.
// But only do that for a limited number of times. This mitigation has been added
// because of Issue #143: https://github.com/integrity-tf/integrity/issues/143
tempSerializationAttempts++;
System.err.println(
"WARNING: FAILED TO SERIALIZE MESSAGE IN ATTEMPT " + tempSerializationAttempts);
if (tempSerializationAttempts >= SERIALIZATION_RETRY_COUNT) {
// No point in retrying forever - escalate this by closing the connection and
// rethrowing. Closing connection means the entire test execution will be killed by
// follow-up I/O errors, which is totally expected and the entire point.
System.err.println("FAILED TO SERIALIZE MESSAGE REPEATEDLY DUE TO CONCURRENT "
+ "MODIFICATIONS! CLOSING CONNECTION NOW IN ORDER TO TEAR EVERYTHING DOWN.");
closeInternal();
throw exc;
} else {
// Just print the stack trace to stdout for reference. We'll try it again...
exc.printStackTrace();
}
} finally {
tempKryoOutput.close();
}
}

byte[] tempMessage = tempStream.toByteArray();
byte[] tempLength = new byte[4];
Expand Down
Expand Up @@ -43,6 +43,12 @@ public class SetListUpdateMessage extends AbstractMessage {
public SetListUpdateMessage(Integer anEntryInExecution, SetListEntry... someUpdatedEntries) {
entryInExecution = anEntryInExecution;
updatedEntries = someUpdatedEntries;
// We clone the actual entries to make sure that there are no ConcurrentModificationExceptions. The ones that we
// send in the message are sent by a different thread than the main thread that created them, which opens up
// this possibility. See Issue #143: https://github.com/integrity-tf/integrity/issues/143
for (int i = 0; i < updatedEntries.length; i++) {
updatedEntries[i] = updatedEntries[i].clone();
}
}

/**
Expand Down

0 comments on commit 432a8fe

Please sign in to comment.