Skip to content

Commit

Permalink
Compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Oct 24, 2022
1 parent 05b5cbc commit 57250e9
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object TraitConfigConst {
val CERTIFICATES_SUFFIX = "certs"
val ORG_SUFFIX = "organization"
val BUCKET_PROP_SUFFIX = "bucket"
val TOKEN_SUFFIX = "suffix"
val TOKEN_SUFFIX = "token"
val CERTIFICATE_KEY_CHAIN_SUFFIX = "cert.chain.key"
val CERT_KEY = "cert.key"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ trait ConsistencyLevelSettings[T <: Enum[T]] extends BaseSettings {
val consistencyLevel = getString(consistencyLevelConstant) match {
case "" => None
case other =>
Try(Enum.valueOf[T](enum, other)) match {
Try(Enum.valueOf[T](enum, other))
.orElse(
Try(Enum.valueOf[T](enum, other.toLowerCase)),
)
.orElse(
Try(Enum.valueOf[T](enum, other.toUpperCase)),
) match {
case Failure(_) =>
throw new ConfigException(s"'$other' is not a valid $consistencyLevelConstant. " +
s"Available values are:${enum.getEnumConstants.map(_.toString).mkString(",")}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,13 @@

package com.datamountaineer.streamreactor.connect.influx2.config

import com.datamountaineer.streamreactor.common.config.base.traits.BaseConfig
import com.datamountaineer.streamreactor.common.config.base.traits.ConsistencyLevelSettings
import com.datamountaineer.streamreactor.common.config.base.traits.DatabaseSettings
import com.datamountaineer.streamreactor.common.config.base.traits.ErrorPolicySettings
import com.datamountaineer.streamreactor.common.config.base.traits.KcqlSettings
import com.datamountaineer.streamreactor.common.config.base.traits.NumberRetriesSettings
import com.datamountaineer.streamreactor.common.config.base.traits.UserSettings
import com.datamountaineer.streamreactor.common.config.base.traits._
import com.influxdb.client.domain.WriteConsistency
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

import java.util

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance
import org.apache.kafka.common.config.ConfigDef.Type
import com.influxdb.client.domain.WriteConsistency

object InfluxConfig {

val config: ConfigDef = new ConfigDef()
Expand Down Expand Up @@ -154,4 +146,3 @@ case class InfluxConfig(props: util.Map[String, String])
with NumberRetriesSettings
with DatabaseSettings
with ConsistencyLevelSettings[WriteConsistency]
with UserSettings
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ object InfluxSettings {
def apply(config: InfluxConfig): InfluxSettings = {
val url = config.getString(InfluxConfigConstants.INFLUX_URL_CONFIG)

if (url == null || url.trim.length == 0) {
if (url == null || url.trim.isEmpty) {
throw new ConfigException(s"${InfluxConfigConstants.INFLUX_URL_CONFIG} is not set correctly")
}

val org = config.getString(InfluxConfigConstants.INFLUX_CONNECTION_ORG_CONFIG)

if (org == null || org.trim.length == 0) {
if (org == null || org.trim.isEmpty) {
throw new ConfigException(s"${InfluxConfigConstants.INFLUX_CONNECTION_ORG_CONFIG} is not set correctly")
}

val tokenRaw = config.getSecret
val tokenRaw = config.getPassword(InfluxConfigConstants.INFLUX_CONNECTION_TOKEN_CONFIG)

val token = tokenRaw.value() match {
case "" => null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object InfluxPoint {
}
.getOrElse(Try(TimeUnit.NANOSECONDS -> nanoClock.getEpochNanos))

private[influx2] def coerceTimeStamp(
def coerceTimeStamp(
value: Any,
fieldPath: Iterable[String]): Try[Long] = {
value match {
Expand Down Expand Up @@ -117,12 +117,12 @@ object InfluxPoint {
def writeField(builder: Point)(field: String,
v: Any): Try[Point] = v match {
case value: Long => Try(builder.addField(field, value))
case value: Int => Try(builder.addField(field, value))
case value: Int => Try(builder.addField(field, value.toLong))
case value: BigInt => Try(builder.addField(field, value))
case value: Byte => Try(builder.addField(field, value.toShort))
case value: Short => Try(builder.addField(field, value))
case value: Byte => Try(builder.addField(field, value.toLong))
case value: Short => Try(builder.addField(field, value.toLong))
case value: Double => Try(builder.addField(field, value))
case value: Float => Try(builder.addField(field, value))
case value: Float => Try(builder.addField(field, value.toDouble))
case value: Boolean => Try(builder.addField(field, value))
case value: java.math.BigDecimal => Try(builder.addField(field, value))
case value: BigDecimal => Try(builder.addField(field, value.bigDecimal))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import com.datamountaineer.streamreactor.connect.influx2.writers.KcqlDetails._
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.sink.SinkRecord
import com.datamountaineer.streamreactor.connect.influx2.javadto.BatchPoints
import com.influxdb.client.write.Point

import scala.jdk.CollectionConverters.ListHasAsScala
Expand Down Expand Up @@ -110,10 +109,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e
)
}

def build(records: Iterable[SinkRecord]): Try[BatchPoints] = {
val batchPoints: BatchPoints = BatchPoints.bucket(settings.bucket)
.consistency(settings.consistencyLevel)
.build()
def build(records: Iterable[SinkRecord]): Try[Seq[Point]] = {

records
.map { record =>
Expand All @@ -130,9 +126,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e
}
.foldLeft(Try(Seq.empty[Seq[Point]]))(Util.shortCircuitOnFailure)
.map(_.flatten)
.map { points =>
batchPoints.setPoints(points)
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.connect.sink.SinkRecord
import com.influxdb.client.InfluxDBClientFactory

import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Try

class InfluxDbWriter(settings: InfluxSettings) extends DbWriter with StrictLogging with ErrorHandler {
Expand All @@ -49,8 +50,8 @@ class InfluxDbWriter(settings: InfluxSettings) extends DbWriter with StrictLoggi
builder
.build(records)
.flatMap { batchPoints =>
logger.debug(s"Writing ${batchPoints.getPoints.size()} points to the database...")
Try(writeAPI.writePoints(batchPoints.getPoints()))
logger.debug(s"Writing ${batchPoints.size} points to the database...")
Try(writeAPI.writePoints(batchPoints.asJava))
}.map(_ => logger.debug("Writing complete")),
)
}
Expand Down
Loading

0 comments on commit 57250e9

Please sign in to comment.