Feature/influxdb2 #879

Draft
wants to merge 6 commits into base: master
19 changes: 19 additions & 0 deletions build.sbt
@@ -26,6 +26,7 @@ lazy val subProjects: Seq[ProjectMatrix] = Seq(
hbase,
hive,
influxdb,
influxdb2,
jms,
kudu,
mongodb,
@@ -193,6 +194,24 @@ lazy val influxdb = (projectMatrix in file("kafka-connect-influxdb"))
.configureAssembly()
.configureTests(baseTestDeps)

lazy val influxdb2 = (projectMatrix in file("kafka-connect-influxdb2"))
  .dependsOn(common)
  .settings(
    settings ++
      Seq(
        name := "kafka-connect-influxdb2",
        description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
        libraryDependencies ++= baseDeps ++ kafkaConnectInfluxDb2Deps,
        publish / skip := true,
        dependencyOverrides ++= nettyOverrides,
      ),
  )
  .kafka2Row()
  .kafka3Row()
  .configureAssembly()
  .configureTests(baseTestDeps)
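
Note: kafkaConnectInfluxDb2Deps is referenced above but not included in this diff; it would be defined alongside the other dependency lists in project/Dependencies.scala. A minimal sketch of what it might contain, assuming the official InfluxDB 2.x client artifacts (coordinates and version are illustrative, not taken from this PR):

// Hypothetical sketch only; the real list lives in project/Dependencies.scala.
// Assumes the official com.influxdb client artifacts; the version is a placeholder.
import sbt._
val influxDb2ClientVersion = "6.1.0"
val kafkaConnectInfluxDb2Deps: Seq[ModuleID] = Seq(
  "com.influxdb" %  "influxdb-client-java"  % influxDb2ClientVersion,
  "com.influxdb" %% "influxdb-client-scala" % influxDb2ClientVersion,
)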


lazy val jms = (projectMatrix in file("kafka-connect-jms"))
.dependsOn(common)
.settings(
@@ -34,6 +34,9 @@ object TraitConfigConst {
val KEYSTORE_PASS_SUFFIX = "keystore.pass"
val KEYSTORE_PATH_SUFFIX = "keystore.path"
val CERTIFICATES_SUFFIX = "certs"
val ORG_SUFFIX = "organization"
val BUCKET_PROP_SUFFIX = "bucket"
val TOKEN_SUFFIX = "token"
val CERTIFICATE_KEY_CHAIN_SUFFIX = "cert.chain.key"
val CERT_KEY = "cert.key"
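
The three new suffixes map to the InfluxDB 2.x connection concepts (organization, bucket, API token). A connector composes its full property keys by joining its own prefix with these suffixes; the sketch below is illustrative only, and the constant names and prefix are assumptions rather than part of this diff:

// Hypothetical sketch: composing full config keys from the suffixes above.
object InfluxConfigConstants {
  val CONNECTOR_PREFIX = "connect.influx"
  val ORG_CONFIG    = s"$CONNECTOR_PREFIX.${TraitConfigConst.ORG_SUFFIX}"         // connect.influx.organization
  val BUCKET_CONFIG = s"$CONNECTOR_PREFIX.${TraitConfigConst.BUCKET_PROP_SUFFIX}" // connect.influx.bucket
  val TOKEN_CONFIG  = s"$CONNECTOR_PREFIX.${TraitConfigConst.TOKEN_SUFFIX}"       // connect.influx.token
}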

@@ -1,41 +1,35 @@
package com.datamountaineer.streamreactor.connect.influx.converters

import com.datamountaineer.streamreactor.connect.influx.NanoClock
import com.datamountaineer.streamreactor.connect.influx.converters.SinkRecordParser.ParsedKeyValueSinkRecord
import com.datamountaineer.streamreactor.connect.influx.converters.SinkRecordParser.ParsedSinkRecord
import com.datamountaineer.streamreactor.connect.influx.converters.SinkRecordParser.{ParsedKeyValueSinkRecord, ParsedSinkRecord}
import com.datamountaineer.streamreactor.connect.influx.helpers.Util
import com.datamountaineer.streamreactor.connect.influx.writers.KcqlDetails
import com.datamountaineer.streamreactor.connect.influx.writers.KcqlDetails.ConstantTag
import com.datamountaineer.streamreactor.connect.influx.writers.KcqlDetails.DynamicTag
import com.datamountaineer.streamreactor.connect.influx.writers.KcqlDetails.Path
import com.datamountaineer.streamreactor.connect.influx.writers.KcqlDetails.{ConstantTag, DynamicTag, Path}
import org.influxdb.dto.Point

import java.time.Instant
import java.util.Date
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.{Failure, Success, Try}

object InfluxPoint {

def build(nanoClock: NanoClock)(record: ParsedKeyValueSinkRecord, details: KcqlDetails): Try[Point] =
def build(nanoClock: NanoClock)(record: ParsedKeyValueSinkRecord,
details: KcqlDetails): Try[Point] =
for {
(timeUnit, timestamp) <- extractTimeMeasures(nanoClock, record, details)
measurement = details.dynamicTarget
.flatMap(record.field)
.map(_.toString)
.getOrElse(details.target)
pointBuilder = Point.measurement(measurement).time(timestamp, timeUnit)
point <- addValuesAndTags(pointBuilder, record, details)
point <- addValuesAndTags(pointBuilder, record, details)
} yield point.build()

private def addValuesAndTags(
pointBuilder: Point.Builder,
record: ParsedKeyValueSinkRecord,
details: KcqlDetails,
): Try[Point.Builder] =
private def addValuesAndTags(pointBuilder: Point.Builder,
record: ParsedKeyValueSinkRecord,
details: KcqlDetails): Try[Point.Builder] =
details.NonIgnoredFields
.flatMap {
case (_, path, _) if path.equals(Path(Util.KEY_All_ELEMENTS)) =>
@@ -55,32 +49,25 @@ object InfluxPoint {
case (key, _) =>
Failure(
new IllegalArgumentException(
s"Property $key is referenced but no value could be found .",
),
)
},
),
)
s"Property $key is referenced but no value could be found ."))
}))
.flatMap(builder =>
details.tags
.map {
case ConstantTag(key, value) => (key.value, Some(value))
case DynamicTag(name, path) =>
(name.value, record.field(path).map(_.toString))
}
.foldLeft(Try(builder)) { (builder, tag) =>
.foldLeft(Try(builder))((builder, tag) => {
tag match {
case (key, Some(value)) => builder.map(_.tag(key, value))
case (_, None) => builder
}
},
)
}))

private def extractTimeMeasures(
nanoClock: NanoClock,
record: ParsedSinkRecord,
details: KcqlDetails,
): Try[(TimeUnit, Long)] =
private def extractTimeMeasures(nanoClock: NanoClock,
record: ParsedSinkRecord,
details: KcqlDetails): Try[(TimeUnit, Long)] =
details.timestampField
.flatMap { path =>
record
@@ -90,37 +77,32 @@
.getOrElse(Try(TimeUnit.NANOSECONDS -> nanoClock.getEpochNanos))

private[influx] def coerceTimeStamp(
value: Any,
fieldPath: Iterable[String],
): Try[Long] =
value: Any,
fieldPath: Iterable[String]): Try[Long] = {
value match {
case b: Byte => Try(b.toLong)
case s: Short => Try(s.toLong)
case i: Int => Try(i.toLong)
case l: Long => Try(l)
case l: Long => Try(l)
case s: String =>
Try(Instant.parse(s).toEpochMilli).transform(
Try(_),
e =>
Failure(
new IllegalArgumentException(
s"$s is not a valid format for timestamp, expected 'yyyy-MM-DDTHH:mm:ss.SSSZ'",
e,
),
),
)
e)))
case d: Date => Try(d.toInstant.toEpochMilli)
/* Assume Unix timestamps in seconds with double precision, coerce to Long with milliseconds precision */
case d: Double => Try((d * 1e3).toLong)
/* Assume Unix timestamps in seconds with double precision, coerce to Long with microseconds precision */
case d: Double => Try((d * 1E6).toLong)
case other =>
Failure(
new IllegalArgumentException(
s"Invalid value for field:${fieldPath.mkString(".")}.Value '$other' is not a valid field for the timestamp",
),
)
Failure(new IllegalArgumentException(
s"Invalid value for field:${fieldPath.mkString(".")}.Value '$other' is not a valid field for the timestamp"))
}
}

def writeField(builder: Point.Builder)(field: String, v: Any): Try[Point.Builder] = v match {
def writeField(builder: Point.Builder)(field: String,
v: Any): Try[Point.Builder] = v match {
case value: Long => Try(builder.addField(field, value))
case value: Int => Try(builder.addField(field, value))
case value: BigInt => Try(builder.addField(field, value))
@@ -135,19 +117,15 @@ object InfluxPoint {
case value: java.util.Date => Try(builder.addField(field, value.getTime))
case value: java.util.List[_] => flattenArray(builder)(field, value)
case value =>
Failure(
new RuntimeException(
s"Can't select field:'$field' because it leads to value:'$value' (${Option(
value,
).map(_.getClass.getName).getOrElse("")})is not a valid type for InfluxDb.",
),
)
Failure(new RuntimeException(
s"Can't select field:'$field' because it leads to value:'$value' (${Option(
value).map(_.getClass.getName).getOrElse("")})is not a valid type for InfluxDb."))
}

/**
* Flatten an array writing each element as a new field with the following convention:
* "name": ["a", "b", "c"] => name0 = "a", name1 = "b", name3 = "c"
*/
* Flatten an array writing each element as a new field with the following convention:
* "name": ["a", "b", "c"] => name0 = "a", name1 = "b", name3 = "c"
*/
private def flattenArray(builder: Point.Builder)(field: String, value: java.util.List[_]) = {
val res = value.asScala.zipWithIndex.map {
case (el, i) => writeField(builder)(field + i, el)

This file was deleted.

@@ -172,13 +172,13 @@ class ValuesExtractorMapTest extends AnyWordSpec with Matchers {
result.failed.get shouldBe a[IllegalArgumentException]
}

"assume unix timestamp in seconds if type is double and coerce to Long in milliseconds" in {
"assume unix timestamp in seconds if type is double and coerce to Long in microseconds" in {

val payload = new java.util.HashMap[String, Any]()

payload.put("double", 1.56937031387E9)
payload.put("double", 1651794924.081999)

InfluxPoint.coerceTimeStamp(ValuesExtractor.extract(payload, Vector("double")),Vector("double")) shouldBe Success(1569370313870L)
InfluxPoint.coerceTimeStamp(ValuesExtractor.extract(payload, Vector("double")),Vector("double")) shouldBe Success(1651794924081999L)

}
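
For clarity, the behavioural change exercised by this test: a double-typed value is treated as Unix seconds and is now coerced to microseconds (scaled by 1E6) rather than milliseconds. The lines below only restate the assertion above and are not part of this diff:

// Illustration of the updated coercion, mirroring the test above.
val seconds: Double = 1651794924.081999
val micros: Long = (seconds * 1E6).toLong // 1651794924081999L, per the assertion above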

14 changes: 14 additions & 0 deletions kafka-connect-influxdb2/src/main/resources/influx-ascii.txt
@@ -0,0 +1,14 @@

__ __
/ / ____ _____ ____/ /___ ____ ____
/ / / __ `/ __ \/ __ / __ \/ __ \/ __ \
/ /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
/_/
____ ______ ____ ____ _____ _ __
/ _/___ / __/ /_ ___ __/ __ \/ __ ) / ___/(_)___ / /__
/ // __ \/ /_/ / / / / |/_/ / / / __ | \__ \/ / __ \/ //_/
_/ // / / / __/ / /_/ /> </ /_/ / /_/ / ___/ / / / / / ,<
/___/_/ /_/_/ /_/\__,_/_/|_/_____/_____/ /____/_/_/ /_/_/|_|

By Stefan Bocutiu
28 changes: 28 additions & 0 deletions kafka-connect-influxdb2/src/main/resources/logback.xml
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- -->
<!-- Copyright 2021 Celonis. -->
<!-- -->
<!-- Licensed under the Apache License, Version 2.0 (the "License"); -->
<!-- you may not use this file except in compliance with the License. -->
<!-- You may obtain a copy of the License at -->
<!-- -->
<!-- http://www.apache.org/licenses/LICENSE-2.0 -->
<!-- -->
<!-- Unless required by applicable law or agreed to in writing, software -->
<!-- distributed under the License is distributed on an "AS IS" BASIS, -->
<!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
<!-- See the License for the specific language governing permissions and -->
<!-- limitations under the License. -->
<!-- -->
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %-5p [%t] [%c] [%M:%L] %m%n</pattern>
</encoder>
</appender>
<logger name="com.jcabi.manifests" level="ERROR"/>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
</configuration>
@@ -0,0 +1,76 @@
/*
* Copyright 2017 Datamountaineer.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datamountaineer.streamreactor.connect.influx2

import com.datamountaineer.streamreactor.common.config.Helpers
import com.datamountaineer.streamreactor.common.utils.JarManifest

import java.util
import com.datamountaineer.streamreactor.connect.influx2.config.InfluxConfig
import com.datamountaineer.streamreactor.connect.influx2.config.InfluxConfigConstants
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector

import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava

/**
* <h1>InfluxSinkConnector</h1>
* Kafka connect InfluxDb Sink connector
*
* Sets up InfluxSinkTask and configurations for the tasks.
*/
class InfluxSinkConnector extends SinkConnector with StrictLogging {
private var configProps: Option[util.Map[String, String]] = None
private val configDef = InfluxConfig.config
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

/**
* States which SinkTask class to use
*/
override def taskClass(): Class[_ <: Task] = classOf[InfluxSinkTask]

/**
* Set the configuration for each worker and determine the split
*
* @param maxTasks The max number of task workers we can spawn
* @return a List of configuration properties per worker
*/
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
logger.info(s"Setting task configurations for $maxTasks workers.")
(1 to maxTasks).map(_ => configProps.get).toList.asJava
}

/**
* Start the sink and set to configuration
*
* @param props A map of properties for the connector and worker
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(s"Starting InfluxDb sink task with ${props.toString}.")
Helpers.checkInputTopics(InfluxConfigConstants.KCQL_CONFIG, props.asScala.toMap)
configProps = Some(props)
}

override def stop(): Unit = {}

override def version(): String = manifest.version()

override def config(): ConfigDef = configDef
}
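
A hedged sketch of how this connector might be exercised from a test, using illustrative property keys (the real keys come from InfluxConfigConstants, which is not shown in this diff):

// Sketch only: property names below are assumptions, not taken from this PR.
import scala.jdk.CollectionConverters.MapHasAsJava

val props = Map(
  "topics"                      -> "sensor-readings",
  "connect.influx.url"          -> "http://localhost:8086",
  "connect.influx.organization" -> "my-org",
  "connect.influx.bucket"       -> "my-bucket",
  "connect.influx.token"        -> "my-token",
  "connect.influx.kcql"         -> "INSERT INTO measurementA SELECT * FROM sensor-readings",
).asJava

val connector = new InfluxSinkConnector()
connector.start(props)                     // validates that KCQL topics match the "topics" setting
val taskConfigs = connector.taskConfigs(3) // one identical config map per task
connector.stop()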