Skip to content

Commit

Permalink
Add method to NodeFactory to obtain the address of a given Node.
Browse files Browse the repository at this point in the history
Split LocalGroup into 2 implementations, one implementing NodeFactory<org.jgroups.Address> and the other implementing NodeFactory<org.infinispan.remoting.transport.Address>
  • Loading branch information
pferraro committed Apr 9, 2018
1 parent 0c17a7d commit d62d2dc
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 82 deletions.
Expand Up @@ -43,7 +43,6 @@
import org.wildfly.clustering.dispatcher.CommandDispatcherException;
import org.wildfly.clustering.dispatcher.CommandResponse;
import org.wildfly.clustering.group.Node;
import org.wildfly.clustering.server.Addressable;
import org.wildfly.clustering.server.group.Group;

/**
Expand Down Expand Up @@ -123,7 +122,7 @@ public <R> Map<Node, Future<R>> submitOnCluster(Command<R, ? super C> command, N
for (Node node : this.group.getMembership().getMembers()) {
if (!excluded.contains(node)) {
try {
results.put(node, this.dispatcher.sendMessageWithFuture(getAddress(node), buffer, options));
results.put(node, this.dispatcher.sendMessageWithFuture(this.group.getAddress(node), buffer, options));
} catch (Exception e) {
throw new CommandDispatcherException(e);
}
Expand All @@ -142,7 +141,7 @@ public <R> CommandResponse<R> executeOnNode(Command<R, ? super C> command, Node
RequestOptions options = this.createRequestOptions();
try {
// Use sendMessageWithFuture(...) instead of sendMessage(...) since we want to differentiate between sender exceptions and receiver exceptions
Future<R> future = this.dispatcher.sendMessageWithFuture(getAddress(node), buffer, options);
Future<R> future = this.dispatcher.sendMessageWithFuture(this.group.getAddress(node), buffer, options);
return new SimpleCommandResponse<>(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -163,15 +162,15 @@ public <R> Future<R> submitOnNode(Command<R, ? super C> command, Node node) thro
Buffer buffer = this.createBuffer(command);
RequestOptions options = this.createRequestOptions();
try {
return this.dispatcher.sendMessageWithFuture(getAddress(node), buffer, options);
return this.dispatcher.sendMessageWithFuture(this.group.getAddress(node), buffer, options);
} catch (Exception e) {
throw new CommandDispatcherException(e);
}
}

public <R> Future<R> submit(Node node, Buffer buffer, RequestOptions options) throws CommandDispatcherException {
try {
return this.dispatcher.sendMessageWithFuture(getAddress(node), buffer, options);
return this.dispatcher.sendMessageWithFuture(this.group.getAddress(node), buffer, options);
} catch (Exception e) {
throw new CommandDispatcherException(e);
}
Expand All @@ -186,17 +185,13 @@ private <R> Buffer createBuffer(Command<R, ? super C> command) {
}

private boolean isLocal(Node node) {
return this.getLocalAddress().equals(getAddress(node));
}

private static Address getAddress(Node node) {
return (node instanceof Addressable) ? ((Addressable) node).getAddress() : null;
return this.getLocalAddress().equals(this.group.getAddress(node));
}

private RequestOptions createRequestOptions(Node... excludedNodes) {
Address[] excludedAddresses = new Address[excludedNodes.length];
for (int i = 0; i < excludedNodes.length; ++i) {
excludedAddresses[i] = getAddress(excludedNodes[i]);
excludedAddresses[i] = this.group.getAddress(excludedNodes[i]);
}
return this.createRequestOptions().exclusionList(excludedAddresses);
}
Expand Down
Expand Up @@ -240,6 +240,11 @@ public Node createNode(Address address) {
});
}

@Override
public Address getAddress(Node node) {
return ((AddressableNode) node).getAddress();
}

@Override
public void viewAccepted(View view) {
View oldView = this.view.getAndSet(view);
Expand Down
@@ -0,0 +1,85 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.group;

import org.wildfly.clustering.Registration;
import org.wildfly.clustering.group.GroupListener;
import org.wildfly.clustering.group.Membership;
import org.wildfly.clustering.group.Node;

/**
* Abstract non-clustered group implementation.
* Registered {@link GroupListener} are never invoked, as membership of a local group is fixed.
* @author Paul Ferraro
*/
public abstract class AbstractLocalGroup<A> implements Group<A>, Registration {
private static final String NAME = "local";

private final Membership membership;

public AbstractLocalGroup(String nodeName) {
this.membership = new SingletonMembership(new LocalNode(nodeName));
}

@Override
public void close() {
// We never registered anything
}

@Override
public Registration register(GroupListener listener) {
// Nothing to register
return this;
}

@Deprecated
@Override
public void removeListener(org.wildfly.clustering.group.Group.Listener listener) {
// We never registered anything
}

@Override
public String getName() {
return NAME;
}

@Override
public Node getLocalMember() {
return this.membership.getCoordinator();
}

@Override
public Membership getMembership() {
return this.membership;
}

@Override
public boolean isSingleton() {
return true;
}

@Override
public Node createNode(A address) {
return this.getLocalMember();
}
}
@@ -0,0 +1,61 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.group;

