Skip to content

Commit

Permalink
EJBCLIENT-32 Add a testcase to make sure failover works when the unde…
Browse files Browse the repository at this point in the history
…ployment notification hasn't yet reached the client
  • Loading branch information
jaikiran authored and stuartwdouglas committed Mar 7, 2012
1 parent ffdd0db commit 21b15a5
Show file tree
Hide file tree
Showing 7 changed files with 738 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

package org.jboss.ejb.client.remoting;

import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.EJBLocator;
import org.jboss.ejb.client.SessionID;
import org.jboss.marshalling.AbstractClassResolver;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.ByteOutput;
Expand All @@ -31,9 +33,11 @@
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;
import org.jboss.remoting3.Channel;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -57,9 +61,10 @@ public class DummyProtocolHandler {
private static final byte HEADER_INVOCATION_REQUEST = 0x03;
private static final byte HEADER_INVOCATION_CANCEL_REQUEST = 0x04;
private static final byte HEADER_INVOCATION_RESPONSE = 0x05;
private static final byte HEADER_INVOCATION_FAILURE = 0x06;
private static final byte HEADER_MODULE_AVAILABLE = 0x08;
private static final byte HEADER_MODULE_UNAVAILABLE = 0x09;
private static final byte HEADER_NO_SUCH_EJB_FAILURE = 0x0A;
private static final byte HEADER_INVOCATION_EXCEPTION = 0x06;

public DummyProtocolHandler(final String marshallerType) {
this.marshallerFactory = Marshalling.getProvidedMarshallerFactory(marshallerType);
Expand Down Expand Up @@ -137,13 +142,10 @@ public void writeMethodInvocationResponse(final DataOutput output, final short i
marshaller.finish();
}

public void writeMethodInvocationFailureResponse(final DataOutput output, final short invocationId,
final Throwable t, final Map<String, Object> attachments) throws IOException {
if (output == null) {
throw new IllegalArgumentException("Cannot write to null output");
}
public void writeException(final DataOutput output, final short invocationId, final Throwable t,
final Map<String, Object> attachments) throws IOException {
// write the header
output.write(HEADER_INVOCATION_FAILURE);
output.write(HEADER_INVOCATION_EXCEPTION);
// write the invocation id
output.writeShort(invocationId);
// write out the exception
Expand All @@ -155,6 +157,48 @@ public void writeMethodInvocationFailureResponse(final DataOutput output, final
marshaller.finish();
}

private void writeInvocationFailure(final DataOutput output, final byte messageHeader, final short invocationId, final String failureMessage) throws IOException {
// write header
output.writeByte(messageHeader);
// write invocation id
output.writeShort(invocationId);
// write the failure message
output.writeUTF(failureMessage);
}

public void writeNoSuchEJBFailureMessage(final DataOutput output, final short invocationId, final String appName, final String moduleName,
final String distinctName, final String beanName, final String viewClassName) throws IOException {

final StringBuffer sb = new StringBuffer("No such EJB[");
sb.append("appname=").append(appName).append(",");
sb.append("modulename=").append(moduleName).append(",");
sb.append("distinctname=").append(distinctName).append(",");
sb.append("beanname=").append(beanName);
if (viewClassName != null) {
sb.append(",").append("viewclassname=").append(viewClassName);
}
sb.append("]");
this.writeInvocationFailure(output, HEADER_NO_SUCH_EJB_FAILURE, invocationId, sb.toString());
}

public void writeSessionId(final DataOutput output, final short invocationId, final SessionID sessionID, final Affinity hardAffinity) throws IOException {
final byte[] sessionIdBytes = sessionID.getEncodedForm();
// write out header
output.writeByte(HEADER_SESSION_OPEN_RESPONSE);
// write out invocation id
output.writeShort(invocationId);
// session id byte length
PackedInteger.writePackedInteger(output, sessionIdBytes.length);
// write out the session id bytes
output.write(sessionIdBytes);
// now marshal the hard affinity associated with this session
final Marshaller marshaller = this.prepareForMarshalling(output);
marshaller.writeObject(hardAffinity);

// finish marshalling
marshaller.finish();
}

private Map<String, Object> readAttachments(final ObjectInput input) throws IOException, ClassNotFoundException {
final int numAttachments = input.readByte();
if (numAttachments == 0) {
Expand Down
145 changes: 128 additions & 17 deletions src/test/java/org/jboss/ejb/client/test/common/DummyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
*/
package org.jboss.ejb.client.test.common;

import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.SessionID;
import org.jboss.ejb.client.remoting.DummyProtocolHandler;
import org.jboss.ejb.client.remoting.MethodInvocationRequest;
import org.jboss.ejb.client.remoting.PackedInteger;
Expand All @@ -36,7 +38,6 @@
import org.jboss.remoting3.remote.RemoteConnectionProviderFactory;
import org.jboss.remoting3.security.SimpleServerAuthenticationProvider;
import org.jboss.remoting3.spi.NetworkServerProvider;
import org.jboss.sasl.JBossSaslProvider;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import org.xnio.Options;
Expand All @@ -49,16 +50,16 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Security;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

Expand All @@ -70,6 +71,7 @@ public class DummyServer {
private static final Logger logger = Logger.getLogger(DummyServer.class);

private static final String[] supportedMarshallerTypes = new String[]{"river", "java-serial"};
private static final String CLUSTER_NAME = "dummy-cluster";

private Endpoint endpoint;

Expand Down Expand Up @@ -152,6 +154,10 @@ public void stop() throws IOException {
IoUtils.safeClose(this.endpoint);
}

public String getClusterName() {
return this.CLUSTER_NAME;
}


class Version1Receiver implements Channel.Receiver {

Expand Down Expand Up @@ -183,10 +189,24 @@ public void handleMessage(Channel channel, MessageInputStream messageInputStream
Object methodInvocationResult = null;
try {
methodInvocationResult = DummyServer.this.handleMethodInvocationRequest(methodInvocationRequest);
} catch (NoSuchEJBException nsee) {
final DataOutputStream outputStream = new DataOutputStream(channel.writeMessage());
try {
this.dummyProtocolHandler.writeNoSuchEJBFailureMessage(outputStream, methodInvocationRequest.getInvocationId(), methodInvocationRequest.getAppName(),
methodInvocationRequest.getModuleName(), methodInvocationRequest.getDistinctName(), methodInvocationRequest.getBeanName(),
methodInvocationRequest.getViewClassName());
} finally {
outputStream.close();
}
return;
} catch (Exception e) {
logger.error("Error while invoking method", e);
// TODO: Convey this error back to client
throw new RuntimeException(e);
final DataOutputStream outputStream = new DataOutputStream(channel.writeMessage());
try {
this.dummyProtocolHandler.writeException(outputStream, methodInvocationRequest.getInvocationId(), e, methodInvocationRequest.getAttachments());
} finally {
outputStream.close();
}
return;
}
logger.info("Method invocation result on server " + methodInvocationResult);
// write the method invocation result
Expand All @@ -197,6 +217,16 @@ public void handleMessage(Channel channel, MessageInputStream messageInputStream
outputStream.close();
}

break;
case 0x01:
// session open request
try {
this.handleSessionOpenRequest(channel, messageInputStream);
} catch (Exception e) {
// TODO: Let the client know of this exception
throw new RuntimeException(e);
}

break;
default:
logger.warn("Not supported message header 0x" + Integer.toHexString(header) + " received by " + this);
Expand All @@ -213,6 +243,45 @@ public void handleMessage(Channel channel, MessageInputStream messageInputStream

}

private void handleSessionOpenRequest(Channel channel, MessageInputStream messageInputStream) throws IOException {
if (messageInputStream == null) {
throw new IllegalArgumentException("Cannot read from null message inputstream");
}
final DataInputStream dataInputStream = new DataInputStream(messageInputStream);

// read invocation id
final short invocationId = dataInputStream.readShort();
final String appName = dataInputStream.readUTF();
final String moduleName = dataInputStream.readUTF();
final String distinctName = dataInputStream.readUTF();
final String beanName = dataInputStream.readUTF();

final EJBModuleIdentifier ejbModuleIdentifier = new EJBModuleIdentifier(appName, moduleName, distinctName);
final Map<String, Object> ejbs = DummyServer.this.registeredEJBs.get(ejbModuleIdentifier);
if (ejbs == null || ejbs.get(beanName) == null) {
final DataOutputStream outputStream = new DataOutputStream(channel.writeMessage());
try {
this.dummyProtocolHandler.writeNoSuchEJBFailureMessage(outputStream, invocationId, appName, moduleName, distinctName, beanName, null);
} finally {
outputStream.close();
}
return;
}
final UUID uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
final SessionID sessionID = SessionID.createSessionID(bb.array());
final DataOutputStream outputStream = new DataOutputStream(channel.writeMessage());
try {
final ClusterAffinity hardAffinity = new ClusterAffinity(DummyServer.this.CLUSTER_NAME);
this.dummyProtocolHandler.writeSessionId(outputStream, invocationId, sessionID, hardAffinity);
} finally {
outputStream.close();
}
}


}

public void register(final String appName, final String moduleName, final String distinctName, final String beanName, final Object instance) {
Expand All @@ -225,14 +294,35 @@ public void register(final String appName, final String moduleName, final String
}
ejbs.put(beanName, instance);
try {
this.sendNewModuleAvailabilityToClients(new EJBModuleIdentifier[]{moduleID});
this.sendNewModuleReportToClients(new EJBModuleIdentifier[]{moduleID}, true);
} catch (IOException e) {
logger.warn("Could not send EJB module availability message to clients, for module " + moduleID, e);
}
}

private void sendNewModuleAvailabilityToClients(final EJBModuleIdentifier[] newModules) throws IOException {
if (newModules == null) {
public void unregister(final String appName, final String moduleName, final String distinctName, final String beanName) {
this.unregister(appName, moduleName, distinctName, beanName, true);
}

public void unregister(final String appName, final String moduleName, final String distinctName, final String beanName, final boolean notifyClients) {

final EJBModuleIdentifier moduleID = new EJBModuleIdentifier(appName, moduleName, distinctName);
Map<String, Object> ejbs = this.registeredEJBs.get(moduleID);
if (ejbs != null) {
ejbs.remove(beanName);
}
if (notifyClients) {
try {
this.sendNewModuleReportToClients(new EJBModuleIdentifier[]{moduleID}, false);
} catch (IOException e) {
logger.warn("Could not send EJB module un-availability message to clients, for module " + moduleID, e);
}
}
}


private void sendNewModuleReportToClients(final EJBModuleIdentifier[] modules, final boolean availabilityReport) throws IOException {
if (modules == null) {
return;
}
if (this.openChannels.isEmpty()) {
Expand All @@ -241,7 +331,11 @@ private void sendNewModuleAvailabilityToClients(final EJBModuleIdentifier[] newM
for (final Channel channel : this.openChannels) {
final DataOutputStream dataOutputStream = new DataOutputStream(channel.writeMessage());
try {
this.writeModuleAvailability(dataOutputStream, newModules);
if (availabilityReport) {
this.writeModuleAvailability(dataOutputStream, modules);
} else {
this.writeModuleUnAvailability(dataOutputStream, modules);
}
} catch (IOException e) {
logger.warn("Could not send module availability message to client", e);
} finally {
Expand All @@ -260,22 +354,38 @@ private void writeModuleAvailability(final DataOutput output, final EJBModuleIde
}
// write the header
output.write(0x08);
this.writeModuleReport(output, ejbModuleIdentifiers);
}

private void writeModuleUnAvailability(final DataOutput output, final EJBModuleIdentifier[] ejbModuleIdentifiers) throws IOException {
if (output == null) {
throw new IllegalArgumentException("Cannot write to null output");
}
if (ejbModuleIdentifiers == null) {
throw new IllegalArgumentException("EJB module identifiers cannot be null");
}
// write the header
output.write(0x09);
this.writeModuleReport(output, ejbModuleIdentifiers);
}

private void writeModuleReport(final DataOutput output, final EJBModuleIdentifier[] modules) throws IOException {
// write the count
PackedInteger.writePackedInteger(output, ejbModuleIdentifiers.length);
// write the app/module names
for (int i = 0; i < ejbModuleIdentifiers.length; i++) {
PackedInteger.writePackedInteger(output, modules.length);
// write the module identifiers
for (int i = 0; i < modules.length; i++) {
// write the app name
final String appName = ejbModuleIdentifiers[i].getAppName();
final String appName = modules[i].getAppName();
if (appName == null) {
// write out a empty string
output.writeUTF("");
} else {
output.writeUTF(appName);
}
// write the module name
output.writeUTF(ejbModuleIdentifiers[i].getModuleName());
output.writeUTF(modules[i].getModuleName());
// write the distinct name
final String distinctName = ejbModuleIdentifiers[i].getDistinctName();
final String distinctName = modules[i].getDistinctName();
if (distinctName == null) {
// write out an empty string
output.writeUTF("");
Expand Down Expand Up @@ -348,7 +458,7 @@ public void handleMessage(Channel channel, MessageInputStream message) {
channel.receiveMessage(receiver);
// send module availability report to clients
final Collection<EJBModuleIdentifier> availableModules = DummyServer.this.registeredEJBs.keySet();
DummyServer.this.sendNewModuleAvailabilityToClients(availableModules.toArray(new EJBModuleIdentifier[availableModules.size()]));
DummyServer.this.sendNewModuleReportToClients(availableModules.toArray(new EJBModuleIdentifier[availableModules.size()]), true);
break;
default:
logger.info("Received unsupported version 0x" + Integer.toHexString(version) + " from client, on channel " + channel);
Expand All @@ -370,6 +480,7 @@ public void handleMessage(Channel channel, MessageInputStream message) {
}
}


private class EJBModuleIdentifier {
private final String appName;

Expand Down
33 changes: 33 additions & 0 deletions src/test/java/org/jboss/ejb/client/test/invocation/retry/Echo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2012, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.ejb.client.test.invocation.retry;

/**
* @author Jaikiran Pai
*/
public interface Echo {

String echo(String msg);

String getServerName();
}
Loading

0 comments on commit 21b15a5

Please sign in to comment.