From c983b1ea342498dd4e77e94901560e1dd5a658cc Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Wed, 1 May 2024 19:04:16 -0500 Subject: [PATCH 1/3] Add support for JMS priority --- nb-adapters/adapter-s4j/pom.xml | 2 +- .../io/nosqlbench/adapter/s4j/S4JSpace.java | 68 +++++++++---------- .../MessageProducerOpDispenser.java | 7 ++ .../s4j/dispensers/S4JBaseOpDispenser.java | 2 +- .../adapter/s4j/util/S4JAdapterUtil.java | 24 +++++++ .../s4j/util/S4JClientConfConverter.java | 40 +++++------ .../s4j/util/S4JJMSContextWrapper.java | 4 +- .../scenarios/pulsar_s4j_producer.yaml | 7 ++ 8 files changed, 93 insertions(+), 61 deletions(-) diff --git a/nb-adapters/adapter-s4j/pom.xml b/nb-adapters/adapter-s4j/pom.xml index b3fec03254..3c154f8164 100644 --- a/nb-adapters/adapter-s4j/pom.xml +++ b/nb-adapters/adapter-s4j/pom.xml @@ -37,7 +37,7 @@ - 4.0.1 + 4.1.2-alpha diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java index 181bdf5c8c..d9153ea012 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java @@ -99,8 +99,9 @@ public record JMSDestinationCacheKey(String contextIdentifier, // Keep track the transaction count per thread private final ThreadLocal txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0); - // Represents the JMS connection - private PulsarConnectionFactory s4jConnFactory; + // Instead of using one "physical" connection per NB process, + // allows creating multiple connections to the Pulsar broker via the "num_conn" parameter + private final ConcurrentHashMap connFactories = new ConcurrentHashMap<>(); private long totalCycleNum; @@ -125,6 +126,7 @@ public S4JSpace(String spaceName, NBConfiguration cfg) { this.sessionMode = S4JAdapterUtil.getSessionModeFromStr( cfg.getOptional("session_mode").orElse("")); this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName); + logger.info("{}", s4JClientConf.toString()); this.setS4JActivityStartTimeMills(System.currentTimeMillis()); @@ -217,45 +219,41 @@ public void incTxnBatchTrackingCnt() { public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); } - public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; } - public long getTotalCycleNum() { return totalCycleNum; } public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; } public void initializeSpace(S4JClientConf s4JClientConnInfo) { - if (s4jConnFactory == null) { - Map cfgMap; - try { - cfgMap = s4JClientConnInfo.getS4jConfObjMap(); - s4jConnFactory = new PulsarConnectionFactory(cfgMap); - - for (int i=0; i { - if (logger.isDebugEnabled()) { - logger.error("onException::Unexpected JMS error happened:" + e); - } - }); - - connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext); + Map cfgMap; + try { + cfgMap = s4JClientConnInfo.getS4jConfObjMap(); + + for (int i=0; i new PulsarConnectionFactory(cfgMap)); + + JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode); + jmsConnContext.setClientID(connLvlJmsConnContextIdStr); + jmsConnContext.setExceptionListener(e -> { if (logger.isDebugEnabled()) { - logger.debug("[Connection level JMSContext] {} -- {}", - Thread.currentThread().getName(), - jmsConnContext ); + logger.error("onException::Unexpected JMS error happened:" + e); } + }); + + connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext); + if (logger.isDebugEnabled()) { + logger.debug("[Connection level JMSContext] {} -- {}", + Thread.currentThread().getName(), + jmsConnContext ); } } - catch (JMSRuntimeException e) { - logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString()); - throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause()); - } - catch (Exception e) { - e.printStackTrace(); - } + } + catch (JMSRuntimeException e) { + logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString()); + throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause()); } } @@ -284,7 +282,9 @@ public void shutdownSpace() { if (jmsContext != null) jmsContext.close(); } - s4jConnFactory.close(); + for (PulsarConnectionFactory s4jConnFactory : connFactories.values()) { + if (s4jConnFactory != null) s4jConnFactory.close(); + } } catch (Exception ex) { String exp = "Unexpected error when shutting down the S4J adaptor space"; diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java index 584ebca040..2dac6c52f8 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java @@ -40,11 +40,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser { private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser"); public static final String MSG_HEADER_OP_PARAM = "msg_header"; + public static final String MSG_PRIORITY_OP_PARAM = "msg_priority"; public static final String MSG_PROP_OP_PARAM = "msg_property"; public static final String MSG_BODY_OP_PARAM = "msg_body"; public static final String MSG_TYPE_OP_PARAM = "msg_type"; private final LongFunction msgHeaderRawJsonStrFunc; + private final LongFunction msgPriorityStrFunc; private final LongFunction msgPropRawJsonStrFunc; private final LongFunction msgBodyRawJsonStrFunc; private final LongFunction msgTypeFunc; @@ -56,6 +58,7 @@ public MessageProducerOpDispenser(DriverAdapter adapter, super(adapter, op, tgtNameFunc, s4jSpace); this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); + this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM); this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM); this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM); this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM); @@ -272,6 +275,7 @@ else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYP public MessageProducerOp getOp(long cycle) { String destName = destNameStrFunc.apply(cycle); String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle); + String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle); String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle); String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle); @@ -294,6 +298,9 @@ public MessageProducerOp getOp(long cycle) { JMSProducer producer; try { producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI); + int priority = NumberUtils.toInt(jmsMsgPriorityStr); + assert (priority >= 0 && priority <= 9); + producer.setPriority(priority); } catch (JMSException jmsException) { throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!"); diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java index 5150fe3293..d1e1176e68 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java @@ -225,7 +225,7 @@ public Destination getJmsDestination( String destType, String destName) throws JMSRuntimeException { - String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer(); + String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier(); JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); S4JSpace.JMSDestinationCacheKey destinationCacheKey = diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java index a2fa7b1e43..a55f3983ee 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java @@ -205,6 +205,30 @@ public static String getValidJmsMsgPropTypeList() { return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", "); } + // Message compression types + public enum MSG_COMPRESSION_TYPE_STR { + LZ4("LZ4"), + ZSTD("ZSTD"), + ZLIB("ZLIB"), + SNAPPY("SNAPPY"); + public final String label; + MSG_COMPRESSION_TYPE_STR(String label) { + this.label = label; + } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + private static boolean isValidLabel(String label) { + return LABELS.contains(StringUtils.upperCase(label)); + } + } + public static String getValidMsgCompressionTypeList() { + return StringUtils.join(MSG_COMPRESSION_TYPE_STR.LABELS, ", "); + } + public static boolean isValidMsgCompressionTypeStr(String type) { + return MSG_COMPRESSION_TYPE_STR.isValidLabel(type); + } + /////// // Convert JSON string to a key/value map public static Map convertJsonToMap(String jsonStr) throws Exception { diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java index 9ba86bfb34..7b4e0adfce 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java @@ -71,34 +71,26 @@ public static Map convertRawProducerConf(Map pul /** * Non-primitive type processing for Pulsar producer configuration items */ - // "compressionType" has value type "CompressionType" - // - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY' String confKeyName = "compressionType"; String confVal = pulsarProducerConfMapRaw.get(confKeyName); - String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)"; - if (StringUtils.isNotBlank(confVal)) { - if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) { - CompressionType compressionType = CompressionType.NONE; - - switch (StringUtils.upperCase(confVal)) { - case "LZ4": - compressionType = CompressionType.LZ4; - case "ZLIB": - compressionType = CompressionType.ZLIB; - case "ZSTD": - compressionType = CompressionType.ZSTD; - case "SNAPPY": - compressionType = CompressionType.SNAPPY; - } - - s4jProducerConfObjMap.put(confKeyName, compressionType); - } else { - throw new S4JAdapterInvalidParamException( - getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal)); + CompressionType compressionType = CompressionType.NONE; + if ( StringUtils.isNotBlank(confVal) ) { + try { + S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR compressionTypeStr = + S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR.valueOf(confVal); + compressionType = switch (compressionTypeStr) { + case LZ4 -> CompressionType.LZ4; + case ZLIB -> CompressionType.ZLIB; + case ZSTD -> CompressionType.ZSTD; + case SNAPPY -> CompressionType.SNAPPY; + }; + } catch (IllegalArgumentException e) { + // Any invalid value will be treated as no compression } } + s4jProducerConfObjMap.put(confKeyName, compressionType); // TODO: Skip the following Pulsar configuration items for now because they're not really // needed in the NB S4J testing at the moment. Add support for them when needed. // * messageRoutingMode @@ -312,7 +304,9 @@ public static Map convertRawConsumerConf(Map pul Map.entry("jms.usePulsarAdmin","boolean"), Map.entry("jms.useServerSideFiltering","boolean"), Map.entry("jms.waitForServerStartupTimeout","int"), - Map.entry("jms.transactionsStickyPartitions", "boolean") + Map.entry("jms.transactionsStickyPartitions", "boolean"), + Map.entry("jms.enableJMSPriority","boolean"), + Map.entry("jms.priorityMapping","String") ); public static Map convertRawJmsConf(Map s4jJmsConfMapRaw) { Map s4jJmsConfObjMap = new HashMap<>(); diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java index f22b6d78bf..904cff83eb 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java @@ -34,7 +34,7 @@ public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) { public int getJmsSessionMode() { return jmsSessionMode; } public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); } - public String getJmsContextIdentifer() { return jmsContextIdentifer; } + public String getJmsContextIdentifier() { return jmsContextIdentifer; } public JMSContext getJmsContext() { return jmsContext; } public void close() { @@ -45,7 +45,7 @@ public void close() { public String toString() { return new ToStringBuilder(this). - append("jmsContextIdentifer", jmsContextIdentifer). + append("jmsContextIdentifier", jmsContextIdentifer). append("jmsContext", jmsContext.toString()). toString(); } diff --git a/nb-adapters/adapter-s4j/src/main/resources/scenarios/pulsar_s4j_producer.yaml b/nb-adapters/adapter-s4j/src/main/resources/scenarios/pulsar_s4j_producer.yaml index 260192c694..b9c37eb1ad 100644 --- a/nb-adapters/adapter-s4j/src/main/resources/scenarios/pulsar_s4j_producer.yaml +++ b/nb-adapters/adapter-s4j/src/main/resources/scenarios/pulsar_s4j_producer.yaml @@ -5,6 +5,7 @@ bindings: mymap_val1: AlphaNumericString(10) mymap_val2: AlphaNumericString(20) mystream_val1: AlphaNumericString(50) + my_priority: WeightedLongs('2:20;4:70;8:10') # document level parameters that apply to all Pulsar client types: params: @@ -25,6 +26,12 @@ blocks: "JMSPriority": "9" } + ## (Optional) S4J Message priority emulation (since Pulsar doesn't have native message priority) + # - jms.enableJMSPriority must be set to true in S4J configuration; + # otherwise, the priority value will be ignored. + # - If this is set, the "JMSPriority" value in the header will be ignored. + msg_priority: "{my_priority}" + ## (Optional) JMS properties, predefined or customized (in JSON format). msg_property: | { From 0c74a442e5fe4c220e85d6791761120141abf0f9 Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Wed, 1 May 2024 19:23:26 -0500 Subject: [PATCH 2/3] fix typo --- .../nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java index 904cff83eb..7219dce3b4 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java @@ -22,19 +22,19 @@ * under the License. */ public class S4JJMSContextWrapper { - private final String jmsContextIdentifer; + private final String jmsContextIdentifier; private final JMSContext jmsContext; private final int jmsSessionMode; public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) { - this.jmsContextIdentifer = identifer; + this.jmsContextIdentifier = identifer; this.jmsContext = jmsContext; this.jmsSessionMode = jmsContext.getSessionMode(); } public int getJmsSessionMode() { return jmsSessionMode; } public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); } - public String getJmsContextIdentifier() { return jmsContextIdentifer; } + public String getJmsContextIdentifier() { return jmsContextIdentifier; } public JMSContext getJmsContext() { return jmsContext; } public void close() { @@ -45,7 +45,7 @@ public void close() { public String toString() { return new ToStringBuilder(this). - append("jmsContextIdentifier", jmsContextIdentifer). + append("jmsContextIdentifier", jmsContextIdentifier). append("jmsContext", jmsContext.toString()). toString(); } From 1025b1ddc2a73b54274d8cd07fbb8e9c6f113d8d Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Wed, 1 May 2024 20:24:51 -0500 Subject: [PATCH 3/3] Add a warning message for invalid input compression type string --- .../adapter/s4j/util/S4JClientConfConverter.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java index 7b4e0adfce..f9908c527d 100644 --- a/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java +++ b/nb-adapters/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java @@ -22,6 +22,8 @@ import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; @@ -35,6 +37,8 @@ */ public class S4JClientConfConverter { + private final static Logger logger = LogManager.getLogger(S4JClientConfConverter.class); + public static Map convertRawClientConf(Map pulsarClientConfMapRaw) { Map s4jClientConfObjMap = new HashMap<>(); s4jClientConfObjMap.putAll(pulsarClientConfMapRaw); @@ -87,6 +91,11 @@ public static Map convertRawProducerConf(Map pul }; } catch (IllegalArgumentException e) { // Any invalid value will be treated as no compression + logger.warn("Invalid producer \"compressionType\" value ({}) in the config properties file. " + + "Expecting one of the following values: {}. " + + "No message compression will be applied (for producers).", + confVal, + S4JAdapterUtil.getValidMsgCompressionTypeList()); } }