import java.util.function.Function;

import org.jboss.as.clustering.controller.CapabilityServiceBuilder;
import org.jboss.as.server.ServerEnvironment;
import org.jboss.as.server.ServerEnvironmentService;
import org.jboss.msc.service.ServiceBuilder;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget;
import org.wildfly.clustering.group.Group;
import org.wildfly.clustering.service.InjectedValueDependency;
import org.wildfly.clustering.service.MappedValueService;
import org.wildfly.clustering.service.ValueDependency;

/**
* Builds a non-clustered {@link Group}.
* @author Paul Ferraro
*/
public abstract class AbstractLocalGroupBuilder implements CapabilityServiceBuilder<Group>, Function<ServerEnvironment, Group> {

private final ServiceName name;
private final ValueDependency<ServerEnvironment> environment = new InjectedValueDependency<>(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class);

public AbstractLocalGroupBuilder(ServiceName name) {
this.name = name;
}

@Override
public ServiceName getServiceName() {
return this.name;
}

@Override
public ServiceBuilder<Group> build(ServiceTarget target) {
return this.environment.register(target.addService(this.name, new MappedValueService<>(this, this.environment))).setInitialMode(ServiceController.Mode.ON_DEMAND);
}
}
Expand Up @@ -44,8 +44,10 @@
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.LocalModeAddress;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.remoting.transport.jgroups.JGroupsAddressCache;
import org.jboss.threads.JBossThreadFactory;
import org.wildfly.clustering.Registration;
import org.wildfly.clustering.group.GroupListener;
Expand Down Expand Up @@ -124,8 +126,13 @@ public Node createNode(Address address) {
return this.nodeFactory.createNode(toJGroupsAddress(address));
}

@Override
public Address getAddress(Node node) {
return (node instanceof AddressableNode) ? JGroupsAddressCache.fromJGroupsAddress(((AddressableNode) node).getAddress()) : LocalModeAddress.INSTANCE;
}

private static org.jgroups.Address toJGroupsAddress(Address address) {
if (address == null) return null;
if ((address == null) || (address == LocalModeAddress.INSTANCE)) return null;
if (address instanceof JGroupsAddress) {
JGroupsAddress jgroupsAddress = (JGroupsAddress) address;
return jgroupsAddress.getJGroupsAddress();
Expand Down
@@ -0,0 +1,43 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.group;

import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.LocalModeAddress;
import org.wildfly.clustering.group.Node;

/**
* Non-clustered cache group.
* @author Paul Ferraro
*/
public class LocalCacheGroup extends AbstractLocalGroup<Address> {

public LocalCacheGroup(String nodeName) {
super(nodeName);
}

@Override
public Address getAddress(Node node) {
return LocalModeAddress.INSTANCE;
}
}
@@ -0,0 +1,43 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.group;

import org.jboss.as.server.ServerEnvironment;
import org.jboss.msc.service.ServiceName;
import org.wildfly.clustering.group.Group;

/**
* Builds a non-clustered cache {@link Group} service.
* @author Paul Ferraro
*/
public class LocalCacheGroupBuilder extends AbstractLocalGroupBuilder {

public LocalCacheGroupBuilder(ServiceName name) {
super(name);
}

@Override
public Group apply(ServerEnvironment environment) {
return new LocalCacheGroup(environment.getNodeName());
}
}
Expand Up @@ -34,6 +34,6 @@
public class LocalCacheGroupBuilderProvider extends CacheGroupBuilderProvider implements LocalCacheBuilderProvider {

public LocalCacheGroupBuilderProvider() {
super((name, containerName, cacheName) -> new LocalGroupBuilder(name));
super((name, containerName, cacheName) -> new LocalCacheGroupBuilder(name));
}
}

0 comments on commit d62d2dc

Please sign in to comment.