Skip to content

Commit

Permalink
JMS Source fixes (#643)
Browse files Browse the repository at this point in the history
Acking the JMS messages was not always possible. There was also an issue where messages were produced to Kafka out of order relative to the JMS queue.

This change makes sure that:
 * Queue message order is retained when published to Kafka (although messages might be routed to different partitions)
 * An ack happens for each message. This is a change from the previous behaviour.
 * Records which fail to be committed to Kafka are not ack-ed on the JMS side
  • Loading branch information
stheppi authored and andrewstevenson committed Jan 16, 2020
1 parent e1c47d6 commit 79b697a
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import java.util

import com.datamountaineer.streamreactor.connect.config.base.traits._
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.ConfigDef.Importance
import org.apache.kafka.common.config.ConfigDef.Type

object JMSConfig {

Expand Down Expand Up @@ -58,8 +59,8 @@ object JMSConfig {
JMSConfigConstants.BATCH_SIZE_DOC,
"Connection", 13, ConfigDef.Width.MEDIUM, JMSConfigConstants.BATCH_SIZE)
.define(JMSConfigConstants.POLLING_TIMEOUT_CONFIG, Type.LONG, JMSConfigConstants.POLLING_TIMEOUT_DEFAULT, Importance.MEDIUM,
JMSConfigConstants.POLLING_TIMEOUT_DOC,
"Connection", 14, ConfigDef.Width.MEDIUM, JMSConfigConstants.POLLING_TIMEOUT_CONFIG)
JMSConfigConstants.POLLING_TIMEOUT_DOC,
"Connection", 14, ConfigDef.Width.MEDIUM, JMSConfigConstants.POLLING_TIMEOUT_CONFIG)

//converters

Expand All @@ -69,23 +70,34 @@ object JMSConfig {
Importance.HIGH, JMSConfigConstants.THROW_ON_CONVERT_ERRORS_DOC, "Converter", 2, ConfigDef.Width.MEDIUM,
JMSConfigConstants.THROW_ON_CONVERT_ERRORS_DISPLAY)
.define(JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES, Type.STRING, JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES_DEFAULT,
Importance.HIGH, JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES_DOC, "Converter", 3, ConfigDef.Width.MEDIUM,
JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES)
Importance.HIGH, JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES_DOC, "Converter", 3, ConfigDef.Width.MEDIUM,
JMSConfigConstants.AVRO_CONVERTERS_SCHEMA_FILES)
.define(JMSConfigConstants.HEADERS_CONFIG,
Type.STRING, "", Importance.LOW, JMSConfigConstants.HEADERS_CONFIG_DOC,
"Converter", 4, ConfigDef.Width.MEDIUM, JMSConfigConstants.HEADERS_CONFIG_DISPLAY)

.define(JMSConfigConstants.PROGRESS_COUNTER_ENABLED, Type.BOOLEAN, JMSConfigConstants.PROGRESS_COUNTER_ENABLED_DEFAULT,
Importance.MEDIUM, JMSConfigConstants.PROGRESS_COUNTER_ENABLED_DOC,
"Metrics", 1, ConfigDef.Width.MEDIUM, JMSConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY)
Importance.MEDIUM, JMSConfigConstants.PROGRESS_COUNTER_ENABLED_DOC,
"Metrics", 1, ConfigDef.Width.MEDIUM, JMSConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY)

.define(JMSConfigConstants.EVICT_UNCOMMITTED_MINUTES,
Type.INT,
JMSConfigConstants.EVICT_UNCOMMITTED_MINUTES_DEFAULT,
Importance.MEDIUM, JMSConfigConstants.EVICT_UNCOMMITTED_MINUTES_DOC,
"Settings", 1, ConfigDef.Width.MEDIUM, JMSConfigConstants.EVICT_UNCOMMITTED_MINUTES_DOC)

.define(JMSConfigConstants.EVICT_THRESHOLD_MINUTES,
Type.INT,
JMSConfigConstants.EVICT_THRESHOLD_MINUTES_DEFAULT,
Importance.MEDIUM, JMSConfigConstants.EVICT_THRESHOLD_MINUTES_DOC,
"Settings", 2, ConfigDef.Width.MEDIUM, JMSConfigConstants.EVICT_THRESHOLD_MINUTES_DOC)
}

/**
* <h1>JMSSinkConfig</h1>
*
* Holds config, extends AbstractConfig.
**/
* <h1>JMSSinkConfig</h1>
*
* Holds config, extends AbstractConfig.
**/
case class JMSConfig(props: util.Map[String, String])
extends BaseConfig(JMSConfigConstants.CONNECTOR_PREFIX, JMSConfig.config, props)
with KcqlSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.datamountaineer.streamreactor.connect.jms.config
import com.datamountaineer.streamreactor.connect.config.base.const.TraitConfigConst._

