Skip to content

Commit

Permalink
Merge pull request #1937 from yabinmeng/main
Browse files Browse the repository at this point in the history
Add support for JMS priority in S4J adapater
  • Loading branch information
jshook committed May 2, 2024
2 parents 4500d05 + 1025b1d commit b34d42c
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 63 deletions.
2 changes: 1 addition & 1 deletion nb-adapters/adapter-s4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
</description>

<properties>
<s4j.version>4.0.1</s4j.version>
<s4j.version>4.1.2-alpha</s4j.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ public record JMSDestinationCacheKey(String contextIdentifier,
// Keep track the transaction count per thread
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);

// Represents the JMS connection
private PulsarConnectionFactory s4jConnFactory;
// Instead of using one "physical" connection per NB process,
// allows creating multiple connections to the Pulsar broker via the "num_conn" parameter
private final ConcurrentHashMap<String, PulsarConnectionFactory> connFactories = new ConcurrentHashMap<>();

private long totalCycleNum;

Expand All @@ -125,6 +126,7 @@ public S4JSpace(String spaceName, NBConfiguration cfg) {
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
logger.info("{}", s4JClientConf.toString());

this.setS4JActivityStartTimeMills(System.currentTimeMillis());

Expand Down Expand Up @@ -217,45 +219,41 @@ public void incTxnBatchTrackingCnt() {

public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }

public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; }

public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }

public void initializeSpace(S4JClientConf s4JClientConnInfo) {
if (s4jConnFactory == null) {
Map<String, Object> cfgMap;
try {
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
s4jConnFactory = new PulsarConnectionFactory(cfgMap);

for (int i=0; i<getMaxNumConn(); i++) {
// Establish a JMS connection
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);

JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.error("onException::Unexpected JMS error happened:" + e);
}
});

connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
Map<String, Object> cfgMap;
try {
cfgMap = s4JClientConnInfo.getS4jConfObjMap();

for (int i=0; i<getMaxNumConn(); i++) {
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);

// Establish a JMS connection
PulsarConnectionFactory s4jConnFactory = connFactories.computeIfAbsent(
connLvlJmsConnContextIdStr,
__ -> new PulsarConnectionFactory(cfgMap));

JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsConnContext );
logger.error("onException::Unexpected JMS error happened:" + e);
}
});

connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsConnContext );
}
}
catch (JMSRuntimeException e) {
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
}
catch (Exception e) {
e.printStackTrace();
}
}
catch (JMSRuntimeException e) {
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
}
}

Expand Down Expand Up @@ -284,7 +282,9 @@ public void shutdownSpace() {
if (jmsContext != null) jmsContext.close();
}

s4jConnFactory.close();
for (PulsarConnectionFactory s4jConnFactory : connFactories.values()) {
if (s4jConnFactory != null) s4jConnFactory.close();
}
}
catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4J adaptor space";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");

public static final String MSG_HEADER_OP_PARAM = "msg_header";
public static final String MSG_PRIORITY_OP_PARAM = "msg_priority";
public static final String MSG_PROP_OP_PARAM = "msg_property";
public static final String MSG_BODY_OP_PARAM = "msg_body";
public static final String MSG_TYPE_OP_PARAM = "msg_type";

