diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala index 2668a61b0..661803b78 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala @@ -384,6 +384,17 @@ object CassandraConfigSource { ConfigDef.Width.MEDIUM, CassandraConfigConstants.INITIAL_OFFSET, ) + .define( + CassandraConfigConstants.DATE_FORMAT_STRING, + Type.STRING, + CassandraConfigConstants.DATE_FORMAT_STRING_DEFAULT, + Importance.LOW, + CassandraConfigConstants.DATE_FORMAT_STRING_DOC, + "Import", + 10, + ConfigDef.Width.MEDIUM, + CassandraConfigConstants.DATE_FORMAT_STRING, + ) .define( CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON, Type.BOOLEAN, diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala index 5ba1ed81b..497705875 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala @@ -203,6 +203,11 @@ object CassandraConfigConstants { val INITIAL_OFFSET_DOC = "The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z'). Default 1900-01-01 00:00:00.0000000Z" + val DATE_FORMAT_STRING = s"$CONNECTOR_PREFIX.date.format.string" + val DATE_FORMAT_STRING_DEFAULT = "yyyy-MM-dd HH:mm:ss.SSS'Z'" + val DATE_FORMAT_STRING_DOC = + "The format string to use for formatting the initial offset and other " + val MAPPING_COLLECTION_TO_JSON = s"$CONNECTOR_PREFIX.mapping.collection.to.json" val MAPPING_COLLECTION_TO_JSON_DOC = "Mapping columns with type Map, List and Set like json" val MAPPING_COLLECTION_TO_JSON_DEFAULT = true diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala index a23da41ee..3a31ab0a4 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala @@ -66,6 +66,7 @@ case class CassandraSourceSetting( timeSliceDuration: Long = CassandraConfigConstants.TIMESLICE_DURATION_DEFAULT, timeSliceDelay: Long = CassandraConfigConstants.TIMESLICE_DELAY_DEFAULT, initialOffset: String = CassandraConfigConstants.INITIAL_OFFSET_DEFAULT, + dateFormatString: String = CassandraConfigConstants.DATE_FORMAT_STRING_DEFAULT, timeSliceMillis: Long = CassandraConfigConstants.TIME_SLICE_MILLIS_DEFAULT, mappingCollectionToJson: Boolean = CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON_DEFAULT, connectTimeout: Int = CassandraConfigConstants.DEFAULT_CONNECT_TIMEOUT, @@ -116,6 +117,7 @@ object CassandraSettings extends StrictLogging { val timeSliceDuration = config.getLong(CassandraConfigConstants.TIMESLICE_DURATION) val timeSliceDelay = config.getLong(CassandraConfigConstants.TIMESLICE_DELAY) val initialOffset = config.getString(CassandraConfigConstants.INITIAL_OFFSET) + val dateFormatString = config.getString(CassandraConfigConstants.DATE_FORMAT_STRING) val timeSliceMillis = config.getLong(CassandraConfigConstants.TIME_SLICE_MILLIS) val mappingCollectionToJson = config.getBoolean(CassandraConfigConstants.MAPPING_COLLECTION_TO_JSON) @@ -159,6 +161,7 @@ object CassandraSettings extends StrictLogging { fetchSize = fetchSize, timeSliceDuration = timeSliceDuration, timeSliceDelay = timeSliceDelay, + dateFormatString = dateFormatString, initialOffset = initialOffset, timeSliceMillis = timeSliceMillis, mappingCollectionToJson = mappingCollectionToJson, diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala index 82aff9bca..c96b9cc11 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraTableReader.scala @@ -64,15 +64,14 @@ class CassandraTableReader( private val cqlGenerator = new CqlGenerator(setting) class CassandraDateFormatter { - private val dateFormatPattern = "yyyy-MM-dd HH:mm:ss.SSS'Z'" def parse(date: String): Date = { - val dateFormatter = new SimpleDateFormat(dateFormatPattern) + val dateFormatter = new SimpleDateFormat(dateFormatString) dateFormatter.parse(date) } def format(date: Date): String = { - val dateFormatter = new SimpleDateFormat(dateFormatPattern) + val dateFormatter = new SimpleDateFormat(dateFormatString) dateFormatter.format(date) } @@ -82,6 +81,7 @@ class CassandraTableReader( } } + private val dateFormatString = setting.dateFormatString private val dateFormatter = new CassandraDateFormatter() private val primaryKeyCol = setting.primaryKeyColumn.getOrElse("") private val querying = new AtomicBoolean(false)