From b6391a6ef2504d61092bf63377d20812209f12a6 Mon Sep 17 00:00:00 2001 From: ramil Date: Mon, 27 Nov 2023 16:09:07 +0100 Subject: [PATCH 1/3] support influxdb v2 --- .../io/waylay/influxdb/InfluxDB2Spec.scala | 298 ++++++++++++++++++ .../io/waylay/influxdb/InfluxDBSpec.scala | 2 +- .../io/waylay/influxdb/IntegrationSpec.scala | 49 ++- .../scala/io/waylay/influxdb/InfluxDB.scala | 14 +- .../scala/io/waylay/influxdb/InfluxDB2.scala | 218 +++++++++++++ 5 files changed, 566 insertions(+), 15 deletions(-) create mode 100644 src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala create mode 100644 src/main/scala/io/waylay/influxdb/InfluxDB2.scala diff --git a/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala b/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala new file mode 100644 index 0000000..f7171e7 --- /dev/null +++ b/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala @@ -0,0 +1,298 @@ +package io.waylay.influxdb + +import io.waylay.influxdb.Influx._ +import io.waylay.influxdb.InfluxDB._ +import io.waylay.influxdb.query.InfluxQueryBuilder +import io.waylay.influxdb.query.InfluxQueryBuilder.Interval +import org.specs2.concurrent.ExecutionEnv +import org.specs2.mutable.Specification +import play.api.libs.json.{JsDefined, JsString} + +import java.time.Instant +import scala.concurrent.Await +import scala.concurrent.duration._ + +class InfluxDB2Spec(implicit ee: ExecutionEnv) extends Specification with IntegrationSpecV2 { + sequential + + "then influxdb2 client" should { + + "create bucket with retention" in { + val bucket = "testdb2.io" + val influxClient = + new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "4w") + Await.result(influxClient.createBucket(bucket), 5.seconds) + Await.result(influxClient.getRetention(bucket), 5.seconds) must be equalTo InfluxDB + .parseDurationLiteral("4w") + .toMillis / 1000 + + } + + "return a version on ping" in { + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort) + val version = Await.result(influxClient.ping, 15.seconds) + version must be equalTo "v2.7.4" + } + + "return ready" in { + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort) + val stats = Await.result(influxClient.ready, 15.seconds) + stats \ "status" must be equalTo JsDefined(JsString("ready")) + } + + "store and query data" in { + val points = Seq( + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(20.3)), + Instant.now() + ), + // 2 values + IPoint( + "indoor", + Seq("location" -> "room2", "building" -> "A"), + Seq("temperature" -> IFloat(19.3), "humidity" -> IFloat(35.1)), + Instant.now() + ) + ) + + val query = InfluxQueryBuilder.simple( + Seq("value"), + "location" -> "room1", + "temperature" + ) + + val influxClient = + new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + val storeResult = Await.result( + influxClient.storeAndMakeBucketIfNeeded("dbname", points), + 5.seconds + ) + println(storeResult) + storeResult must be equalTo () + + val data = Await.result(influxClient.query("dbname", query), 5.seconds) + + (data.error must beNone) and + (data.results.get.head.series.get must have size 1) and + (data.results.get.head.series.get.head.name must be equalTo "temperature") + } + + "query aggregated data" in { + + val points = Seq( + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(20)), + Instant.ofEpochSecond(0) + ), + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(22)), + 
Instant.ofEpochSecond(60) + ), + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(24)), + Instant.ofEpochSecond(120) + ), + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(26)), + Instant.ofEpochSecond(180) + ) + ) + + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + Await.result( + influxClient.storeAndMakeBucketIfNeeded("dbname", points), + 5.seconds + ) + + val query = InfluxQueryBuilder.grouped( + Mean("value"), + "location" -> "room1", + "temperature", + InfluxDB.Duration.minutes(2), + Interval + .fromUntil(Instant.ofEpochSecond(0), Instant.ofEpochSecond(200)) + ) + + // println(query) + + val data = Await.result(influxClient.query("dbname", query), 5.seconds) + data.error must beNone + // println(data.results) + data.results.get.head.series.get must have size 1 + data.results.get.head.series.get.head.values.get must be equalTo Seq( + Seq(Some(IString("1970-01-01T00:00:00Z")), Some(IFloat(21.0))), + Seq(Some(IString("1970-01-01T00:02:00Z")), Some(IFloat(25.0))) + ) + } + "query aggregated data with filter" in { + + val points = Seq( + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(20)), + Instant.ofEpochSecond(0) + ), + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(20)), + Instant.ofEpochSecond(30) + ), + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(22)), + Instant.ofEpochSecond(60) + ), + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(24)), + Instant.ofEpochSecond(120) + ), + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(23)), + Instant.ofEpochSecond(140) + ), + IPoint( + "temperature", + Seq("location" -> "roomFilterAggregated"), + Seq("value" -> IFloat(26)), + Instant.ofEpochSecond(180) + ) + ) + + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + Await.result( + influxClient.storeAndMakeBucketIfNeeded("dbname", points), + 5.seconds + ) + + val query = InfluxQueryBuilder.grouped( + Mean("value"), + "location" -> "roomFilterAggregated", + "temperature", + InfluxDB.Duration.minutes(2), + Interval + .fromUntil(Instant.ofEpochSecond(0), Instant.ofEpochSecond(200)), + Some(OR(IFieldFilter("value", LT, IFloat(22)), IFieldFilter("value", GT, IFloat(23)))) + ) + + // println(query) + + val data = Await.result(influxClient.query("dbname", query), 5.seconds) + data.error must beNone + // println(data.results) + data.results.get.head.series.get must have size 1 + data.results.get.head.series.get.head.values.get must be equalTo Seq( + Seq(Some(IString("1970-01-01T00:00:00Z")), Some(IFloat(20.0))), + Seq(Some(IString("1970-01-01T00:02:00Z")), Some(IFloat(25.0))) + ) + } + + "query measurements" in { + val points = Seq( + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(20)), + Instant.ofEpochSecond(0) + ), + IPoint( + "humidity", + Seq("location" -> "room1"), + Seq("value" -> IFloat(22)), + Instant.ofEpochSecond(60) + ), + IPoint( + "noise", + Seq("location" -> "room1"), + Seq("value" -> IFloat(24)), + Instant.ofEpochSecond(120) + ), + IPoint( + "co2", + Seq("location" -> "room1"), + Seq("value" -> IFloat(26)), + Instant.ofEpochSecond(180) + ) + ) + + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + 
Await.result( + influxClient.storeAndMakeBucketIfNeeded("testdb2", points), + 5.seconds + ) + + val query = "show measurements" + + // println(query) + + val data = Await.result(influxClient.query("testdb2", query), 5.seconds) + data.error must beNone + // println(data.results) + data.results.get.head.series.get must have size 1 + data.results.get.head.series.get.head.values.get must be equalTo Seq( + Seq(Some(IString("co2"))), + Seq(Some(IString("humidity"))), + Seq(Some(IString("noise"))), + Seq(Some(IString("temperature"))) + ) + + } + + "query aggregated data for string values returns result with error" in { + val stringmeasurement = "stringmeasurement" + + val points = Seq( + IPoint( + stringmeasurement, + Seq("location" -> "room2"), + Seq("value" -> IString("hello")), + Instant.ofEpochSecond(0) + ), + IPoint( + stringmeasurement, + Seq("location" -> "room2"), + Seq("value" -> IString("hello2")), + Instant.ofEpochSecond(60) + ) + ) + + val influxClient = new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + Await.result( + influxClient.storeAndMakeBucketIfNeeded("dbname", points), + 5.seconds + ) + + val query = InfluxQueryBuilder.grouped( + Mean("value"), + "location" -> "room2", + stringmeasurement, + InfluxDB.Duration.minutes(2), + Interval + .fromUntil(Instant.ofEpochSecond(0), Instant.ofEpochSecond(200)) + ) + + val data = Await.result(influxClient.query("dbname", query), 5.seconds) + data.error must beNone + data.results.get.head mustEqual Result( + None, + Some("unsupported mean iterator type: *query.stringInterruptIterator") + ) + } + } +} diff --git a/src/it/scala/io/waylay/influxdb/InfluxDBSpec.scala b/src/it/scala/io/waylay/influxdb/InfluxDBSpec.scala index 493f0bb..582f88b 100644 --- a/src/it/scala/io/waylay/influxdb/InfluxDBSpec.scala +++ b/src/it/scala/io/waylay/influxdb/InfluxDBSpec.scala @@ -11,7 +11,7 @@ import org.specs2.mutable.Specification import scala.concurrent.Await import scala.concurrent.duration._ -class InfluxDBSpec(implicit ee: ExecutionEnv) extends Specification with IntegrationSpec { +class InfluxDBSpec(implicit ee: ExecutionEnv) extends Specification with IntegrationSpecV1 { "then influxdb client" should { diff --git a/src/it/scala/io/waylay/influxdb/IntegrationSpec.scala b/src/it/scala/io/waylay/influxdb/IntegrationSpec.scala index b41f0aa..b2927a4 100644 --- a/src/it/scala/io/waylay/influxdb/IntegrationSpec.scala +++ b/src/it/scala/io/waylay/influxdb/IntegrationSpec.scala @@ -21,16 +21,7 @@ trait IntegrationSpec extends BeforeAfterAllStopOnError { val DefaultInfluxDBAdminPort = 8083 val DefaultInfluxDBPort = 8086 - lazy val influxdbContainer: Container = ContainerSpec("influxdb:1.8.9-alpine") - .withExposedPorts(DefaultInfluxDBPort, DefaultInfluxDBAdminPort) - .withReadyChecker( - // PrintingLogLineContains("Listening on HTTP: [::]:8086") - DockerReadyChecker - .HttpResponseCode(DefaultInfluxDBPort, "/ping", code = 204) - .within(100.millis) - .looped(20, 250.millis) - ) - .toContainer + def influxdbContainer: Container def beforeAll(): Unit = startAllOrFail() @@ -52,6 +43,8 @@ trait IntegrationSpec extends BeforeAfterAllStopOnError { lazy val mappedInfluxPort: Int = influxdbContainer.mappedPort(InfluxDB.DEFAULT_PORT) val host = "localhost" // state.docker.host val wsClient: StandaloneAhcWSClient = StandaloneAhcWSClient() + val org: String = "myorg" + val token: String = "mytoken" // Do we have a around available that makes this more robust? 
@@ -65,7 +58,7 @@ trait IntegrationSpec extends BeforeAfterAllStopOnError { wsClient.close() materializer.shutdown() actorSystem.terminate() - try containerManager.stop() + try containerManager.stopRmAll() catch { case e: Throwable => log.error(e.getMessage, e) @@ -90,3 +83,37 @@ trait IntegrationSpec extends BeforeAfterAllStopOnError { def beforeStop(): Unit = {} } + +trait IntegrationSpecV1 extends IntegrationSpec { + lazy val influxdbContainer: Container = ContainerSpec("influxdb:1.8.9-alpine") + .withExposedPorts(DefaultInfluxDBPort, DefaultInfluxDBAdminPort) + .withReadyChecker( + DockerReadyChecker + .HttpResponseCode(DefaultInfluxDBPort, "/ping", code = 204) + .within(100.millis) + .looped(20, 250.millis) + ) + .toContainer + +} + +trait IntegrationSpecV2 extends IntegrationSpec { + lazy val influxdbContainer: Container = ContainerSpec("influxdb:2.7-alpine") + .withExposedPorts(DefaultInfluxDBPort, DefaultInfluxDBAdminPort) + .withReadyChecker( + DockerReadyChecker + .HttpResponseCode(DefaultInfluxDBPort, "/ping", code = 204) + .within(100.millis) + .looped(40, 250.millis) + ) + .withEnv( + "DOCKER_INFLUXDB_INIT_MODE=setup", + "DOCKER_INFLUXDB_INIT_USERNAME=admin", + "DOCKER_INFLUXDB_INIT_PASSWORD=adminpasswordpasswordpassword", + s"DOCKER_INFLUXDB_INIT_ORG=$org", + "DOCKER_INFLUXDB_INIT_BUCKET=testbucket", + s"DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=$token" + ) + .toContainer + +} diff --git a/src/main/scala/io/waylay/influxdb/InfluxDB.scala b/src/main/scala/io/waylay/influxdb/InfluxDB.scala index ac69367..bd8cbe1 100644 --- a/src/main/scala/io/waylay/influxdb/InfluxDB.scala +++ b/src/main/scala/io/waylay/influxdb/InfluxDB.scala @@ -106,7 +106,7 @@ object InfluxDB { case class OR(filter1: IFilter, filter2: IFilter, other: IFilter*) extends IFilter case class NOT(filter: IFilter) extends IFilter - private def epochToQueryParam(epoch: Epoch) = epoch match { + private[influxdb] def epochToQueryParam(epoch: Epoch) = epoch match { case Hours => "h" case Minutes => "m" case Seconds => "s" @@ -151,18 +151,26 @@ object InfluxDB { duration.amount.toString + stringUnit } - private sealed trait Method { + private[influxdb] sealed trait Method { def endpoint: String } private case object Write extends Method { override val endpoint = "write" } - private case object Query extends Method { + private[influxdb] case object Query extends Method { override val endpoint = "query" } private case object Ping extends Method { override val endpoint = "ping" } + + private[influxdb] case object Bucket extends Method { + override val endpoint = "api/v2/buckets" + } + + private[influxdb] case object Write2 extends Method { + override val endpoint = "api/v2/write" + } } class InfluxDB( diff --git a/src/main/scala/io/waylay/influxdb/InfluxDB2.scala b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala new file mode 100644 index 0000000..a8e5dd7 --- /dev/null +++ b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala @@ -0,0 +1,218 @@ +package io.waylay.influxdb + +import io.waylay.influxdb.Influx._ +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsObject, JsValue, Json} +import play.api.libs.json.Json.{arr, obj} +import play.api.libs.ws.StandaloneWSClient +import play.api.libs.ws.DefaultBodyWritables._ +import play.api.libs.ws.JsonBodyWritables._ +import play.api.libs.ws.JsonBodyReadables._ + +import scala.concurrent.duration.{MILLISECONDS, TimeUnit} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try + +class InfluxDB2( + ws: StandaloneWSClient, + host: String = "localhost", + org: 
String, + token: String, + port: Int = InfluxDB.DEFAULT_PORT, + schema: String = "http", + defaultRetention: String = "INF" +)(implicit ec: ExecutionContext) { + import InfluxDB._ + + private final val logger = LoggerFactory.getLogger(getClass) + private final val baseUrl = s"$schema://$host:$port" + + def ping: Future[Version] = { + val req = ws + .url(baseUrl + "/ping") + .withRequestTimeout(INFLUX_PING_REQUEST_TIMEOUT) + logger.debug(s" -> $req") + req.get().map { response => + logger.debug("status: " + response.status) + val body = Some(response.body).filter(_.nonEmpty).getOrElse("[empty]") + val version = response.header("X-Influxdb-Version").get + logger.info(s"influxdb ping completed, version = $version, body = $body") + response.header("X-Influxdb-Version").get + } + } + + def ready: Future[JsObject] = + authenticatedUrl(s"${baseUrl}/ready").get().flatMap { response => + logger.debug("status: " + response.status) + response.status match { + case 200 => // ok + logger.trace(s"got data\n${Json.prettyPrint(response.body[JsValue])}") + Future.successful(response.body[JsValue].as[JsObject]) + case other => + Future.failed(new RuntimeException(s"Got status ${response.status} with body: ${response.body}")) + } + } + + def storeAndMakeBucketIfNeeded( + bucketName: String, + points: Seq[IPoint], + createDbIfNeeded: Boolean = true, + precision: TimeUnit = MILLISECONDS + ): Future[Unit] = { + val data = WriteProtocol.write(precision, points: _*) + logger.debug(s"storing data to $bucketName\n$data") + val req = authenticatedUrlForBucket(bucketName, Write2, precision) + req.post(data).flatMap { response => + logger.debug(response.toString) + logger.debug(response.body) + response.status match { + case 404 if createDbIfNeeded => + // make sure we don't end up in an endless loop + createBucket(bucketName) + .flatMap(_ => storeAndMakeBucketIfNeeded(bucketName, points, createDbIfNeeded = false)) + case 204 => // ok + logger.info(s"stored ${points.length} points to $bucketName") + Future.successful(()) + case _ => + Future.failed( + new RuntimeException( + s"""Got status ${response.status} with body: ${response.body.stripLineEnd} when saving ${points.length} points to $bucketName """ + ) + ) + } + } + } + def createBucket(bucketName: String): Future[Unit] = { + + val duration = durationLiteralToDuration(defaultRetention) + if (duration == -1) { + Future.failed(new RuntimeException(s"Invalid retention duration: $defaultRetention")) + } else { + getOrgId(org).flatMap { orgId => + val body = obj( + "orgID" -> s"$orgId", + "name" -> bucketName, + "retentionRules" -> arr(obj("type" -> "expire", "everySeconds" -> duration, "shardGroupDurationSeconds" -> 0)) + ) + authenticatedUrlFor(Bucket).addHttpHeaders("Content-Type" -> "application/json").post(body).flatMap { + response => + logger.info("status: " + response.status) + logger.debug(response.headers.mkString("\n")) + logger.debug(response.body) + + if (response.status != 201) { + Future.failed(new RuntimeException(s"Got status ${response.status} with body: ${response.body}")) + } else { + Future.successful(()) + } + } + } + } + } + + def query( + bucketName: String, + query: String, + chunkSize: Option[Int] = None, + epoch: Option[Epoch] = None + ): Future[Results] = { + logger.debug(query) + + val extraQueryString = Seq( + chunkSize.map("chunk_size" -> _.toString), + epoch.map("epoch" -> epochToQueryParam(_)) + ).flatten + + val req = authenticatedUrlForDatabase(bucketName, Query) + .addQueryStringParameters("q" -> query) + 
.addQueryStringParameters(extraQueryString: _*) + + logger.debug(s" -> $req") + req.get().flatMap { response => + logger.debug("status: " + response.status) + response.status match { + case 404 => + Future.failed(new RuntimeException(s"Got status ${response.status} with body: ${response.body}")) + case 200 => // ok + logger.debug(s"got data\n${Json.prettyPrint(response.body[JsValue])}") + import io.waylay.influxdb.query.QueryResultProtocol._ + val results = response.body[JsValue].as[Results] + if (results.hasDatabaseNotFoundError) { + Future.successful(Results(Some(Seq.empty), None)) + } else { + Future.successful(results) + } + case other => + Future.failed(new RuntimeException(s"Got status ${response.status} with body: ${response.body}")) + } + } + } + + def getOrgId(name: String): Future[String] = + authenticatedUrl(s"$baseUrl/api/v2/orgs").addQueryStringParameters("org" -> name).get().flatMap { resp => + if (resp.status != 200) { + Future.failed(new RuntimeException(s"Got status ${resp.status} with body: ${resp.body}")) + } else { + (resp.body[JsValue] \ "orgs") + .as[Seq[JsObject]] + .headOption + .map(o => (o \ "id").as[String]) + .map(Future.successful) + .getOrElse(Future.failed(new RuntimeException("organisation id not found"))) + + } + } + + def getRetention(bucketName: String): Future[Long] = + authenticatedUrl(s"$baseUrl/${Bucket.endpoint}").addQueryStringParameters("name" -> bucketName).get().flatMap { + resp => + if (resp.status != 200) { + Future.failed(new RuntimeException(s"Got status ${resp.status} with body: ${resp.body}")) + } else { + (resp.body[JsValue] \ "buckets" \ 0 \ "retentionRules") + .as[Seq[JsObject]] + .headOption + .map(o => (o \ "everySeconds").as[Long]) + .map(Future.successful) + .getOrElse(Future.failed(new RuntimeException(s"bucket ${bucketName} not found"))) + } + } + + private def durationLiteralToDuration(durationLiteral: String): Long = + durationLiteral.toLowerCase match { + case "inf" => 0 + case s => Try(parseDurationLiteral(durationLiteral).toMillis / 1000).getOrElse(-1) + } + + private def authenticatedUrlForDatabase(bucketName: String, method: Method) = { + + val url = s"$baseUrl/${method.endpoint}" + authenticatedUrl(url).addQueryStringParameters( + "db" -> bucketName + ) + } + + private def authenticatedUrlForBucket( + bucketName: String, + method: Method, + precision: TimeUnit = MILLISECONDS + ) = { + val influxPrecision = precision match { + case MILLISECONDS => "ms" + case _ => throw new RuntimeException(s"precision $precision not implemented") + } + + val url = s"$baseUrl/${method.endpoint}" + authenticatedUrl(url).addQueryStringParameters( + "bucket" -> bucketName, + "org" -> org, + "precision" -> influxPrecision + ) + } + private def authenticatedUrlFor(method: Method) = { + val url = s"$baseUrl/${method.endpoint}" + authenticatedUrl(url) + } + private def authenticatedUrl(url: String) = + ws.url(url).withRequestTimeout(INFLUX_REQUEST_TIMEOUT).withHttpHeaders("Authorization" -> s"Token $token") +} From 075340d6797b1b06e2be0f1596db111b2c2b760c Mon Sep 17 00:00:00 2001 From: ramil Date: Tue, 28 Nov 2023 17:50:00 +0100 Subject: [PATCH 2/3] implement delete series --- .../io/waylay/influxdb/InfluxDB2Spec.scala | 46 +++++++++++++++++++ .../scala/io/waylay/influxdb/InfluxDB.scala | 4 ++ .../scala/io/waylay/influxdb/InfluxDB2.scala | 16 +++++++ .../influxdb/query/InfluxQueryBuilder.scala | 5 ++ 4 files changed, 71 insertions(+) diff --git a/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala 
b/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala index f7171e7..93f0073 100644 --- a/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala +++ b/src/it/scala/io/waylay/influxdb/InfluxDB2Spec.scala @@ -294,5 +294,51 @@ class InfluxDB2Spec(implicit ee: ExecutionEnv) extends Specification with Integr Some("unsupported mean iterator type: *query.stringInterruptIterator") ) } + + "store and delete data" in { + val points = Seq( + IPoint( + "temperature", + Seq("location" -> "room1"), + Seq("value" -> IFloat(20.3)), + Instant.now() + ), + // 2 values + IPoint( + "indoor", + Seq("location" -> "room2", "building" -> "A"), + Seq("temperature" -> IFloat(19.3), "humidity" -> IFloat(35.1)), + Instant.now() + ) + ) + val deletePredicate = InfluxQueryBuilder.deleteSeriesPredicate("location" -> "room1") + + val query = InfluxQueryBuilder.simple( + Seq("value"), + "location" -> "room1", + "temperature" + ) + + val influxClient = + new InfluxDB2(wsClient, host, org, token, mappedInfluxPort, defaultRetention = "INF") + val storeResult = Await.result( + influxClient.storeAndMakeBucketIfNeeded("dbnamefordelete", points), + 5.seconds + ) + + storeResult must be equalTo () + + val deleted = + Await.result( + influxClient.deleteSeries("dbnamefordelete", deletePredicate, Instant.ofEpochMilli(0), Instant.now()), + 5.seconds + ) + + deleted must be equalTo () + val data = Await.result(influxClient.query("dbnamefordelete", query), 5.seconds) + + (data.error must beNone) and + (data.results.get.head.series must beNone) + } } } diff --git a/src/main/scala/io/waylay/influxdb/InfluxDB.scala b/src/main/scala/io/waylay/influxdb/InfluxDB.scala index bd8cbe1..653bc6f 100644 --- a/src/main/scala/io/waylay/influxdb/InfluxDB.scala +++ b/src/main/scala/io/waylay/influxdb/InfluxDB.scala @@ -171,6 +171,10 @@ object InfluxDB { private[influxdb] case object Write2 extends Method { override val endpoint = "api/v2/write" } + + private[influxdb] case object Delete2 extends Method { + override val endpoint = "api/v2/delete" + } } class InfluxDB( diff --git a/src/main/scala/io/waylay/influxdb/InfluxDB2.scala b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala index a8e5dd7..0556140 100644 --- a/src/main/scala/io/waylay/influxdb/InfluxDB2.scala +++ b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala @@ -9,6 +9,7 @@ import play.api.libs.ws.DefaultBodyWritables._ import play.api.libs.ws.JsonBodyWritables._ import play.api.libs.ws.JsonBodyReadables._ +import java.time.Instant import scala.concurrent.duration.{MILLISECONDS, TimeUnit} import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -110,6 +111,21 @@ class InfluxDB2( } } + def deleteSeries(bucketName: String, predicate: String, startTime: Instant, stopTime: Instant): Future[Unit] = { + val data = obj("predicate" -> predicate, "start" -> startTime, "stop" -> stopTime) + authenticatedUrlForBucket(bucketName, Delete2).post(data).flatMap { response => + logger.debug("status: " + response.status) + response.status match { + + case 204 => // ok + Future.successful(()) + + case other => + Future.failed(new RuntimeException(s"Got status ${response.status} with body: ${response.body}")) + } + } + + } def query( bucketName: String, query: String, diff --git a/src/main/scala/io/waylay/influxdb/query/InfluxQueryBuilder.scala b/src/main/scala/io/waylay/influxdb/query/InfluxQueryBuilder.scala index ed7a769..fd1f889 100644 --- a/src/main/scala/io/waylay/influxdb/query/InfluxQueryBuilder.scala +++ b/src/main/scala/io/waylay/influxdb/query/InfluxQueryBuilder.scala @@ -101,6 
+101,11 @@ object InfluxQueryBuilder extends SharedProtocol { s" AND time <= ${i.toEpochMilli}ms" ) + def deleteSeriesPredicate(tagSelector: (String, String)): String = + s""" + |${escapeValue(tagSelector._1)}=${escapeValue(tagSelector._2)} + |""".stripMargin.trim + /** * String Literals (Single-quoted) * String literals are values (like integers or booleans). In InfluxQL, all tag values are string literals, and any From 415fd644018e7e91e315d9993d12c51d3d65a195 Mon Sep 17 00:00:00 2001 From: ramil Date: Wed, 29 Nov 2023 12:20:20 +0100 Subject: [PATCH 3/3] review comments --- .../scala/io/waylay/influxdb/InfluxDB2.scala | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/scala/io/waylay/influxdb/InfluxDB2.scala b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala index 0556140..ccd4128 100644 --- a/src/main/scala/io/waylay/influxdb/InfluxDB2.scala +++ b/src/main/scala/io/waylay/influxdb/InfluxDB2.scala @@ -93,7 +93,7 @@ class InfluxDB2( val body = obj( "orgID" -> s"$orgId", "name" -> bucketName, - "retentionRules" -> arr(obj("type" -> "expire", "everySeconds" -> duration, "shardGroupDurationSeconds" -> 0)) + "retentionRules" -> arr(obj("type" -> "expire", "everySeconds" -> duration)) ) authenticatedUrlFor(Bucket).addHttpHeaders("Content-Type" -> "application/json").post(body).flatMap { response => @@ -180,18 +180,17 @@ class InfluxDB2( } def getRetention(bucketName: String): Future[Long] = - authenticatedUrl(s"$baseUrl/${Bucket.endpoint}").addQueryStringParameters("name" -> bucketName).get().flatMap { - resp => - if (resp.status != 200) { - Future.failed(new RuntimeException(s"Got status ${resp.status} with body: ${resp.body}")) - } else { - (resp.body[JsValue] \ "buckets" \ 0 \ "retentionRules") - .as[Seq[JsObject]] - .headOption - .map(o => (o \ "everySeconds").as[Long]) - .map(Future.successful) - .getOrElse(Future.failed(new RuntimeException(s"bucket ${bucketName} not found"))) - } + authenticatedUrlFor(Bucket).addQueryStringParameters("name" -> bucketName).get().flatMap { resp => + if (resp.status != 200) { + Future.failed(new RuntimeException(s"Got status ${resp.status} with body: ${resp.body}")) + } else { + (resp.body[JsValue] \ "buckets" \ 0 \ "retentionRules") + .as[Seq[JsObject]] + .headOption + .map(o => (o \ "everySeconds").as[Long]) + .map(Future.successful) + .getOrElse(Future.failed(new RuntimeException(s"bucket ${bucketName} not found"))) + } } private def durationLiteralToDuration(durationLiteral: String): Long =
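
For reference, below is a minimal usage sketch of the new InfluxDB2 client covering the write, query and delete paths introduced in this series. It is not part of the patch: the Akka/StandaloneAhcWSClient wiring, the object and bucket names, and the connection values (localhost, myorg, mytoken, port 8086) are illustrative assumptions; only the InfluxDB2 constructor and method signatures are taken from the diff above.

import java.time.Instant

import akka.actor.ActorSystem
import akka.stream.Materializer
import play.api.libs.ws.ahc.StandaloneAhcWSClient

import io.waylay.influxdb.Influx._
import io.waylay.influxdb.InfluxDB2
import io.waylay.influxdb.query.InfluxQueryBuilder

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object InfluxDB2Example extends App {
  // play-ws standalone needs an Akka materializer (assumes Akka 2.6-style APIs).
  val system = ActorSystem("influxdb2-example")
  implicit val mat: Materializer = Materializer(system)
  val ws = StandaloneAhcWSClient()

  // Connection values are placeholders; org and token mirror the
  // DOCKER_INFLUXDB_INIT_ORG / DOCKER_INFLUXDB_INIT_ADMIN_TOKEN values used in the integration spec.
  val influx = new InfluxDB2(ws, "localhost", "myorg", "mytoken", 8086, defaultRetention = "4w")

  val points = Seq(
    IPoint("temperature", Seq("location" -> "room1"), Seq("value" -> IFloat(20.3)), Instant.now())
  )

  val result = for {
    // Writes via /api/v2/write, creating the bucket with the default retention on a 404.
    _ <- influx.storeAndMakeBucketIfNeeded("examplebucket", points)
    // Queries still use InfluxQL built by InfluxQueryBuilder, as with the v1 client.
    data <- influx.query(
              "examplebucket",
              InfluxQueryBuilder.simple(Seq("value"), "location" -> "room1", "temperature")
            )
    // Removes the series again via /api/v2/delete using the predicate helper from the second patch.
    _ <- influx.deleteSeries(
           "examplebucket",
           InfluxQueryBuilder.deleteSeriesPredicate("location" -> "room1"),
           Instant.ofEpochMilli(0),
           Instant.now()
         )
  } yield data

  println(Await.result(result, 10.seconds).results)

  ws.close()
  system.terminate()
}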