object JMSConfigConstants {

val CONNECTOR_PREFIX = "connect.jms"

val JMS_URL = s"${CONNECTOR_PREFIX}.${URL_SUFFIX}"
Expand All @@ -39,9 +39,8 @@ object JMSConfigConstants {
"org.apache.activemq.ActiveMQConnectionFactory"
val CONNECTION_FACTORY_DEFAULT = "ConnectionFactory"


val KCQL = s"${CONNECTOR_PREFIX}.${KCQL_PROP_SUFFIX}"
val KCQL_DOC = "KCQL expression describing field selection and routes."
val KCQL_DOC = "KCQL expression describing field selection and routes."

val ERROR_POLICY = s"${CONNECTOR_PREFIX}.${ERROR_POLICY_PROP_SUFFIX}"
val ERROR_POLICY_DOC: String =
Expand Down Expand Up @@ -73,15 +72,13 @@ object JMSConfigConstants {
|i.e. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter""".stripMargin
private[config] val DEFAULT_CONVERTER_DISPLAY = "Default Converter class"


val HEADERS_CONFIG = s"${CONNECTOR_PREFIX}.headers"
private[config] val HEADERS_CONFIG_DOC =
s"""
|Contains collection of static JMS headers included in every SinkRecord
|The format is ${CONNECTOR_PREFIX}.headers="$$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"""".stripMargin
|Contains collection of static JMS headers included in every SinkRecord
|The format is ${CONNECTOR_PREFIX}.headers="$$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage"""".stripMargin
private[config] val HEADERS_CONFIG_DISPLAY = "JMS static headers"


val THROW_ON_CONVERT_ERRORS_CONFIG = s"${CONNECTOR_PREFIX}.converter.throw.on.error"
private[config] val THROW_ON_CONVERT_ERRORS_DOC = "If set to false the conversion exception will be swallowed and everything carries on BUT the message is lost!!; true will throw the exception.Default is false."
private[config] val THROW_ON_CONVERT_ERRORS_DISPLAY = "Throw error on conversion"
Expand Down Expand Up @@ -112,4 +109,12 @@ object JMSConfigConstants {
val POLLING_TIMEOUT_DOC = "Provides the timeout to poll incoming messages"
val POLLING_TIMEOUT_DISPLAY = "Polling timeout"
val POLLING_TIMEOUT_DEFAULT = 1000L

val EVICT_UNCOMMITTED_MINUTES = s"$CONNECTOR_PREFIX.evict.interval.minutes"
private[config] val EVICT_UNCOMMITTED_MINUTES_DOC = "Removes the uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published. Failure to publish a record to Kafka will mean the JMS message will not be acknowledged."
private[config] val EVICT_UNCOMMITTED_MINUTES_DEFAULT = 10

val EVICT_THRESHOLD_MINUTES = s"$CONNECTOR_PREFIX.evict.threshold.minutes"
private[config] val EVICT_THRESHOLD_MINUTES_DOC = "The number of minutes after which an uncommitted entry becomes evictable from the connector cache."
private[config] val EVICT_THRESHOLD_MINUTES_DEFAULT = 10
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

package com.datamountaineer.streamreactor.connect.jms.config

import com.datamountaineer.kcql.{FormatType, Kcql}
import com.datamountaineer.kcql.FormatType
import com.datamountaineer.kcql.Kcql
import com.datamountaineer.streamreactor.connect.converters.source.Converter
import com.datamountaineer.streamreactor.connect.errors.{ErrorPolicy, ThrowErrorPolicy}
import com.datamountaineer.streamreactor.connect.errors.ErrorPolicy
import com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy
import com.datamountaineer.streamreactor.connect.jms.config.DestinationSelector.DestinationSelector
import com.google.common.base.Splitter
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.types.Password

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import scala.util.Failure
import scala.util.Success
import scala.util.Try

case class JMSSetting(source: String,
target: String,
Expand All @@ -50,20 +54,21 @@ case class JMSSettings(connectionURL: String,
batchSize: Int,
errorPolicy: ErrorPolicy = new ThrowErrorPolicy,
retries: Int,
pollingTimeout: Long) {
pollingTimeout: Long,
evictInterval: Int,
evictThreshold: Int) {
require(connectionURL != null && connectionURL.trim.length > 0, "Invalid connection URL")
require(connectionFactoryClass != null, "Invalid class for connection factory")
}


object JMSSettings extends StrictLogging {

/**
* Creates an instance of JMSSettings from a JMSSinkConfig
*
* @param config : The map of all provided configurations
* @return An instance of JmsSettings
*/
* Creates an instance of JMSSettings from a JMSSinkConfig
*
* @param config : The map of all provided configurations
* @return An instance of JmsSettings
*/
def apply(config: JMSConfig, sink: Boolean): JMSSettings = {

val kcql = config.getKCQL
Expand Down Expand Up @@ -162,6 +167,9 @@ object JMSSettings extends StrictLogging {
headersForJmsDest)
}).toList

val evictInterval = config.getInt(JMSConfigConstants.EVICT_UNCOMMITTED_MINUTES)
val evictThreshold = config.getInt(JMSConfigConstants.EVICT_THRESHOLD_MINUTES)

new JMSSettings(
url,
initialContextFactoryClass,
Expand All @@ -175,7 +183,9 @@ object JMSSettings extends StrictLogging {
batchSize,
errorPolicy,
nbrOfRetries,
pollingTimeout)
pollingTimeout,
evictInterval,
evictThreshold)
}

private def parseAdditionalHeaders(cfgLine: String): Map[String, String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,42 @@ package com.datamountaineer.streamreactor.connect.jms.source
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import javax.jms.Message
import java.util.Collections
import java.util.function.BiConsumer

import com.datamountaineer.streamreactor.connect.jms.config.{JMSConfig, JMSConfigConstants, JMSSettings}
import com.datamountaineer.streamreactor.connect.jms.config.JMSConfig
import com.datamountaineer.streamreactor.connect.jms.config.JMSConfigConstants
import com.datamountaineer.streamreactor.connect.jms.config.JMSSettings
import com.datamountaineer.streamreactor.connect.jms.source.readers.JMSReader
import com.datamountaineer.streamreactor.connect.utils.{JarManifest, ProgressCounter}
import com.datamountaineer.streamreactor.connect.utils.JarManifest
import com.datamountaineer.streamreactor.connect.utils.ProgressCounter
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.source.{SourceRecord, SourceTask}
import javax.jms.Message
import org.apache.kafka.connect.source.SourceRecord
import org.apache.kafka.connect.source.SourceTask

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
import scala.util.Success
import scala.util.Try

/**
* Created by andrew@datamountaineer.com on 10/03/2017.
* stream-reactor
*/
* Created by andrew@datamountaineer.com on 10/03/2017.
* stream-reactor
*/
class JMSSourceTask extends SourceTask with StrictLogging {
var reader: JMSReader = _
val progressCounter = new ProgressCounter
private var reader: JMSReader = _
private val progressCounter = new ProgressCounter
private var enableProgress: Boolean = false
private val pollingTimeout: AtomicLong = new AtomicLong(0L)
private var ackMessage: Option[Message] = None
private val recordsToCommit = new ConcurrentHashMap[SourceRecord, SourceRecord]()
private val recordsToCommit = new ConcurrentHashMap[SourceRecord, MessageAndTimestamp]()
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

private val EmptyRecords = Collections.emptyList[SourceRecord]()
private var lastEvictedTimestamp: FiniteDuration = FiniteDuration(System.currentTimeMillis(), MILLISECONDS)
private var evictInterval: Int = 0
private var evictThreshold: Int = 0
override def start(props: util.Map[String, String]): Unit = {
logger.info(scala.io.Source.fromInputStream(getClass.getResourceAsStream("/jms-source-ascii.txt")).mkString + s" $version")
logger.info(manifest.printManifest())
Expand All @@ -57,6 +67,8 @@ class JMSSourceTask extends SourceTask with StrictLogging {
reader = JMSReader(settings)
enableProgress = config.getBoolean(JMSConfigConstants.PROGRESS_COUNTER_ENABLED)
pollingTimeout.set(settings.pollingTimeout)
evictInterval = settings.evictInterval
evictThreshold = settings.evictThreshold
}

override def stop(): Unit = {
Expand All @@ -73,43 +85,52 @@ class JMSSourceTask extends SourceTask with StrictLogging {
}

override def poll(): util.List[SourceRecord] = {
var records: mutable.Seq[SourceRecord] = mutable.Seq.empty[SourceRecord]
var messages: mutable.Seq[Message] = mutable.Seq.empty[Message]

try {
val polled = reader.poll()

if(polled.isEmpty) {
synchronized {
this.wait(pollingTimeout.get())
}
} else {
records = collection.mutable.Seq(polled.map({ case (_, record) => record }).toSeq: _*)
messages = collection.mutable.Seq(polled.map({ case (message, _) => message }).toSeq: _*)
val polled = reader.poll()
if (polled.isEmpty) {
synchronized {
this.wait(pollingTimeout.get())
}
if (enableProgress) {
progressCounter.update(Vector.empty)
}
EmptyRecords
} else {
val timestamp = System.currentTimeMillis()
val records = polled.map { case (msg, record) =>
recordsToCommit.put(record, MessageAndTimestamp(msg, FiniteDuration(timestamp, MILLISECONDS)))
record
}
} finally {
if (messages.size > 0) {
ackMessage = messages.headOption
val polledRecordsToCommit = records.zip(records).toMap.asJava
recordsToCommit.putAll(polledRecordsToCommit)
if (enableProgress) {
progressCounter.update(records)
}
records.asJava
}
}

if (enableProgress) {
progressCounter.update(records.toVector)
private def evictUncommittedMessages(): Unit = {
val current = FiniteDuration(System.currentTimeMillis(), MILLISECONDS)
if ((current - lastEvictedTimestamp).toMinutes > evictInterval) {
recordsToCommit.forEach(new BiConsumer[SourceRecord, MessageAndTimestamp] {
override def accept(t: SourceRecord, u: MessageAndTimestamp): Unit = evictIfApplicable(t, u, current)
})
}
lastEvictedTimestamp = current
}

records
private def evictIfApplicable(record: SourceRecord, msg: MessageAndTimestamp, now: FiniteDuration): Unit = {
if ((now - msg.timestamp).toMinutes > evictThreshold) {
recordsToCommit.remove(record)
}
}

override def commitRecord(record: SourceRecord): Unit = {
recordsToCommit.remove(record)

if (recordsToCommit.isEmpty) {
ackMessage.foreach(_.acknowledge())
ackMessage = None
Option(recordsToCommit.remove(record)).foreach { case MessageAndTimestamp(msg, _) =>
Try(msg.acknowledge())
}
evictUncommittedMessages()
}

override def version: String = manifest.version()
}
}

case class MessageAndTimestamp(msg: Message, timestamp: FiniteDuration)
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,41 @@

package com.datamountaineer.streamreactor.connect.jms.source.readers

import javax.jms.{Message, MessageConsumer}

import com.datamountaineer.streamreactor.connect.converters.source.Converter
import com.datamountaineer.streamreactor.connect.jms.JMSSessionProvider
import com.datamountaineer.streamreactor.connect.jms.config.JMSSettings
import com.datamountaineer.streamreactor.connect.jms.source.domain.JMSStructMessage
import com.typesafe.scalalogging.slf4j.StrictLogging
import javax.jms.Message
import javax.jms.MessageConsumer
import org.apache.kafka.connect.source.SourceRecord

import scala.util.Try

/**
* Created by andrew@datamountaineer.com on 10/03/2017.
* stream-reactor
*/
* Created by andrew@datamountaineer.com on 10/03/2017.
* stream-reactor
*/
class JMSReader(settings: JMSSettings) extends StrictLogging {

val provider = JMSSessionProvider(settings)
provider.start()
val consumers: Map[String, MessageConsumer] = provider.queueConsumers ++ provider.topicsConsumers
val consumers: Vector[(String, MessageConsumer)] = (provider.queueConsumers ++ provider.topicsConsumers).toVector
val convertersMap: Map[String, Option[Converter]] = settings.settings.map(s => (s.source, s.sourceConverters)).toMap
val topicsMap: Map[String, String] = settings.settings.map(s => (s.source, s.target)).toMap

def poll(): Map[Message, SourceRecord] = {

def poll(): Vector[(Message, SourceRecord)] = {
val messages = consumers
.flatMap({ case (source, consumer)=>
(0 to settings.batchSize)
.flatMap(_ => Option(consumer.receiveNoWait()))
.map(m => (m, convert(source, topicsMap(source), m)))
})
.flatMap({ case (source, consumer) =>
(0 to settings.batchSize)
.flatMap(_ => Option(consumer.receiveNoWait()))
.map(m => (m, convert(source, topicsMap(source), m)))
})

messages
}

def convert(source: String, target: String, message: Message): SourceRecord = {
def convert(source: String, target: String, message: Message): SourceRecord = {
convertersMap(source).getOrElse(None) match {
case c: Converter => c.convert(target, source, message.getJMSMessageID, JMSStructMessage.getPayload(message))
case None => JMSStructMessage.getStruct(target, message)
Expand Down
Loading

0 comments on commit 79b697a

Please sign in to comment.