diff --git a/ambient7-analysis/src/etc/logback.dev.xml b/ambient7-analysis/src/etc/logback.dev.xml
index 7bf9866..cc65de9 100644
--- a/ambient7-analysis/src/etc/logback.dev.xml
+++ b/ambient7-analysis/src/etc/logback.dev.xml
@@ -11,6 +11,7 @@
+
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/AnalysisConfigReader.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/AnalysisConfigReader.scala
index 2709f9e..92a614a 100644
--- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/AnalysisConfigReader.scala
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/AnalysisConfigReader.scala
@@ -1,13 +1,15 @@
package ru.maizy.ambient7.analysis
-import ru.maizy.ambient7.core.config.{ Ambient7Options, AnalysisSpecificOptions }
-import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, InfluxDbConfigReader, MainDbConfigReader }
-import ru.maizy.ambient7.core.config.reader.UniversalConfigReader
-
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/
+
+import ru.maizy.ambient7.core.config.Ambient7Options
+import ru.maizy.ambient7.core.config.options.AnalysisSpecificOptions
+import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, InfluxDbConfigReader, MainDbConfigReader }
+import ru.maizy.ambient7.core.config.reader.UniversalConfigReader
+
object AnalysisConfigReader
extends UniversalConfigReader
with DevicesConfigReader
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/command/AggregateCo2Command.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/command/AggregateCo2Command.scala
index 90f2e40..38b10d3 100644
--- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/command/AggregateCo2Command.scala
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/command/AggregateCo2Command.scala
@@ -7,9 +7,9 @@ package ru.maizy.ambient7.analysis.command
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
-import scala.concurrent.duration.DurationInt
import com.typesafe.scalalogging.LazyLogging
import scalikejdbc._
+import ru.maizy.ambient7.analysis.influxdb
import ru.maizy.ambient7.analysis.service.InfluxDbCo2Service
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.core.data.{ Co2Agent, Co2Device }
@@ -20,7 +20,7 @@ import ru.maizy.influxdbclient.InfluxDbClient
object AggregateCo2Command extends LazyLogging {
def run(opts: Ambient7Options): ReturnStatus = {
- (initInfluxDbClient(opts), initDbSession(opts)) match {
+ (influxdb.buildClient(opts), initDbSession(opts)) match {
case (Some(influxDbClient), Some(dBSession)) =>
val results: List[Boolean] = opts.devices
.map(_.co2Devices).getOrElse(List.empty)
@@ -101,25 +101,6 @@ object AggregateCo2Command extends LazyLogging {
eitherStartDate
}
- private def initInfluxDbClient(opts: Ambient7Options): Option[InfluxDbClient] = {
- (
- opts.influxDb.flatMap(_.clientConnectionSettings),
- opts.influxDb.flatMap(_.readonlyClientConnectionSetting)
- ) match {
- case (Some(setting), Some(readonlySettings)) =>
- Some(
- new InfluxDbClient(
- influxDbSettings = setting,
- _influxDbReadonlySettings = Some(readonlySettings),
- userAgent = Some("ambient7-analysis"),
- connectTimeout = 500.millis,
- readTimeout = 10.seconds
- )
- )
- case _ => None
- }
- }
-
private def initDbSession(opts: Ambient7Options): Option[DBSession] = {
opts.mainDb.flatMap { dbSetting =>
dbSetting.url.map { url =>
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/influxdb/package.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/influxdb/package.scala
new file mode 100644
index 0000000..38960a8
--- /dev/null
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/influxdb/package.scala
@@ -0,0 +1,32 @@
+package ru.maizy.ambient7.analysis
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+import scala.concurrent.duration.DurationInt
+import ru.maizy.ambient7.core.config.Ambient7Options
+import ru.maizy.influxdbclient.InfluxDbClient
+
+
+package object influxdb {
+ def buildClient(opts: Ambient7Options): Option[InfluxDbClient] = {
+ (
+ opts.influxDb.flatMap(_.clientConnectionSettings),
+ opts.influxDb.flatMap(_.readonlyClientConnectionSetting)
+ ) match {
+ case (Some(setting), Some(readonlySettings)) =>
+ Some(
+ new InfluxDbClient(
+ influxDbSettings = setting,
+ _influxDbReadonlySettings = Some(readonlySettings),
+ userAgent = Some("ambient7-analysis"),
+ connectTimeout = 500.millis,
+ readTimeout = 10.seconds
+ )
+ )
+ case _ => None
+ }
+ }
+}
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala
new file mode 100644
index 0000000..bbf82ec
--- /dev/null
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala
@@ -0,0 +1,26 @@
+package ru.maizy.ambient7.analysis.service
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+sealed trait AggregateType {
+ def code: String
+ def apply(value: String): String
+}
+
+object MAX extends AggregateType {
+ override def code: String = "max"
+ override def apply(value: String): String = s"max($value)"
+}
+
+object MIN extends AggregateType {
+ override def code: String = "min"
+ override def apply(value: String): String = s"min($value)"
+}
+
+object MEAN extends AggregateType {
+ override def code: String = "mean"
+ override def apply(value: String): String = s"mean($value)"
+}
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala
index 2438a04..a89dc09 100644
--- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala
@@ -11,14 +11,22 @@ import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
import scala.util.{ Failure, Success }
import com.typesafe.scalalogging.LazyLogging
-import ru.maizy.ambient7.core.data.{ Co2AggregatedLevels, Co2Agent }
+import ru.maizy.ambient7.core.data.{ Co2Agent, Co2AggregatedLevels }
import ru.maizy.influxdbclient.data.{ NullValue, SeriesItem, StringValue }
import ru.maizy.influxdbclient.util.Dates
import ru.maizy.influxdbclient.util.Escape.{ escapeValue, tagsToQueryCondition }
import ru.maizy.influxdbclient.InfluxDbClient
+case class Co2LevelResult(ppm: Option[Int], from: ZonedDateTime, duration: Duration, aggregateType: AggregateType) {
+ def until: ZonedDateTime = from.plus(duration.toMillis, ChronoUnit.SECONDS)
+}
+
+
object InfluxDbCo2Service extends LazyLogging {
val CO2_OK = 800
@@ -45,7 +53,7 @@ object InfluxDbCo2Service extends LazyLogging {
"group by time(1m)"
influxDbClient
- .query(query)
+ .syncQuery(query)
.left.map(e => s"error when requesting data from influxdb: $e")
.right.flatMap { results =>
val totalMinutes = time.Duration.between(from, until).toMinutes.toInt
@@ -93,6 +101,7 @@ object InfluxDbCo2Service extends LazyLogging {
val agentNameEscaped = escapeValue(agentId.agentName)
val untilTruncated = until.truncatedTo(ChronoUnit.DAYS)
+ // TODO: use InfluxDbUtils.getValuesForTimePeriod
def buildQuery(lowerBound: ZonedDateTime, upperBound: ZonedDateTime): String = {
"select time, max(ppm) as max_ppm " +
"from co2 " +
@@ -111,7 +120,7 @@ object InfluxDbCo2Service extends LazyLogging {
val lowerBound = upperBound.minusDays(1).truncatedTo(ChronoUnit.DAYS)
val query = buildQuery(lowerBound, upperBound)
- influxDbClient.query(query) match {
+ influxDbClient.syncQuery(query) match {
case Left(error) =>
Left(s"Unable to perform query for $lowerBound - $upperBound: ${error.message}")
@@ -189,4 +198,30 @@ object InfluxDbCo2Service extends LazyLogging {
iter(startDate, ListMap.empty)
}
+
+ def getValuesForTimePeriod(
+ client: InfluxDbClient,
+ agent: Co2Agent,
+ from: ZonedDateTime,
+ until: ZonedDateTime,
+ segment: Duration = 10.seconds,
+ aggregate: AggregateType = MAX)(implicit ex: ExecutionContext): Future[List[Co2LevelResult]] =
+ {
+ InfluxDbUtils.getValuesForTimePeriod[Int](
+ client,
+ from,
+ until,
+ "co2",
+ "ppm",
+ InfluxDbUtils.intExtractor,
+ aggregate,
+ segment,
+ condition = s"agent = ${escapeValue(agent.agentName)} and ${tagsToQueryCondition(agent.tags.asPairs)}"
+ ).map { results =>
+ results.map { case (time, mayBeValue) =>
+ Co2LevelResult(mayBeValue, time, segment, aggregate)
+ }
+ }
+
+ }
}
diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbUtils.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbUtils.scala
new file mode 100644
index 0000000..c9923e7
--- /dev/null
+++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbUtils.scala
@@ -0,0 +1,93 @@
+package ru.maizy.ambient7.analysis.service
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+import java.time.ZonedDateTime
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.concurrent.duration.Duration
+import scala.util.{ Failure, Success, Try }
+import ru.maizy.ambient7.core.util.DateTimeIterator
+import ru.maizy.influxdbclient.InfluxDbClient
+import ru.maizy.influxdbclient.data.{ NumberValue, StringValue, Value }
+import ru.maizy.influxdbclient.util.Escape.{ escapeIdentifier, escapeValue }
+import ru.maizy.influxdbclient.util.Dates
+
+private[service] object InfluxDbUtils {
+
+ def getValuesForTimePeriod[T](
+ client: InfluxDbClient,
+ from: ZonedDateTime,
+ until: ZonedDateTime,
+ table: String,
+ valueColumn: String,
+ valueExtractor: Value => Option[T],
+ aggregate: AggregateType,
+ segment: Duration,
+ timeColumn: String = "time",
+ condition: String = "true"
+ )(implicit ex: ExecutionContext): Future[List[(ZonedDateTime, Option[T])]] =
+ {
+ require(from.compareTo(until) < 0)
+ require(segment.isFinite() && segment.toSeconds >= 1)
+
+ val dateFrom = Dates.toInfluxDbFormat(from)
+ val dateUntil = Dates.toInfluxDbFormat(until)
+
+ val query =
+ s"select ${aggregate(valueColumn)} as value " +
+ s"from ${escapeIdentifier(table)} " +
+ s"where ${escapeIdentifier(timeColumn)} >= ${escapeValue(dateFrom)} " +
+ s"and ${escapeIdentifier(timeColumn)} < ${escapeValue(dateUntil)} " +
+ s"and ($condition) " +
+ s"group by time(${segment.toSeconds}s)"
+
+ // TODO: little bit shitty code
+ client.query(query) map { res =>
+ val timeToValue: Map[ZonedDateTime, Option[T]] = res.firstSeriesItems.map { seriesItem =>
+ (seriesItem.findColumnIndex(timeColumn), seriesItem.findColumnIndex("value")) match {
+ case (Some(timeIndex), Some(valueIndex)) =>
+ seriesItem.values.zipWithIndex.map { case (row, rowNum) =>
+ val rawTime = row(timeIndex)
+ rawTime match {
+ case timeValue: StringValue =>
+ Dates.fromInfluxDbToZonedDateTime(timeValue.value) match {
+ case Success(time) =>
+ row(valueIndex) match {
+ case value: Value => (time, valueExtractor(value))
+ case _ => (time, None)
+ }
+
+ case Failure(e) => throw new Error(s"unparsed time $rawTime", e)
+ }
+ case _ => throw new Error(s"time not found in row #$rowNum")
+ }
+ }.toMap
+ case _ => throw new Error(s"time & value columns not found in results")
+ }
+ }.getOrElse(Map.empty)
+
+ DateTimeIterator(from, until, segment).toList.map { time =>
+ (time, timeToValue.getOrElse(time, None))
+ }
+ }
+ }
+
+ val intExtractor: (Value => Option[Int]) = {
+ case value: NumberValue => Try(value.value.toInt) match {
+ case Success(i) => Some(i)
+ case Failure(_) => None
+ }
+ case _ => None
+ }
+
+ val floatExtractor: (Value => Option[Float]) = {
+ case value: NumberValue => Try(value.value.toFloat) match {
+ case Success(i) => Some(i)
+ case Failure(_) => None
+ }
+ case _ => None
+ }
+}
diff --git a/ambient7-webapp/src/main/scala/ru/maizy/ambient7/webapp/WebAppConfigReader.scala b/ambient7-webapp/src/main/scala/ru/maizy/ambient7/webapp/WebAppConfigReader.scala
index b42fd09..995f510 100644
--- a/ambient7-webapp/src/main/scala/ru/maizy/ambient7/webapp/WebAppConfigReader.scala
+++ b/ambient7-webapp/src/main/scala/ru/maizy/ambient7/webapp/WebAppConfigReader.scala
@@ -5,7 +5,8 @@ package ru.maizy.ambient7.webapp
* See LICENSE.txt for details.
*/
-import ru.maizy.ambient7.core.config.{ Ambient7Options, WebAppSpecificOptions }
+import ru.maizy.ambient7.core.config.Ambient7Options
+import ru.maizy.ambient7.core.config.options.WebAppSpecificOptions
import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, MainDbConfigReader, UniversalConfigReader }
object WebAppConfigReader
diff --git a/build.sbt b/build.sbt
index d683ffa..ea08085 100644
--- a/build.sbt
+++ b/build.sbt
@@ -9,7 +9,7 @@ val scalacOpts = Seq(
"-unchecked",
"-feature",
"-explaintypes",
- "-Xfatal-warnings",
+ // "-Xfatal-warnings",
"-Xlint:_",
"-Ywarn-dead-code",
"-Ywarn-inaccessible",
@@ -42,7 +42,8 @@ lazy val commonDependencies = Seq(
lazy val httpClientDependencies = Seq(
libraryDependencies ++= Seq(
- "org.scalaj" %% "scalaj-http" % "2.3.0"
+ "org.scalaj" %% "scalaj-http" % "2.3.0", // TODO: iss #39: remove sync methods and this dependancy
+ "net.databinder.dispatch" %% "dispatch-core" % "0.11.2"
)
)
diff --git a/config/ambient7.conf b/config/ambient7.conf
index 71f73e2..7e9c0e6 100644
--- a/config/ambient7.conf
+++ b/config/ambient7.conf
@@ -1,42 +1,42 @@
// sample production config
db {
- url = "jdbc:h2:file:/var/ambient7/analysis;AUTO_SERVER=TRUE"
- user = "ambient7"
- password = ""
+ url: "jdbc:h2:file:/var/ambient7/analysis;AUTO_SERVER=TRUE"
+ user: "ambient7"
+ password: ""
}
influxdb {
- database = "ambient7"
+ database: "ambient7"
- baseurl = "http://localhost:8086/"
- user = "ambient7_rw"
- password = ""
+ baseurl: "http://localhost:8086/"
+ user: "ambient7_rw"
+ password: ""
// optional readonly access
// readonly {
-// baseurl = "http://localhost:8086/"
-// user = "ambient7_ro"
-// password = ""
+// baseurl: "http://localhost:8086/"
+// user: "ambient7_ro"
+// password: ""
// }
}
devices = [
{
- id = "main"
- model = "mt8057"
- agent-name = "main"
- agent-tags = ""
+ id: "main"
+ model: "mt8057"
+ agent-name: "main"
+ agent-tags: ""
}
{
- id = "additional-mt8057"
- model = "mt8057"
- agent-name = "additional"
- agent-tags = "place=livingroom,altitude=200"
+ id: "additional-mt8057"
+ model: "mt8057"
+ agent-name: "additional"
+ agent-tags: "place=livingroom,altitude=200"
}
]
webapp {
- port = 22480
+ port: 22480
}
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/Ambient7Options.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/Ambient7Options.scala
index 3255443..3d63bf4 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/Ambient7Options.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/Ambient7Options.scala
@@ -7,7 +7,8 @@ package ru.maizy.ambient7.core.config
import java.nio.file.Path
import java.time.ZoneId
-import ru.maizy.ambient7.core.data.{ AgentTags, Co2Agent, Co2Device, Device, DeviceType, Devices }
+import ru.maizy.ambient7.core.config.options._
+import ru.maizy.ambient7.core.data._
case class FromCliDevice(
agent: String = "main",
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/AnalysisSpecificOptions.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/options/AnalysisSpecificOptions.scala
similarity index 82%
rename from core/src/main/scala/ru/maizy/ambient7/core/config/AnalysisSpecificOptions.scala
rename to core/src/main/scala/ru/maizy/ambient7/core/config/options/AnalysisSpecificOptions.scala
index f5c04c5..37b624b 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/AnalysisSpecificOptions.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/options/AnalysisSpecificOptions.scala
@@ -1,4 +1,4 @@
-package ru.maizy.ambient7.core.config
+package ru.maizy.ambient7.core.config.options
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/DbOptions.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/options/DbOptions.scala
similarity index 71%
rename from core/src/main/scala/ru/maizy/ambient7/core/config/DbOptions.scala
rename to core/src/main/scala/ru/maizy/ambient7/core/config/options/DbOptions.scala
index 4533f58..117e515 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/DbOptions.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/options/DbOptions.scala
@@ -1,10 +1,12 @@
-package ru.maizy.ambient7.core.config
+package ru.maizy.ambient7.core.config.options
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016-2017
* See LICENSE.txt for details.
*/
+import ru.maizy.ambient7.core.config.Defaults
+
case class DbOptions(
url: Option[String] = None,
user: String = Defaults.DB_USER,
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/InfluxDbOptions.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/options/InfluxDbOptions.scala
similarity index 90%
rename from core/src/main/scala/ru/maizy/ambient7/core/config/InfluxDbOptions.scala
rename to core/src/main/scala/ru/maizy/ambient7/core/config/options/InfluxDbOptions.scala
index 7aff1e0..3446b28 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/InfluxDbOptions.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/options/InfluxDbOptions.scala
@@ -1,12 +1,13 @@
-package ru.maizy.ambient7.core.config
-
-import ru.maizy.influxdbclient.InfluxDbConnectionSettings
+package ru.maizy.ambient7.core.config.options
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016-2017
* See LICENSE.txt for details.
*/
+import ru.maizy.ambient7.core.config.Defaults
+import ru.maizy.influxdbclient.InfluxDbConnectionSettings
+
case class InfluxDbOptions(
database: Option[String] = None,
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/Mt8057AgentSpecificOptions.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/options/Mt8057AgentSpecificOptions.scala
similarity index 93%
rename from core/src/main/scala/ru/maizy/ambient7/core/config/Mt8057AgentSpecificOptions.scala
rename to core/src/main/scala/ru/maizy/ambient7/core/config/options/Mt8057AgentSpecificOptions.scala
index 2e592e2..8476250 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/Mt8057AgentSpecificOptions.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/options/Mt8057AgentSpecificOptions.scala
@@ -1,12 +1,12 @@
-package ru.maizy.ambient7.core.config
-
-import java.io.File
+package ru.maizy.ambient7.core.config.options
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/
+import java.io.File
+
// TODO: should be only in mt8057-agent submodule
trait EnumerationMap extends Enumeration {
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/WebAppSpecificOptions.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/options/WebAppSpecificOptions.scala
similarity index 80%
rename from core/src/main/scala/ru/maizy/ambient7/core/config/WebAppSpecificOptions.scala
rename to core/src/main/scala/ru/maizy/ambient7/core/config/options/WebAppSpecificOptions.scala
index 593b165..1936151 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/WebAppSpecificOptions.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/options/WebAppSpecificOptions.scala
@@ -1,4 +1,4 @@
-package ru.maizy.ambient7.core.config
+package ru.maizy.ambient7.core.config.options
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala
index 28d44d1..873ba86 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala
@@ -1,15 +1,15 @@
package ru.maizy.ambient7.core.config.reader
-import scala.annotation.tailrec
-import com.typesafe.config.Config
-import ru.maizy.ambient7.core.config.{ Ambient7Options, Defaults, FromCliDevice, ParsingError }
-import ru.maizy.ambient7.core.data.{ AgentTags, Co2Agent, Co2Device, Device, DeviceType, Devices }
-
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/
+import scala.annotation.tailrec
+import com.typesafe.config.Config
+import ru.maizy.ambient7.core.config.{ Ambient7Options, Defaults, FromCliDevice, ParsingError }
+import ru.maizy.ambient7.core.data.{ AgentTags, Co2Agent, Co2Device, Device, DeviceType, Devices }
+
object DevicesConfigReader {
private case class DeviceConfigModel(
@@ -21,27 +21,11 @@ object DevicesConfigReader {
}
-
trait DevicesConfigReader extends UniversalConfigReader {
import UniversalConfigReader._
import DevicesConfigReader._
-
- private def cliDeviceOpts(
- opts: Ambient7Options, deviceType: DeviceType.Type)(save: FromCliDevice => FromCliDevice): Ambient7Options = {
-
- val currentOpts = opts.fromCliDevice.getOrElse(FromCliDevice(deviceType = Some(deviceType)))
- opts.copy(
- fromCliDevice = Some(save(currentOpts))
- )
- }
-
- private def devicesOpts(opts: Ambient7Options)(save: Devices => Devices): Ambient7Options = {
- val currentDevices = opts.devices.getOrElse(Devices())
- opts.copy(
- devices = Some(save(currentDevices))
- )
- }
+ import configs.syntax._
def fillDeviceFromCliOptions(deviceType: DeviceType.Type): Unit = {
@@ -75,31 +59,7 @@ trait DevicesConfigReader extends UniversalConfigReader {
()
}
- // TODO: brake logic by device types, ex. separate object for every type
- private def extractDevice(rawConfig: DeviceConfigModel): Either[Seq[String], Device] = {
- // mt8057 the only known device type for now
- if (rawConfig.model != "mt8057") {
- Left(List(s"unknown devices type '${rawConfig.model}' for device with id = '${rawConfig.id}'"))
-
- // tags defined, but format violated
- } else if (rawConfig.agentTags.exists(AgentTags.tryParseFromString(_).isLeft)) {
- Left(List(
- s"bad devices tags format '${rawConfig.agentTags.getOrElse("")}' for device with id = '${rawConfig.id}'"
- ))
-
- } else {
- Right(
- Co2Device(
- rawConfig.id,
- agent = Co2Agent(rawConfig.agentName, AgentTags(rawConfig.agentTags.getOrElse("")))
- )
- )
- }
- }
-
def fillDevicesOptions(): Unit = {
- import configs.syntax._
-
appendConfigRule { (config, opts) =>
config.get[Option[List[Config]]]("devices").toEither match {
case Left(configError) => Left(ParsingError.withMessages(configError.messages))
@@ -108,17 +68,17 @@ trait DevicesConfigReader extends UniversalConfigReader {
@tailrec
def iter(
- acc: Either[Seq[String], Seq[Device]], devicesConfigs: List[Config]): Either[Seq[String], Seq[Device]] =
+ acc: Either[Seq[String], List[Device]], devicesConfigs: List[Config]): Either[Seq[String], List[Device]] =
{
devicesConfigs match {
case Nil => acc
case deviceConfig :: xs =>
val current = deviceConfig.extract[DeviceConfigModel] match {
- case configs.Result.Success(rawDevice) =>
- extractDevice(rawDevice) match {
+ case configs.Result.Success(deviceConfigModel) =>
+ extractDevice(deviceConfigModel, deviceConfig) match {
// don't append devices if there was error on previous steps
case Right(device) if acc.isRight =>
- Right(acc.right.toOption.getOrElse(Seq.empty) :+ device)
+ Right(acc.right.toOption.getOrElse(List.empty) :+ device)
// always replace previous success with errors
case Left(errors) =>
Left(acc.left.toOption.getOrElse(Seq.empty) ++: errors)
@@ -126,7 +86,7 @@ trait DevicesConfigReader extends UniversalConfigReader {
case _ => acc
}
case configs.Result.Failure(error) =>
- Left(acc.left.toOption.getOrElse(List.empty) ++: error.messages)
+ Left(acc.left.toOption.getOrElse(Seq.empty) ++: error.messages)
}
iter(current, xs)
}
@@ -140,7 +100,7 @@ trait DevicesConfigReader extends UniversalConfigReader {
} else {
Right(devicesOpts(opts) { devicesOpts =>
val co2Devices = devices.collect { case device: Co2Device => device }
- devicesOpts.copy(co2Devices = co2Devices.toList)
+ devicesOpts.copy(co2Devices = co2Devices)
})
}
@@ -148,7 +108,6 @@ trait DevicesConfigReader extends UniversalConfigReader {
}
}
}
-
()
}
@@ -177,4 +136,45 @@ trait DevicesConfigReader extends UniversalConfigReader {
}
}
}
+
+ private def cliDeviceOpts(
+ opts: Ambient7Options, deviceType: DeviceType.Type)(save: FromCliDevice => FromCliDevice): Ambient7Options =
+ {
+
+ val currentOpts = opts.fromCliDevice.getOrElse(FromCliDevice(deviceType = Some(deviceType)))
+ opts.copy(
+ fromCliDevice = Some(save(currentOpts))
+ )
+ }
+
+ private def devicesOpts(opts: Ambient7Options)(save: Devices => Devices): Ambient7Options = {
+ val currentDevices = opts.devices.getOrElse(Devices())
+ opts.copy(
+ devices = Some(save(currentDevices))
+ )
+ }
+
+ // TODO: brake logic by device types, ex. separate object for every type
+ private def extractDevice(
+ deviceConfigModel: DeviceConfigModel, rawConfig: Config): Either[Seq[String], Device] =
+ {
+ // mt8057 the only known device type for now
+ if (deviceConfigModel.model != "mt8057") {
+ Left(List(s"unknown devices type '${deviceConfigModel.model}' for device with id = '${deviceConfigModel.id}'"))
+
+ // tags defined, but format violated
+ } else if (deviceConfigModel.agentTags.exists(AgentTags.tryParseFromString(_).isLeft)) {
+ val tags = deviceConfigModel.agentTags.getOrElse("")
+ Left(List(
+ s"bad devices tags format '${tags}' for device with id = '${deviceConfigModel.id}'"
+ ))
+
+ } else {
+ Right(Co2Device(
+ deviceConfigModel.id,
+ agent = Co2Agent(deviceConfigModel.agentName, AgentTags(deviceConfigModel.agentTags.getOrElse("")))
+ ))
+ }
+ }
+
}
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/InfluxDbConfigReader.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/InfluxDbConfigReader.scala
index 5074b2f..c248449 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/InfluxDbConfigReader.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/InfluxDbConfigReader.scala
@@ -5,7 +5,8 @@ package ru.maizy.ambient7.core.config.reader
* See LICENSE.txt for details.
*/
-import ru.maizy.ambient7.core.config.{ Ambient7Options, Defaults, InfluxDbOptions }
+import ru.maizy.ambient7.core.config.options.InfluxDbOptions
+import ru.maizy.ambient7.core.config.{ Ambient7Options, Defaults }
trait InfluxDbConfigReader extends UniversalConfigReader {
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/MainDbConfigReader.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/MainDbConfigReader.scala
index 4b8bdb6..afc69b9 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/MainDbConfigReader.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/MainDbConfigReader.scala
@@ -5,7 +5,8 @@ package ru.maizy.ambient7.core.config.reader
* See LICENSE.txt for details.
*/
-import ru.maizy.ambient7.core.config.{ Ambient7Options, DbOptions, Defaults }
+import ru.maizy.ambient7.core.config.options.DbOptions
+import ru.maizy.ambient7.core.config.{ Ambient7Options, Defaults }
trait MainDbConfigReader extends UniversalConfigReader {
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/UniversalConfigReader.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/UniversalConfigReader.scala
index 6c786f5..6f5e66b 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/UniversalConfigReader.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/UniversalConfigReader.scala
@@ -259,6 +259,14 @@ trait UniversalConfigReader {
}
}
+ implicit class ErrorOrFlatmap[T, V](result: configs.Result[T]) {
+ def errorOrFlatmap(map: T => Either[Seq[String], V]): Either[Seq[String], V] = {
+ result.toEither match {
+ case Right(r) => map(r)
+ case Left(errors) => Left(errors.messages)
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala b/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala
index 838ee0d..7fa1027 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala
@@ -7,6 +7,7 @@ package ru.maizy.ambient7.core.util
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
+import scala.concurrent.duration.Duration
// TODO: generalize for any date order past->future, future->past
class DateTimeIterator private (from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit)
@@ -26,4 +27,9 @@ object DateTimeIterator {
def apply(from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit): DateTimeIterator = {
new DateTimeIterator(from, to, step, stepUnit)
}
+
+ def apply(from: ZonedDateTime, to: ZonedDateTime, stepDuration: Duration): DateTimeIterator = {
+ require(stepDuration.isFinite)
+ new DateTimeIterator(from, to, stepDuration.toMicros, ChronoUnit.MICROS)
+ }
}
diff --git a/core/src/main/scala/ru/maizy/ambient7/core/util/Dates.scala b/core/src/main/scala/ru/maizy/ambient7/core/util/Dates.scala
index 81caded..9fa5924 100644
--- a/core/src/main/scala/ru/maizy/ambient7/core/util/Dates.scala
+++ b/core/src/main/scala/ru/maizy/ambient7/core/util/Dates.scala
@@ -5,9 +5,22 @@ package ru.maizy.ambient7.core.util
* See LICENSE.txt for details.
*/
-import java.time.{ ZoneId, ZonedDateTime }
+import java.time.{ Instant, ZoneId, ZonedDateTime }
+import scala.concurrent.duration.Duration
object Dates {
def dateTimeForUser(dateTime: ZonedDateTime, timeZone: ZoneId = ZoneId.systemDefault()): String =
dateTime.withZoneSameInstant(timeZone).toString
+
+ def truncateDateTime(dateTime: ZonedDateTime, step: Long): ZonedDateTime = {
+ val epochSeconds = dateTime.toEpochSecond
+ val mod = epochSeconds % step
+ val instant = Instant.ofEpochSecond(epochSeconds - mod)
+ ZonedDateTime.ofInstant(instant, dateTime.getZone)
+ }
+
+ def truncateDateTime(dateTime: ZonedDateTime, step: Duration): ZonedDateTime = {
+ require(step.isFinite)
+ truncateDateTime(dateTime, step.toSeconds)
+ }
}
diff --git a/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/SampleReaders.scala b/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/SampleReaders.scala
index d8d7534..289aa67 100644
--- a/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/SampleReaders.scala
+++ b/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/SampleReaders.scala
@@ -5,7 +5,7 @@ package ru.maizy.ambient7.core.tests.config.reader
* See LICENSE.txt for details.
*/
-import ru.maizy.ambient7.core.config.DbOptions
+import ru.maizy.ambient7.core.config.options.DbOptions
import ru.maizy.ambient7.core.config.reader.UniversalConfigReader
trait SampleReaders {
diff --git a/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/UniversalConfigReaderSpec.scala b/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/UniversalConfigReaderSpec.scala
index 2e40608..e4e5c15 100644
--- a/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/UniversalConfigReaderSpec.scala
+++ b/core/src/test/scala/ru/maizy/ambient7/core/tests/config/reader/UniversalConfigReaderSpec.scala
@@ -6,8 +6,9 @@ package ru.maizy.ambient7.core.tests.config.reader
*/
import java.nio.file.Paths
+import ru.maizy.ambient7.core.config.options.DbOptions
import ru.maizy.ambient7.core.config.reader.UniversalConfigReader
-import ru.maizy.ambient7.core.config.{ Ambient7Options, DbOptions, ParsingError }
+import ru.maizy.ambient7.core.config.{ Ambient7Options, ParsingError }
import ru.maizy.ambient7.core.tests.BaseSpec
class UniversalConfigReaderSpec extends BaseSpec with SampleReaders {
diff --git a/core/src/test/scala/ru/maizy/ambient7/core/tests/util/DatesSpec.scala b/core/src/test/scala/ru/maizy/ambient7/core/tests/util/DatesSpec.scala
new file mode 100644
index 0000000..7106881
--- /dev/null
+++ b/core/src/test/scala/ru/maizy/ambient7/core/tests/util/DatesSpec.scala
@@ -0,0 +1,33 @@
+package ru.maizy.ambient7.core.tests.util
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+import scala.concurrent.duration.DurationInt
+import java.time.{ Instant, ZoneId, ZoneOffset, ZonedDateTime }
+import ru.maizy.ambient7.core.tests.BaseSpec
+import ru.maizy.ambient7.core.util.Dates
+
+
+class DatesSpec extends BaseSpec {
+
+ // timestamp = 1483272917
+ val time = ZonedDateTime.of(2017, 1, 1, 12, 15, 17, 115, ZoneOffset.UTC)
+
+ "Dates.truncateDateTime" should "works" in {
+ Dates.truncateDateTime(time, 10.seconds) shouldBe
+ ZonedDateTime.of(2017, 1, 1, 12, 15, 10, 0, ZoneOffset.UTC)
+ }
+
+ it should "works with long offsets" in {
+ Dates.truncateDateTime(time, 1000.seconds) shouldBe
+ ZonedDateTime.ofInstant(Instant.ofEpochSecond(1483272000L), ZoneOffset.UTC)
+ }
+
+ it should "saves zone information" in {
+ val zone = ZoneId.systemDefault
+ Dates.truncateDateTime(time.withZoneSameInstant(zone), 10.seconds).getZone shouldBe zone
+ }
+}
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/Error.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/Error.scala
new file mode 100644
index 0000000..f958dd9
--- /dev/null
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/Error.scala
@@ -0,0 +1,8 @@
+package ru.maizy.influxdbclient
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+class Error(message: String, cause: Option[Throwable] = None) extends Exception(message, cause.orNull)
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala
index 9a83670..0765950 100644
--- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala
@@ -6,14 +6,18 @@ package ru.maizy.influxdbclient
*/
import java.net.URLEncoder
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
+import scala.collection.JavaConverters.{ collectionAsScalaIterableConverter, mapAsScalaMapConverter }
import scala.util.{ Failure, Success, Try }
import scalaj.http.{ BaseHttp, HttpOptions, HttpResponse }
import com.typesafe.scalalogging.LazyLogging
+import dispatch.{ Http, Req, as, implyRequestHandlerTuple, url }
import ru.maizy.influxdbclient.data.{ ErrorDto, QueryResult }
import ru.maizy.influxdbclient.responses.QueryResultsProtocol._
import spray.json.{ DeserializationException, JsonParser, ParserInput }
+// TODO: remove sync methods (iss #39)
class InfluxDbClient(
val influxDbSettings: InfluxDbConnectionSettings,
val userAgent: Option[String] = None,
@@ -37,9 +41,44 @@ class InfluxDbClient(
)
)
- def rawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = {
- logger.debug(s"db: ${influxDbReadonlySettings.db}, query: $query")
- val request = buildBaseRequest("query", influxDbReadonlySettings)
+
+
+ def rawDataQuery(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[RawHttpResponse] = {
+ val settings = if (readOnly) influxDbReadonlySettings else influxDbSettings
+ logQuery(settings, query)
+ val request = buildBaseRequest("query", settings)
+ .addQueryParameter("db", settings.db)
+ .addQueryParameter("q", query)
+
+ Http(request > as.Response { response =>
+ val headers = response.getHeaders.asScala.mapValues(_.asScala.toIndexedSeq).toMap
+ RawHttpResponse(response.getStatusCode, response.getResponseBodyAsBytes, headers)
+ })
+ }
+
+ def query(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[QueryResult] = {
+ rawDataQuery(query, readOnly) map { raw =>
+ parseQueryResult(raw) match {
+ case Right(r) => r
+ case Left(error) => throw new Error(error.message, error.cause)
+ }
+
+ }
+ }
+
+ // TODO: remove ErrorDto use Error instead
+ private def parseQueryResult(response: RawHttpResponse): Either[ErrorDto, QueryResult] = {
+ if (response.code != 200) {
+ Left(parserErrorResponse(response))
+ } else {
+ parserQueryResponse(response)
+ }
+ }
+
+ @deprecated("use async methods", "0.4")
+ def syncRawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = {
+ logQuery(influxDbReadonlySettings, query)
+ val request = buildBaseSyncRequest("query", influxDbReadonlySettings)
.param("db", influxDbReadonlySettings.db)
.param("q", query)
@@ -49,17 +88,15 @@ class InfluxDbClient(
}
}
- def query(query: String): Either[ErrorDto, QueryResult] = {
- rawDataQuery(query).right.flatMap { response =>
- if (response.code != 200) {
- Left(parserErrorResponse(response))
- } else {
- parserQueryResponse(response)
- }
+ @deprecated("use async methods", "0.4")
+ def syncQuery(query: String): Either[ErrorDto, QueryResult] = {
+ syncRawDataQuery(query).right.flatMap { response =>
+ parseQueryResult(RawHttpResponse(response.code, response.body, response.headers))
}
}
- private def parserErrorResponse(response: HttpResponse[Array[Byte]]): ErrorDto =
+ // TODO: remove ErrorDto use Error instead
+ private def parserErrorResponse(response: RawHttpResponse): ErrorDto =
Try(JsonParser(ParserInput(response.body)).convertTo[ErrorDto]) match {
case Success(error) => error
case Failure(e: DeserializationException) =>
@@ -68,7 +105,8 @@ class InfluxDbClient(
ErrorDto(s"unknown response deserialization error (response code ${response.code})", Some(e))
}
- private def parserQueryResponse(response: HttpResponse[Array[Byte]]): Either[ErrorDto, QueryResult] =
+ // TODO: remove ErrorDto use Error instead
+ private def parserQueryResponse(response: RawHttpResponse): Either[ErrorDto, QueryResult] =
Try(JsonParser(ParserInput(response.body)).convertTo[QueryResult]) match {
case Success(result) =>
Right(result)
@@ -78,7 +116,7 @@ class InfluxDbClient(
Left(ErrorDto("unknown response deserialization error", Some(e)))
}
- private def buildBaseRequest(method: String, settings: InfluxDbConnectionSettings) = {
+ private def buildBaseSyncRequest(method: String, settings: InfluxDbConnectionSettings) = {
val request = httpClient.apply(buildUri(method, settings))
(settings.user, settings.password) match {
case (Some(user), Some(password)) => request.auth(user, password)
@@ -86,8 +124,25 @@ class InfluxDbClient(
}
}
+ private def buildBaseRequest(method: String, settings: InfluxDbConnectionSettings): Req = {
+ val request = url(buildUri(method, settings))
+ (settings.user, settings.password) match {
+ case (Some(user), Some(password)) => request.as(user, password)
+ case _ => request
+ }
+ }
+
+ // TODO: use dispatch.url constructor instead of a string
private def buildUri(method: String, settings: InfluxDbConnectionSettings): String =
settings.baseUrl.stripSuffix("/") + "/" + urlencode(method)
private def urlencode(value: String) = URLEncoder.encode(value, "UTF-8")
+
+ private def logQuery(settings: InfluxDbConnectionSettings, query: String): Unit = {
+ logger.debug(
+ settings.user.map( _ + "@").getOrElse("") +
+ settings.baseUrl.stripSuffix("/") +
+ s"/${settings.db}: $query"
+ )
+ }
}
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/RawHttpResponse.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/RawHttpResponse.scala
new file mode 100644
index 0000000..cf25620
--- /dev/null
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/RawHttpResponse.scala
@@ -0,0 +1,8 @@
+package ru.maizy.influxdbclient
+
+/**
+ * Copyright (c) Nikita Kovaliov, maizy.ru, 2017
+ * See LICENSE.txt for details.
+ */
+
+case class RawHttpResponse(code: Int, body: Array[Byte], headers: Map[String, IndexedSeq[String]])
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/data/ErrorDto.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/data/ErrorDto.scala
index 51b5389..7f04f77 100644
--- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/data/ErrorDto.scala
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/data/ErrorDto.scala
@@ -4,4 +4,5 @@ package ru.maizy.influxdbclient.data
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016-2017
* See LICENSE.txt for details.
*/
+@deprecated("use Throwable", "0.4")
case class ErrorDto(message: String, cause: Option[Throwable] = None)
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala
index 0ca0db3..ecc84b8 100644
--- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala
@@ -13,11 +13,12 @@ object Dates {
val INFLUXDB_TIMEZONE = ZoneOffset.UTC
val INFLUXDB_DATETIME_FORMAT = DateTimeFormatter.ISO_INSTANT
+ private val SYSTEM_TIMEZONE = ZoneId.systemDefault()
def toInfluxDbFormat(date: ZonedDateTime): String =
date.withZoneSameInstant(INFLUXDB_TIMEZONE).format(INFLUXDB_DATETIME_FORMAT)
- def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = ZoneId.systemDefault()): Try[ZonedDateTime] =
+ def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = Dates.SYSTEM_TIMEZONE): Try[ZonedDateTime] =
Try(INFLUXDB_DATETIME_FORMAT.parse(dateTime))
.map(Instant.from(_).atZone(INFLUXDB_TIMEZONE).withZoneSameInstant(timeZone))
}
diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Escape.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Escape.scala
index d3d03b8..4b1568d 100644
--- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Escape.scala
+++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Escape.scala
@@ -6,21 +6,24 @@ package ru.maizy.influxdbclient.util
*/
object Escape {
- def encodeKey(key: String): String = key.replace("\"", "\\\"") // " -> \"
+ def encodeIdentifier(key: String): String = key.replace("\"", "\\\"") // " -> \"
- def escapeKey(key: String): String = "\"" + encodeKey(key) + "\""
+ def escapeIdentifier(key: String): String = "\"" + encodeIdentifier(key) + "\""
+ def decodeIdentifier(value: String): String = value.replace("\\\"", "\"")
- def encodeTag(key: String): String = key.replace(",", "\\,")
-
- def decodeTag(key: String): String = key.replace("\\,", ",")
-
- // TODO: build right replacement based on spec
def encodeValue(value: String): String = value.replace("'", "\\'")
+ def decodeValue(value: String): String = value.replace("\\'", "'")
+
def escapeValue(value: String): String = "'" + encodeValue(value) + "'"
+
+ def encodeTag(key: String): String = key.replace(",", "\\,")
+
+ def decodeTag(key: String): String = key.replace("\\,", ",")
+
def tagsToQueryCondition(tags: IndexedSeq[(String, String)]): String = {
if (tags.isEmpty) {
"true"
@@ -28,7 +31,7 @@ object Escape {
tags
.map { pair =>
val (name, value) = pair
- encodeKey(name) + " = " + encodeValue(value)
+ encodeIdentifier(name) + " = " + encodeValue(value)
}
.mkString(" and ")
}
diff --git a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/InfluxDbWriter.scala b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/InfluxDbWriter.scala
index 682529c..c818e2a 100644
--- a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/InfluxDbWriter.scala
+++ b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/InfluxDbWriter.scala
@@ -8,7 +8,8 @@ package ru.maizy.ambient7.mt8057agent
import scala.util.{ Failure, Success, Try }
import scalaj.http.{ BaseHttp, HttpOptions, HttpRequest, HttpResponse }
import com.typesafe.scalalogging.LazyLogging
-import ru.maizy.ambient7.core.config.{ Ambient7Options, InfluxDbOptions }
+import ru.maizy.ambient7.core.config.Ambient7Options
+import ru.maizy.ambient7.core.config.options.InfluxDbOptions
import ru.maizy.ambient7.core.data.{ AgentTag, AgentTags }
class InfluxDbWriter(opts: Ambient7Options) extends Writer with LazyLogging {
diff --git a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/MessageDecoder.scala b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/MessageDecoder.scala
index aeb0319..474dcbc 100644
--- a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/MessageDecoder.scala
+++ b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/MessageDecoder.scala
@@ -13,6 +13,7 @@ import scala.util.{ Success, Failure, Try }
sealed trait ResultValue
+// TODO: use float
case class Temp(celsius: Double) extends ResultValue
case class Co2(ppm: Int, high: Boolean = false) extends ResultValue
diff --git a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057AgentAppLauncher.scala b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057AgentAppLauncher.scala
index afb5c17..5df8986 100644
--- a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057AgentAppLauncher.scala
+++ b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057AgentAppLauncher.scala
@@ -7,7 +7,8 @@ import ch.qos.logback.classic.{ Level, LoggerContext, Logger => LogbackLogger }
import ch.qos.logback.core.{ ConsoleAppender, FileAppender, OutputStreamAppender }
import com.google.common.collect.{ EvictingQueue, Queues }
import org.slf4j.LoggerFactory
-import ru.maizy.ambient7.core.config.{ Ambient7Options, ParsingError, Writers }
+import ru.maizy.ambient7.core.config.options.Writers
+import ru.maizy.ambient7.core.config.{ Ambient7Options, ParsingError }
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
diff --git a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057ConfigReader.scala b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057ConfigReader.scala
index 360710d..97cb3e3 100644
--- a/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057ConfigReader.scala
+++ b/mt8057-agent/src/main/scala/ru/maizy/ambient7/mt8057agent/Mt8057ConfigReader.scala
@@ -6,7 +6,8 @@ package ru.maizy.ambient7.mt8057agent
*/
import java.io.File
-import ru.maizy.ambient7.core.config.{ Ambient7Options, EnumerationMap, Mt8057AgentSpecificOptions, Writers }
+import ru.maizy.ambient7.core.config.options.{ EnumerationMap, Mt8057AgentSpecificOptions, Writers }
+import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, InfluxDbConfigReader, UniversalConfigReader }
import ru.maizy.ambient7.core.data.DeviceType
diff --git a/mt8057-agent/src/test/scala/ru/maizy/ambient7/mt8057agent/tests/InfluxDbWriterSpec.scala b/mt8057-agent/src/test/scala/ru/maizy/ambient7/mt8057agent/tests/InfluxDbWriterSpec.scala
index 33b710d..c60cf3f 100644
--- a/mt8057-agent/src/test/scala/ru/maizy/ambient7/mt8057agent/tests/InfluxDbWriterSpec.scala
+++ b/mt8057-agent/src/test/scala/ru/maizy/ambient7/mt8057agent/tests/InfluxDbWriterSpec.scala
@@ -5,7 +5,8 @@ package ru.maizy.ambient7.mt8057agent.tests
* See LICENSE.txt for details.
*/
-import ru.maizy.ambient7.core.config.{ Ambient7Options, InfluxDbOptions }
+import ru.maizy.ambient7.core.config.Ambient7Options
+import ru.maizy.ambient7.core.config.options.InfluxDbOptions
import ru.maizy.ambient7.core.data.{ AgentTags, Co2Agent, Co2Device, Devices }
import ru.maizy.ambient7.mt8057agent._
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 474d8cd..832ec1e 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -42,7 +42,7 @@
-
+