Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to configure transactional amqp consumers from the gui. #38

Merged
merged 1 commit into from
Feb 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.zeroclue.jmeter.protocol.amqp;

import java.io.IOException;
import java.security.*;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.testelement.TestStateListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStateListener {
private static final int DEFAULT_PREFETCH_COUNT = 0; // unlimited
Expand All @@ -32,6 +32,9 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat
private static final String AUTO_ACK = "AMQPConsumer.AutoAck";
private static final String RECEIVE_TIMEOUT = "AMQPConsumer.ReceiveTimeout";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";

private transient Channel channel;
private transient QueueingConsumer consumer;
private transient String consumerTag;
Expand Down Expand Up @@ -104,6 +107,11 @@ public SampleResult sample(Entry entry) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

// commit the sample.
if (getUseTx()) {
channel.txCommit();
}

result.setResponseData("OK", null);
result.setDataType(SampleResult.TEXT);

Expand Down Expand Up @@ -222,6 +230,14 @@ public int getPrefetchCountAsInt() {
return getPropertyAsInt(PREFETCH_COUNT);
}

public Boolean getUseTx() {
return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX);
}

public void setUseTx(Boolean tx) {
setProperty(USE_TX, tx);
}

/**
* set whether the sampler should read the response or not
*
Expand Down Expand Up @@ -315,6 +331,9 @@ private void trace(String s) {
protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
boolean ret = super.initChannel();
channel.basicQos(getPrefetchCountAsInt());
if (getUseTx()) {
channel.txSelect();
}
return ret;
}
}
16 changes: 8 additions & 8 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.zeroclue.jmeter.protocol.amqp;

import com.rabbitmq.client.AMQP;

import java.io.IOException;
import java.security.*;
import java.util.*;

import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.samplers.Entry;
Expand All @@ -15,7 +11,11 @@
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/**
* JMeter creates an instance of a sampler class for every occurrence of the
Expand Down Expand Up @@ -44,10 +44,10 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
private final static String HEADERS = "AMQPPublisher.Headers";

public static boolean DEFAULT_PERSISTENT = false;
private final static String PERSISTENT = "AMQPConsumer.Persistent";
private final static String PERSISTENT = "AMQPPublisher.Persistent";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";
private final static String USE_TX = "AMQPPublisher.UseTx";

private transient Channel channel;

Expand Down
21 changes: 11 additions & 10 deletions src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.zeroclue.jmeter.protocol.amqp.gui;

import javax.swing.JCheckBox;
import javax.swing.JPanel;

import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jorphan.gui.JLabeledTextField;

import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;

import javax.swing.*;
import java.awt.*;


Expand All @@ -17,9 +14,11 @@ public class AMQPConsumerGui extends AMQPSamplerGui {

protected JLabeledTextField receiveTimeout = new JLabeledTextField("Receive Timeout");
protected JLabeledTextField prefetchCount = new JLabeledTextField("Prefetch Count");

private final JCheckBox purgeQueue = new JCheckBox("Purge Queue", false);
private final JCheckBox autoAck = new JCheckBox("Auto ACK", true);
private final JCheckBox readResponse = new JCheckBox("Read Response", AMQPConsumer.DEFAULT_READ_RESPONSE);
private final JCheckBox useTx = new JCheckBox("Use Transactions?", AMQPConsumer.DEFAULT_USE_TX);

private JPanel mainPanel;

Expand All @@ -32,15 +31,15 @@ public AMQPConsumerGui(){
*/
protected void init() {
super.init();

mainPanel.add(readResponse);

prefetchCount.setPreferredSize(new Dimension(100,25));
mainPanel.add(prefetchCount);
useTx.setPreferredSize(new Dimension(100,25));

mainPanel.add(receiveTimeout);
mainPanel.add(prefetchCount);
mainPanel.add(purgeQueue);
mainPanel.add(autoAck);
mainPanel.add(readResponse);
mainPanel.add(useTx);
}

@Override
Expand All @@ -62,6 +61,7 @@ public void configure(TestElement element) {
receiveTimeout.setText(sampler.getReceiveTimeout());
purgeQueue.setSelected(sampler.purgeQueue());
autoAck.setSelected(sampler.autoAck());
useTx.setSelected(sampler.getUseTx());
}

/**
Expand All @@ -75,6 +75,7 @@ public void clearGui() {
receiveTimeout.setText("");
purgeQueue.setSelected(false);
autoAck.setSelected(true);
useTx.setSelected(AMQPConsumer.DEFAULT_USE_TX);
}

/**
Expand Down Expand Up @@ -104,7 +105,7 @@ public void modifyTestElement(TestElement te) {
sampler.setReceiveTimeout(receiveTimeout.getText());
sampler.setPurgeQueue(purgeQueue.isSelected());
sampler.setAutoAck(autoAck.isSelected());

sampler.setUseTx(useTx.isSelected());
}

/**
Expand Down