Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jgroups channel #733

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/stages/core.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,9 @@ The stage start and stops some nodes concurrently (without waiting for each othe

### periodic-cluster-split-verify
Periodically check the cluster size.
> dir (**optional**) - Location on disk where the heap dumps should be stored.
> exit-on-failure (**optional**) - If true, then the benchmark stops when the stage returns an error. If false, then the stages in the current scenario are skipped, and the next scenario starts executing. Default is false.
> generate-heap-dump (**optional**) - Set this flag to true to generate a thread dump when the cluster size changed. Default is false.
> groups (**optional**) - Specifies in which groups this stage should actively run. The result set is intersection of specified workers, groups and roles. Default is all groups.
> initial-delay (**optional**) - Initial delay. Default is 0.
> live (**optional**) - If set it only prints objects which have active references and discards the ones that are ready to be garbage collected.
> period (**optional**) - How often should be executed. Default is every 30 minutes.
> roles (**optional**) - Specifies on which workers this stage should actively run, by their roles. The result set is intersection of specified workers, groups and roles. Supported roles are [COORDINATOR]. Default is all roles.
> stop (**optional**) - Set this flag to true in order to terminate. Default is false.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,223 @@
package org.radargun.service;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.util.Util;
import org.radargun.Service;
import org.radargun.config.Property;
import org.radargun.traits.BasicOperations;
import org.radargun.traits.Clustered;
import org.radargun.traits.Lifecycle;
import org.radargun.traits.ProvidesTrait;
import org.radargun.utils.StringArrayConverter;

@Service(doc = "JGroupsService faking cache operations")
public class JGroups42Service extends JGroups36Service {
public class JGroups42Service implements Lifecycle, Clustered, BasicOperations.Cache {

private static final short REPLY_FLAGS =
(short) (Message.Flag.NO_FC.value() | Message.Flag.OOB.value() |
Message.Flag.NO_TOTAL_ORDER.value() | Message.Flag.DONT_BUNDLE.value());

protected static final short HEADER_ID = ClassConfigurator.getProtocolId(RequestCorrelator.class);

private ExecutorService executor = Executors.newFixedThreadPool(200);

@Property(name = "file", doc = "Configuration file for JGroups.")
protected String configFile;

@Property(doc = "Flags that will be used in all requests.", converter = StringArrayConverter.class)
protected String[] flags;

@Property(doc = "Transient flags that will be used in all requests.", converter = StringArrayConverter.class)
protected String[] transientFlags;

@Property(doc = "replicated.")
protected boolean replicated = true;

@Property(name = "nThreads", doc = "nThreads")
protected int nThreads = 200;

@Property(name = "nThreads", doc = "nThreads")
protected boolean sendResponse = false;

protected JChannel ch;
protected JGroupsReceiver receiver;
protected AtomicLong requestId = new AtomicLong(1);

@ProvidesTrait
public JGroups42Service getSelf() {
return this;
}

@ProvidesTrait
public BasicOperations createOperations() {
return new BasicOperations() {
@Override
public <K, V> Cache<K, V> getCache(String cacheName) {
return JGroups42Service.this;
}
};
}

@Override
public boolean isCoordinator() {
View view = ch.getView();
return view == null || view.getMembers() == null || view.getMembers().isEmpty()
|| ch.getAddress().equals(view.getMembers().get(0));
}

@Override
public Collection<Member> getMembers() {
if (receiver.getMembershipHistory().isEmpty()) return null;
return receiver.getMembershipHistory().get(receiver.getMembershipHistory().size() - 1).members;
}

@Override
public List<Membership> getMembershipHistory() {
return new ArrayList<>(receiver.getMembershipHistory());
}

@Override
public void start() {
executor = Executors.newFixedThreadPool(nThreads);
try {
ch = new JChannel(configFile);
receiver = new JGroups36Receiver(ch);
setReceiver();
ch.connect("x");
} catch (Exception e) {
throw new RuntimeException(e);
}
receiver.updateLocalAddr(ch.getAddress());
receiver.updateMyRank(Util.getRank(ch.getView(), ch.getAddress()) - 1);
}

@Override
public void stop() {
Util.close(ch);
synchronized (this) {
receiver.getMembershipHistory().add(Membership.empty());
}
}

@Override
protected RpcDispatcher createRpcDispatcher(JChannel ch) {
RpcDispatcher disp = new RpcDispatcher(ch, this);
disp.setMembershipListener(new ReceiverAdapter() {
public boolean isRunning() {
return ch != null && ch.isConnected();
}

@Override
public Object get(Object key) {
throw new UnsupportedOperationException("");
}

@Override
public boolean containsKey(Object key) {
throw new UnsupportedOperationException("");
}

@Override
public void put(Object key, Object value) {
if (replicated) {
if (nThreads > 1) {
executor.execute(() -> {
Map kv = new HashMap();
kv.put(key, value);
Message message = newMessage(kv);
sendMessage(message);
});
} else {
Map kv = new HashMap();
kv.put(key, value);
Message message = newMessage(kv);
sendMessage(message);
}
} else {
throw new UnsupportedOperationException("");
}
}

@Override
public Object getAndPut(Object key, Object value) {
throw new UnsupportedOperationException("");
}

@Override
public boolean remove(Object key) {
throw new UnsupportedOperationException("");
}

@Override
public Object getAndRemove(Object key) {
throw new UnsupportedOperationException("");
}

@Override
public void clear() {
throw new UnsupportedOperationException("");
}

// IncompatibleClassChangeError
protected void setReceiver() {
ch.setReceiver(new Receiver() {
@Override
public void viewAccepted(View view) {
JGroups42Service.this.receiver.viewAccepted(view);
public void receive(Message message) {

if (sendResponse && message.getSrc() != null && !message.getSrc().equals(ch.getAddress())) {
RequestCorrelator.Header header = message.getHeader(HEADER_ID);
if (header != null && header.requestId() > 0) {
Message response = new Message(message.getSrc()).setFlag(REPLY_FLAGS);
try {
ch.send(response);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
@Override
public void viewAccepted(View newView) {
receiver.viewAccepted(newView);
}
});
disp.setMethodLookup(id -> METHODS[id]);
return disp;
}

@Override
protected void connectChannel(String clusterName) throws Exception {
ch.connect(clusterName);
// IncompatibleClassChangeError
protected void sendMessage(Message message) {
try {
if (ch.isConnected()) {
ch.send(message);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

// IncompatibleClassChangeError
protected Message newMessage(Object object) {
Message message = new Message();
for (String flag : flags) {
message.setFlag(Message.Flag.valueOf(flag));
}
for (String transientFlag : transientFlags) {
message.setTransientFlag(Message.TransientFlag.valueOf(transientFlag));
}
Header header = new RequestCorrelator.Header((byte) 0, requestId.getAndIncrement(), (short) 0);
message.putHeader(HEADER_ID, header);
message.setObject(object);
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
mcast_recv_buf_size="25m"
ip_ttl="${jgroups.ip_ttl:2}"
thread_naming_pattern="pl"
enable_diagnostics="false"
enable_diagnostics="${jgroups.diag.enabled:false}"
bundler_type="transfer-queue"
max_bundle_size="8500"
max_bundle_size="${jgroups.bundler.max_size:64000}"

thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
Expand Down Expand Up @@ -57,5 +57,5 @@
<MFC max_credits="4m"
min_threshold="0.40"
/>
<FRAG3 frag_size="8000"/>
<FRAG3 frag_size="${jgroups.frag_size:60000}"/>
</config>
2 changes: 1 addition & 1 deletion plugins/jgroups52/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>5.2.4.Final</version>
<version>5.2.5.Final</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,66 @@
package org.radargun.service;

import org.jgroups.JChannel;
import org.jgroups.BytesMessage;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RequestCorrelator;
import org.radargun.Service;

@Service(doc = "JGroupsService faking cache operations")
public class JGroups52Service extends JGroups36Service {
public class JGroups52Service extends JGroups42Service {

@Override
protected RpcDispatcher createRpcDispatcher(JChannel ch) {
RpcDispatcher disp = new RpcDispatcher(ch, this);
disp.setMethodLookup(id -> METHODS[id]);
disp.setReceiver(new Receiver() {
private static final short REPLY_FLAGS =
(short) (Message.Flag.NO_FC.value() | Message.Flag.OOB.value() |
Message.Flag.NO_TOTAL_ORDER.value());

protected void setReceiver() {
ch.setReceiver(new Receiver() {
@Override
public void receive(Message message) {
if (sendResponse && message.getSrc() != null && !message.getSrc().equals(ch.getAddress())) {
RequestCorrelator.Header header = message.getHeader(HEADER_ID);
if (header != null && header.requestId() > 0) {
Message response = new BytesMessage(message.getSrc()).setFlag(REPLY_FLAGS, false);
try {
ch.send(response);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
@Override
public void viewAccepted(View newView) {
JGroups52Service.this.receiver.viewAccepted(newView);
receiver.viewAccepted(newView);
}
});
return disp;
}

@Override
protected void connectChannel(String clusterName) throws Exception {
ch.connect(clusterName);
protected void sendMessage(Message message) {
try {
if (ch.isConnected()) {
ch.send(message);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected Message newMessage(Object object) {
Message message = new BytesMessage();
for (String flag : flags) {
message.setFlag(Message.Flag.valueOf(flag));
}
for (String transientFlag : transientFlags) {
message.setFlag(Message.TransientFlag.valueOf(transientFlag));
}
Header header = new RequestCorrelator.Header((byte) 0, requestId.getAndIncrement(), (short) 0);
message.putHeader(HEADER_ID, header);
message.setObject(object);
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
mcast_recv_buf_size="25m"
ip_ttl="${jgroups.ip_ttl:2}"
thread_naming_pattern="pl"
diag.enabled="false"
diag.enabled="${jgroups.diag.enabled:false}"
bundler_type="transfer-queue"
bundler.max_size="8500"
bundler.max_size="${jgroups.bundler.max_size:64000}"

thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
Expand Down Expand Up @@ -57,5 +57,5 @@
<MFC max_credits="4m"
min_threshold="0.40"
/>
<FRAG4 frag_size="8000"/>
</config>
<FRAG4 frag_size="${jgroups.frag_size:60000}"/>
</config>
Loading