private final LongFunction<String> msgHeaderRawJsonStrFunc;
private final LongFunction<String> msgPriorityStrFunc;
private final LongFunction<String> msgPropRawJsonStrFunc;
private final LongFunction<String> msgBodyRawJsonStrFunc;
private final LongFunction<String> msgTypeFunc;
Expand All @@ -56,6 +58,7 @@ public MessageProducerOpDispenser(DriverAdapter adapter,
super(adapter, op, tgtNameFunc, s4jSpace);

this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
Expand Down Expand Up @@ -272,6 +275,7 @@ else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYP
public MessageProducerOp getOp(long cycle) {
String destName = destNameStrFunc.apply(cycle);
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);

Expand All @@ -294,6 +298,9 @@ public MessageProducerOp getOp(long cycle) {
JMSProducer producer;
try {
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
int priority = NumberUtils.toInt(jmsMsgPriorityStr);
assert (priority >= 0 && priority <= 9);
producer.setPriority(priority);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public Destination getJmsDestination(
String destType,
String destName) throws JMSRuntimeException
{
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();

S4JSpace.JMSDestinationCacheKey destinationCacheKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,30 @@ public static String getValidJmsMsgPropTypeList() {
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
}

// Message compression types
public enum MSG_COMPRESSION_TYPE_STR {
LZ4("LZ4"),
ZSTD("ZSTD"),
ZLIB("ZLIB"),
SNAPPY("SNAPPY");
public final String label;
MSG_COMPRESSION_TYPE_STR(String label) {
this.label = label;
}

private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
private static boolean isValidLabel(String label) {
return LABELS.contains(StringUtils.upperCase(label));
}
}
public static String getValidMsgCompressionTypeList() {
return StringUtils.join(MSG_COMPRESSION_TYPE_STR.LABELS, ", ");
}
public static boolean isValidMsgCompressionTypeStr(String type) {
return MSG_COMPRESSION_TYPE_STR.isValidLabel(type);
}

///////
// Convert JSON string to a key/value map
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -35,6 +37,8 @@
*/
public class S4JClientConfConverter {

private final static Logger logger = LogManager.getLogger(S4JClientConfConverter.class);

public static Map<String, Object> convertRawClientConf(Map<String, String> pulsarClientConfMapRaw) {
Map<String, Object> s4jClientConfObjMap = new HashMap<>();
s4jClientConfObjMap.putAll(pulsarClientConfMapRaw);
Expand Down Expand Up @@ -71,34 +75,31 @@ public static Map<String, Object> convertRawProducerConf(Map<String, String> pul
/**
* Non-primitive type processing for Pulsar producer configuration items
*/
// "compressionType" has value type "CompressionType"
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
String confKeyName = "compressionType";
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";

if (StringUtils.isNotBlank(confVal)) {
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
CompressionType compressionType = CompressionType.NONE;

switch (StringUtils.upperCase(confVal)) {
case "LZ4":
compressionType = CompressionType.LZ4;
case "ZLIB":
compressionType = CompressionType.ZLIB;
case "ZSTD":
compressionType = CompressionType.ZSTD;
case "SNAPPY":
compressionType = CompressionType.SNAPPY;
}

s4jProducerConfObjMap.put(confKeyName, compressionType);
} else {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
CompressionType compressionType = CompressionType.NONE;
if ( StringUtils.isNotBlank(confVal) ) {
try {
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR compressionTypeStr =
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR.valueOf(confVal);
compressionType = switch (compressionTypeStr) {
case LZ4 -> CompressionType.LZ4;
case ZLIB -> CompressionType.ZLIB;
case ZSTD -> CompressionType.ZSTD;
case SNAPPY -> CompressionType.SNAPPY;
};
} catch (IllegalArgumentException e) {
// Any invalid value will be treated as no compression
logger.warn("Invalid producer \"compressionType\" value ({}) in the config properties file. " +
"Expecting one of the following values: {}. " +
"No message compression will be applied (for producers).",
confVal,
S4JAdapterUtil.getValidMsgCompressionTypeList());
}
}

s4jProducerConfObjMap.put(confKeyName, compressionType);
// TODO: Skip the following Pulsar configuration items for now because they're not really
// needed in the NB S4J testing at the moment. Add support for them when needed.
// * messageRoutingMode
Expand Down Expand Up @@ -312,7 +313,9 @@ public static Map<String, Object> convertRawConsumerConf(Map<String, String> pul
Map.entry("jms.usePulsarAdmin","boolean"),
Map.entry("jms.useServerSideFiltering","boolean"),
Map.entry("jms.waitForServerStartupTimeout","int"),
Map.entry("jms.transactionsStickyPartitions", "boolean")
Map.entry("jms.transactionsStickyPartitions", "boolean"),
Map.entry("jms.enableJMSPriority","boolean"),
Map.entry("jms.priorityMapping","String")
);
public static Map<String, Object> convertRawJmsConf(Map<String, String> s4jJmsConfMapRaw) {
Map<String, Object> s4jJmsConfObjMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
* under the License.
*/
public class S4JJMSContextWrapper {
private final String jmsContextIdentifer;
private final String jmsContextIdentifier;
private final JMSContext jmsContext;
private final int jmsSessionMode;

public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) {
this.jmsContextIdentifer = identifer;
this.jmsContextIdentifier = identifer;
this.jmsContext = jmsContext;
this.jmsSessionMode = jmsContext.getSessionMode();
}

public int getJmsSessionMode() { return jmsSessionMode; }
public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); }
public String getJmsContextIdentifer() { return jmsContextIdentifer; }
public String getJmsContextIdentifier() { return jmsContextIdentifier; }
public JMSContext getJmsContext() { return jmsContext; }

public void close() {
Expand All @@ -45,7 +45,7 @@ public void close() {

public String toString() {
return new ToStringBuilder(this).
append("jmsContextIdentifer", jmsContextIdentifer).
append("jmsContextIdentifier", jmsContextIdentifier).
append("jmsContext", jmsContext.toString()).
toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ bindings:
mymap_val1: AlphaNumericString(10)
mymap_val2: AlphaNumericString(20)
mystream_val1: AlphaNumericString(50)
my_priority: WeightedLongs('2:20;4:70;8:10')

# document level parameters that apply to all Pulsar client types:
params:
Expand All @@ -25,6 +26,12 @@ blocks:
"JMSPriority": "9"
}
## (Optional) S4J Message priority emulation (since Pulsar doesn't have native message priority)
# - jms.enableJMSPriority must be set to true in S4J configuration;
# otherwise, the priority value will be ignored.
# - If this is set, the "JMSPriority" value in the header will be ignored.
msg_priority: "{my_priority}"

## (Optional) JMS properties, predefined or customized (in JSON format).
msg_property: |
{
Expand Down

0 comments on commit b34d42c

Please sign in to comment.