Skip to content

Commit

Permalink
Merge pull request #1946 from yabinmeng/main
Browse files Browse the repository at this point in the history
Add support for reading a very large payload file (as message body) directly using S4J adapter
  • Loading branch information
jshook committed May 17, 2024
2 parents 54b185a + d1f09b2 commit 1d9d50a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 16 deletions.
4 changes: 2 additions & 2 deletions nb-adapters/adapter-s4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
<artifactId>commons-beanutils-core</artifactId>
<version>1.8.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.config.standard.Param;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.jms.*;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -105,6 +109,9 @@ public record JMSDestinationCacheKey(String contextIdentifier,

private long totalCycleNum;

// Large message payload simulation
private final MutablePair<Boolean, String> largePayloadSimPair = MutablePair.of(false, null);


public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
Expand All @@ -128,6 +135,29 @@ public S4JSpace(String spaceName, NBConfiguration cfg) {
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
logger.info("{}", s4JClientConf.toString());

boolean simulateLargePayloadEnabled =
BooleanUtils.toBoolean(cfg.getOptional("simulate_large_payload").orElse("false"));
String simulatedPayloadFile = cfg.getOptional("simulated_payload_file").orElse(null);
if (simulateLargePayloadEnabled &&
(simulatedPayloadFile == null || ! new File(simulatedPayloadFile).exists()) ) {
throw new S4JAdapterInvalidParamException(
"When 'simulate_large_payload' is enabled, 'simulated_payload_file' must be provided and the file must exist.");
}

// Read the large payload file content and store it in the largePayloadSimPair
if (simulateLargePayloadEnabled) {
this.largePayloadSimPair.setLeft(true);
try {
String payloadContent = FileUtils.readFileToString(new File(simulatedPayloadFile), "UTF-8");
this.largePayloadSimPair.setRight(payloadContent);
} catch (Exception ex) {
throw new S4JAdapterUnexpectedException(
"Unable to read the simulated large payload file: " + simulatedPayloadFile);
}
}
logger.info("Simulated large payload enabled: {}, payload file: {}",
simulateLargePayloadEnabled, simulatedPayloadFile);

this.setS4JActivityStartTimeMills(System.currentTimeMillis());

this.initializeSpace(s4JClientConf);
Expand Down Expand Up @@ -158,6 +188,10 @@ public static NBConfigModel getConfigModel() {
.setDescription("JMS session mode"))
.add(Param.defaultTo("strict_msg_error_handling", false)
.setDescription("Whether to do strict error handling which is to stop NB S4J execution."))
.add(Param.defaultTo("simulate_large_payload", false)
.setDescription("Whether to simulate large message payload."))
.add(Param.optional("simulated_payload_file").
setDescription("File path to the simulated large message payload."))
.asReadOnly();
}

Expand Down Expand Up @@ -222,6 +256,9 @@ public void incTxnBatchTrackingCnt() {
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }


public Pair<Boolean, String> getLargePayloadSimPair() { return largePayloadSimPair; }

public void initializeSpace(S4JClientConf s4JClientConnInfo) {
Map<String, Object> cfgMap;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -48,7 +49,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final LongFunction<String> msgHeaderRawJsonStrFunc;
private final LongFunction<String> msgPriorityStrFunc;
private final LongFunction<String> msgPropRawJsonStrFunc;
private final LongFunction<String> msgBodyRawJsonStrFunc;
private final LongFunction<String> msgBodyRawStrFunc;
private final LongFunction<String> msgTypeFunc;

public MessageProducerOpDispenser(DriverAdapter adapter,
Expand All @@ -60,13 +61,13 @@ public MessageProducerOpDispenser(DriverAdapter adapter,
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgBodyRawStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
}

private Message createAndSetMessagePayload(
S4JJMSContextWrapper s4JJMSContextWrapper,
String msgType, String msgBodyRawJsonStr) throws JMSException
String msgType, String msgPayload) throws JMSException
{
Message message;
int messageSize = 0;
Expand All @@ -75,16 +76,16 @@ private Message createAndSetMessagePayload(

if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label)) {
message = jmsContext.createTextMessage();
((TextMessage) message).setText(msgBodyRawJsonStr);
messageSize = msgBodyRawJsonStr.length();
((TextMessage) message).setText(msgPayload);
messageSize = msgPayload.length();
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.MAP.label)) {
message = jmsContext.createMapMessage();

// The message body json string must be in the format of a collection of key/value pairs
// Otherwise, it is an error
Map<String, String> jmsMsgBodyMap;
try {
jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgBodyRawJsonStr);
jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgPayload);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a map when requiring a 'Map' message type!");
}
Expand All @@ -102,7 +103,7 @@ private Message createAndSetMessagePayload(
// Otherwise, it is an error
List<Object> jmsMsgBodyObjList;
try {
jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgBodyRawJsonStr);
jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgPayload);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a list of Objects when requiring a 'Stream' message type!");
}
Expand All @@ -113,13 +114,13 @@ private Message createAndSetMessagePayload(
}
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.OBJECT.label)) {
message = jmsContext.createObjectMessage();
((ObjectMessage) message).setObject(msgBodyRawJsonStr);
messageSize += msgBodyRawJsonStr.getBytes().length;
((ObjectMessage) message).setObject(msgPayload);
messageSize += msgPayload.getBytes().length;
}
// default: BYTE message type
else {
message = jmsContext.createBytesMessage();
byte[] msgBytePayload = msgBodyRawJsonStr.getBytes();
byte[] msgBytePayload = msgPayload.getBytes();
((BytesMessage)message).writeBytes(msgBytePayload);
messageSize += msgBytePayload.length;
}
Expand Down Expand Up @@ -277,9 +278,18 @@ public MessageProducerOp getOp(long cycle) {
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);

if (StringUtils.isBlank(jmsMsgBodyRawJsonStr)) {
// If 'simulate_large_payload' is enabled, replace the actual message payload with a static
// large payload that is read when the adapter is initialized
String effectiveMsgBody;
Pair<Boolean, String> largePayloadSimPair = s4jSpace.getLargePayloadSimPair();
if (largePayloadSimPair.getLeft()) {
effectiveMsgBody = largePayloadSimPair.getRight();
}
else {
effectiveMsgBody = msgBodyRawStrFunc.apply(cycle);
}
if (StringUtils.isBlank(effectiveMsgBody)) {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}

Expand Down Expand Up @@ -322,7 +332,7 @@ public MessageProducerOp getOp(long cycle) {
//
Message message;
try {
message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, jmsMsgBodyRawJsonStr);
message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, effectiveMsgBody);
}
catch (JMSException jmsException) {
throw new RuntimeException("Failed to set create a JMS message and set its payload!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected S4JBaseOpDispenser(DriverAdapter adapter,
this.asyncAPI =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE);
this.txnBatchNum =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, 0);

this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ blocks:
msg_type: "text"

## (Mandatory) JMS message body. Value depends on msg_type.
# NOTE: using NB binding variable to generate the message body may be ignored
# if input CLI parameter 'simulate_large_payload' is set to true. In this case,
# 'simulated_payload_file' must be set to a valid file path. and all messages will
# have the same payload content as read from the file.
msg_body: "{mytext_val}"

# # example of having "map" as the message type
Expand Down

0 comments on commit 1d9d50a

Please sign in to comment.