Skip to content
Browse files

ISPN-2848 Use the new bundling mechanism from JGroups 3.3.0

* set new bundler in jgroups configuration files
* removed deprecated flags
* remove unused parameters
  • Loading branch information...
1 parent 113842c commit 73da108cdcf9db4f3edbcd6dbda6938d6e45d148 @pruivo pruivo committed with Mircea Markus
View
38 core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
@@ -135,14 +135,14 @@ private boolean isValid(Message req) {
* @param recipients Guaranteed not to be null. Must <b>not</b> contain self.
*/
public RspList<Object> invokeRemoteCommands(final List<Address> recipients, final ReplicableCommand command, final ResponseMode mode, final long timeout,
- final boolean anycasting, final boolean oob, final RspFilter filter,
+ final boolean oob, final RspFilter filter,
boolean asyncMarshalling, final boolean ignoreLeavers, final boolean totalOrder, final boolean distribution) throws InterruptedException {
if (asyncMarshalling) {
asyncExecutor.submit(new Callable<RspList<Object>>() {
@Override
public RspList<Object> call() throws Exception {
return processCalls(command, recipients == null, timeout, filter, recipients, mode,
- req_marshaller, CommandAwareRpcDispatcher.this, oob, anycasting, ignoreLeavers, totalOrder, distribution);
+ req_marshaller, CommandAwareRpcDispatcher.this, oob, ignoreLeavers, totalOrder, distribution);
}
});
return null; // don't wait for a response!
@@ -150,7 +150,7 @@ private boolean isValid(Message req) {
RspList<Object> response;
try {
response = processCalls(command, recipients == null, timeout, filter, recipients, mode,
- req_marshaller, this, oob, anycasting, ignoreLeavers, totalOrder, distribution);
+ req_marshaller, this, oob, ignoreLeavers, totalOrder, distribution);
} catch (InterruptedException e) {
throw e;
} catch (SuspectedException e) {
@@ -201,10 +201,10 @@ public Response call() throws Exception {
}
public RspList<Object> broadcastRemoteCommands(ReplicableCommand command, ResponseMode mode, long timeout,
- boolean anycasting, boolean oob, RspFilter filter,
+ boolean oob, RspFilter filter,
boolean asyncMarshalling, boolean ignoreLeavers, boolean totalOrder, boolean distribution)
throws InterruptedException {
- return invokeRemoteCommands(null, command, mode, timeout, anycasting, oob, filter, asyncMarshalling, ignoreLeavers, totalOrder, distribution);
+ return invokeRemoteCommands(null, command, mode, timeout, oob, filter, asyncMarshalling, ignoreLeavers, totalOrder, distribution);
}
private boolean containsOnlyNulls(RspList<Object> l) {
@@ -322,25 +322,17 @@ private void reply(org.jgroups.blocks.Response response, Object retVal) {
}
}
- protected static Message constructMessage(Buffer buf, Address recipient, boolean oob, ResponseMode mode, boolean rsvp,
+ protected static Message constructMessage(Buffer buf, Address recipient, boolean oob, boolean rsvp,
boolean totalOrder) {
Message msg = new Message();
msg.setBuffer(buf);
- if (oob) msg.setFlag(Message.OOB);
- if (oob || mode != ResponseMode.GET_NONE) {
- msg.setFlag(Message.DONT_BUNDLE);
- // This is removed since this optimisation is no longer valid. See ISPN-1878
- // msg.setFlag(Message.NO_FC);
- }
- if (rsvp) msg.setFlag(Message.RSVP);
+ if (oob) msg.setFlag(Message.Flag.OOB);
+ if (rsvp) msg.setFlag(Message.Flag.RSVP);
//In total order protocol, the sequencer is in the protocol stack so we need to bypass the protocol
if(!totalOrder) {
msg.setFlag(Message.Flag.NO_TOTAL_ORDER);
} else {
- //disable flow control -- send immediately to avoid long commit phases
- msg.setFlag(Message.Flag.NO_FC);
- msg.setFlag(Message.Flag.DONT_BUNDLE);
msg.clearFlag(Message.Flag.OOB);
}
if (recipient != null) msg.setDest(recipient);
@@ -372,7 +364,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
Response retval;
Buffer buf;
buf = marshallCall(marshaller, command);
- retval = card.sendMessage(constructMessage(buf, destination, oob, mode, rsvp, false),
+ retval = card.sendMessage(constructMessage(buf, destination, oob, rsvp, false),
new RequestOptions(mode, timeout));
// we only bother parsing responses if we are not in ASYNC mode.
@@ -393,7 +385,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
private static RspList<Object> processCalls(ReplicableCommand command, boolean broadcast, long timeout,
RspFilter filter, List<Address> dests, ResponseMode mode,
Marshaller marshaller, CommandAwareRpcDispatcher card,
- boolean oob, boolean anycasting, boolean ignoreLeavers, boolean totalOrder, boolean distribution) throws Exception {
+ boolean oob, boolean ignoreLeavers, boolean totalOrder, boolean distribution) throws Exception {
if (trace) log.tracef("Replication task sending %s to addresses %s with response mode %s", command, dests, mode);
/// HACK ALERT! Used for ISPN-1789. Enable RSVP if the command is a cache topology control command.
@@ -404,7 +396,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
Buffer buf;
if (totalOrder && distribution) {
buf = marshallCall(marshaller, command);
- Message message = constructMessage(buf, null, oob, mode, rsvp, totalOrder);
+ Message message = constructMessage(buf, null, oob, rsvp, totalOrder);
AnycastAddress address = new AnycastAddress(dests);
message.setDest(address);
@@ -420,7 +412,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
opts.setExclusionList(card.getChannel().getAddress());
}
- retval = card.castMessage(dests, constructMessage(buf, null, oob, mode, rsvp, totalOrder),opts);
+ retval = card.castMessage(dests, constructMessage(buf, null, oob, rsvp, totalOrder),opts);
} else {
RequestOptions opts = new RequestOptions(mode, timeout);
@@ -437,7 +429,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
// (see FutureCollator) and the first successful response is used.
FutureCollator futureCollator = new FutureCollator(filter, dests.size(), timeout);
for (Address a : dests) {
- NotifyingFuture<Object> f = card.sendMessageWithFuture(constructMessage(buf, a, oob, mode, rsvp, false), opts);
+ NotifyingFuture<Object> f = card.sendMessageWithFuture(constructMessage(buf, a, oob, rsvp, false), opts);
futureCollator.watchFuture(f, a);
}
retval = futureCollator.getResponseList();
@@ -446,7 +438,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
Map<Address, Future<Object>> futures = new HashMap<Address, Future<Object>>(dests.size());
for (Address dest : dests)
- futures.put(dest, card.sendMessageWithFuture(constructMessage(buf, dest, oob, mode, rsvp, false), opts));
+ futures.put(dest, card.sendMessageWithFuture(constructMessage(buf, dest, oob, rsvp, false), opts));
retval = new RspList<Object>();
@@ -468,7 +460,7 @@ private static Response processSingleCall(ReplicableCommand command, long timeou
}
} else if (mode == ResponseMode.GET_NONE) {
// An ASYNC call. We don't care about responses.
- for (Address dest : dests) card.sendMessage(constructMessage(buf, dest, oob, mode, rsvp, false), opts);
+ for (Address dest : dests) card.sendMessage(constructMessage(buf, dest, oob, rsvp, false), opts);
}
}
View
14 core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
@@ -347,7 +347,7 @@ private void buildChannel() {
String channelLookupClassName = props.getProperty(CHANNEL_LOOKUP);
try {
- JGroupsChannelLookup lookup = (JGroupsChannelLookup) Util.getInstance(channelLookupClassName, configuration.classLoader());
+ JGroupsChannelLookup lookup = Util.getInstance(channelLookupClassName, configuration.classLoader());
channel = lookup.getJGroupsChannel(props);
startChannel = lookup.shouldStartAndConnect();
stopChannel = lookup.shouldStopAndDisconnect();
@@ -504,12 +504,12 @@ public Address getAddress() {
org.jgroups.Address singleJGAddress = null;
if (broadcast || (totalOrder && !anycast)) {
- rsps = dispatcher.broadcastRemoteCommands(rpcCommand, toJGroupsMode(mode), timeout, recipients != null,
+ rsps = dispatcher.broadcastRemoteCommands(rpcCommand, toJGroupsMode(mode), timeout,
usePriorityQueue, toJGroupsFilter(responseFilter),
asyncMarshalling, ignoreLeavers, totalOrder, anycast);
} else if (totalOrder && anycast) {
rsps = dispatcher.invokeRemoteCommands(jgAddressList, rpcCommand, toJGroupsMode(mode), timeout,
- recipients != null, usePriorityQueue, toJGroupsFilter(responseFilter),
+ usePriorityQueue, toJGroupsFilter(responseFilter),
asyncMarshalling, ignoreLeavers, totalOrder, anycast);
} else {
if (jgAddressList == null || !jgAddressList.isEmpty()) {
@@ -529,7 +529,7 @@ public Address getAddress() {
usePriorityQueue, asyncMarshalling);
} else {
rsps = dispatcher.invokeRemoteCommands(jgAddressList, rpcCommand, toJGroupsMode(mode), timeout,
- recipients != null, usePriorityQueue, toJGroupsFilter(responseFilter),
+ usePriorityQueue, toJGroupsFilter(responseFilter),
asyncMarshalling, ignoreLeavers, totalOrder, anycast);
}
}
@@ -565,16 +565,16 @@ public Address getAddress() {
@Override
public BackupResponse backupRemotely(Collection<XSiteBackup> backups, ReplicableCommand rpcCommand) throws Exception {
log.tracef("About to send to backups %s, command %s", backups, rpcCommand);
- Buffer buf = dispatcher.marshallCall(dispatcher.getMarshaller(), rpcCommand);
+ Buffer buf = CommandAwareRpcDispatcher.marshallCall(dispatcher.getMarshaller(), rpcCommand);
Map<XSiteBackup, Future<Object>> syncBackupCalls = new HashMap<XSiteBackup, Future<Object>>(backups.size());
for (XSiteBackup xsb : backups) {
SiteMaster recipient = new SiteMaster(xsb.getSiteName());
if (xsb.isSync()) {
RequestOptions sync = new RequestOptions(org.jgroups.blocks.ResponseMode.GET_ALL, xsb.getTimeout());
- syncBackupCalls.put(xsb, dispatcher.sendMessageWithFuture(dispatcher.constructMessage(buf, recipient, false, org.jgroups.blocks.ResponseMode.GET_ALL, false, false), sync));
+ syncBackupCalls.put(xsb, dispatcher.sendMessageWithFuture(CommandAwareRpcDispatcher.constructMessage(buf, recipient, false, false, false), sync));
} else {
RequestOptions async = new RequestOptions(org.jgroups.blocks.ResponseMode.GET_NONE, xsb.getTimeout());
- dispatcher.sendMessage(dispatcher.constructMessage(buf, recipient, false, org.jgroups.blocks.ResponseMode.GET_NONE, false, false), async);
+ dispatcher.sendMessage(CommandAwareRpcDispatcher.constructMessage(buf, recipient, false, false, false), async);
}
}
return new JGroupsBackupResponse(syncBackupCalls);
View
5 core/src/main/resources/jgroups-ec2.xml
@@ -30,13 +30,12 @@
port_range="30"
recv_buf_size="20000000"
send_buf_size="640000"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
use_send_queues="true"
sock_conn_timeout="300"
enable_diagnostics="false"
- bundler_type="old"
+ bundler_type="new"
thread_pool.enabled="true"
thread_pool.min_threads="2"
View
5 core/src/main/resources/jgroups-tcp.xml
@@ -30,11 +30,10 @@
port_range="30"
recv_buf_size="20m"
send_buf_size="640k"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
use_send_queues="true"
enable_diagnostics="false"
- bundler_type="old"
+ bundler_type="new"
thread_naming_pattern="pl"
View
5 core/src/main/resources/jgroups-udp.xml
@@ -32,11 +32,10 @@
mcast_recv_buf_size="25m"
mcast_send_buf_size="640k"
loopback="true"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
ip_ttl="${jgroups.udp.ip_ttl:2}"
enable_diagnostics="false"
- bundler_type="old"
+ bundler_type="new"
thread_naming_pattern="pl"
View
4 core/src/test/resources/configs/jbosscache3x/all.xml
@@ -127,8 +127,8 @@
-->
<jgroupsConfig>
- <UDP discard_incompatible_packets="true" enable_bundling="false" enable_diagnostics="false" ip_ttl="2"
- loopback="true" max_bundle_size="64000" max_bundle_timeout="30" mcast_addr="228.10.10.10"
+ <UDP discard_incompatible_packets="true" enable_diagnostics="false" ip_ttl="2"
+ loopback="true" max_bundle_size="64k" mcast_addr="228.10.10.10" bundler_type="new"
mcast_port="45588" mcast_recv_buf_size="25000000" mcast_send_buf_size="640000"
oob_thread_pool.enabled="true" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.max_threads="4"
oob_thread_pool.min_threads="1" oob_thread_pool.queue_enabled="true"
View
4 core/src/test/resources/configs/jbosscache3x/buddy-replication.xml
@@ -36,8 +36,8 @@
<!-- JGroups protocol stack properties. -->
<jgroupsConfig>
- <TCP start_port="7800" discard_incompatible_packets="true" enable_bundling="false" enable_diagnostics="true"
- enable_unicast_bundling="false" loopback="true" max_bundle_size="64000" max_bundle_timeout="30"
+ <TCP start_port="7800" discard_incompatible_packets="true" enable_diagnostics="true"
+ enable_unicast_bundling="false" loopback="true" max_bundle_size="64k" bundler_type="new"
oob_thread_pool.enabled="true" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.max_threads="8"
oob_thread_pool.min_threads="2" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="10"
oob_thread_pool.rejection_policy="Run" recv_buf_size="20000000" thread_naming_pattern="pl"
View
4 core/src/test/resources/configs/jbosscache3x/invalidation-async.xml
@@ -37,8 +37,8 @@
<!-- JGroups protocol stack properties. -->
<jgroupsConfig>
- <UDP discard_incompatible_packets="true" enable_bundling="true" enable_diagnostics="false" ip_ttl="2"
- loopback="true" max_bundle_size="64000" max_bundle_timeout="30" mcast_addr="228.10.10.10"
+ <UDP discard_incompatible_packets="true" enable_diagnostics="false" ip_ttl="2"
+ loopback="true" max_bundle_size="64k" bundler_type="new" mcast_addr="228.10.10.10"
mcast_port="45588" mcast_recv_buf_size="100000000" mcast_send_buf_size="640000"
oob_thread_pool.enabled="true" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.max_threads="20"
oob_thread_pool.min_threads="8" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="10"
View
4 core/src/test/resources/configs/jbosscache3x/total-replication.xml
@@ -30,8 +30,8 @@
<clustering mode="replication">
<!-- JGroups protocol stack properties. -->
<jgroupsConfig>
- <UDP discard_incompatible_packets="true" enable_bundling="false" enable_diagnostics="false" ip_ttl="2"
- loopback="true" max_bundle_size="64000" max_bundle_timeout="30" mcast_addr="228.10.10.10"
+ <UDP discard_incompatible_packets="true" enable_diagnostics="false" ip_ttl="2"
+ loopback="true" max_bundle_size="64k" bundler_type="30" mcast_addr="228.10.10.10"
mcast_port="45588" mcast_recv_buf_size="25000000" mcast_send_buf_size="640000"
oob_thread_pool.enabled="true" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.max_threads="4"
oob_thread_pool.min_threads="1" oob_thread_pool.queue_enabled="true" oob_thread_pool.queue_max_size="10"
View
5 core/src/test/resources/configs/xsite/bridge.xml
@@ -26,12 +26,11 @@
port_range="30"
recv_buf_size="20000000"
send_buf_size="640000"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
use_send_queues="true"
sock_conn_timeout="300"
enable_diagnostics="false"
- bundler_type="old"
+ bundler_type="new"
send_queue_size="0"
thread_pool.enabled="true"
View
5 core/src/test/resources/stacks/tcp.xml
@@ -28,12 +28,11 @@
port_range="30"
recv_buf_size="20000000"
send_buf_size="640000"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
use_send_queues="true"
sock_conn_timeout="300"
enable_diagnostics="false"
- bundler_type="old"
+ bundler_type="new"
send_queue_size="0"
thread_pool.enabled="true"
View
5 core/src/test/resources/stacks/tcp_mping/tcp1.xml
@@ -28,11 +28,10 @@
port_range="30"
recv_buf_size="20000000"
send_buf_size="640000"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
use_send_queues="false"
sock_conn_timeout="300"
- bundler_type="old"
+ bundler_type="new"
thread_pool.enabled="true"
thread_pool.min_threads="1"
View
3 core/src/test/resources/stacks/tcp_mping/tcp2.xml
@@ -29,10 +29,9 @@
recv_buf_size="20000000"
send_buf_size="640000"
max_bundle_size="64000"
- max_bundle_timeout="30"
use_send_queues="false"
sock_conn_timeout="300"
- bundler_type="old"
+ bundler_type="new"
thread_pool.enabled="true"
thread_pool.min_threads="1"
View
4 core/src/test/resources/stacks/udp.xml
@@ -32,8 +32,8 @@
mcast_recv_buf_size="25000000"
mcast_send_buf_size="640000"
loopback="true"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
+ bundler_type="new"
ip_ttl="${jgroups.udp.ip_ttl:0}"
enable_diagnostics="false"
View
5 demos/gui/src/main/release/etc/config-samples/jgroups-relay1.xml
@@ -19,10 +19,9 @@
mcast_recv_buf_size="25000000"
mcast_send_buf_size="640000"
loopback="true"
- max_bundle_size="64000"
- max_bundle_timeout="30"
+ max_bundle_size="64k"
+ bundler_type="new"
ip_ttl="${jgroups.udp.ip_ttl:0}"
- enable_unicast_bundling="true"
enable_diagnostics="true"
thread_naming_pattern="cl"
View
3 demos/gui/src/main/release/etc/config-samples/jgroups-relay2.xml
@@ -20,9 +20,8 @@
mcast_send_buf_size="640000"
loopback="true"
max_bundle_size="64000"
- max_bundle_timeout="30"
+ bundler_type="new"
ip_ttl="${jgroups.udp.ip_ttl:0}"
- enable_unicast_bundling="true"
enable_diagnostics="true"
thread_naming_pattern="cl"
View
5 demos/gui/src/main/release/etc/config-samples/jgroups-tcp.xml
@@ -15,9 +15,8 @@
recv_buf_size="20000000"
send_buf_size="640000"
loopback="false"
- max_bundle_size="64000"
- max_bundle_timeout="30"
- enable_unicast_bundling="true"
+ max_bundle_size="64k"
+ bundler_type="new"
enable_diagnostics="true"
thread_naming_pattern="cl"

0 comments on commit 73da108

Please sign in to comment.
Something went wrong with that request. Please try again.