From f85410632754c6fd05dcd0f30aa38c0938d8a5d1 Mon Sep 17 00:00:00 2001 From: Nikita Kovaliov Date: Wed, 5 Apr 2017 19:41:53 +0300 Subject: [PATCH] iss #26: [wip] device data updater --- .../notifications/NotificationsExecutor.scala | 44 ++++++++---- .../analysis/notifications/data/Co2Data.scala | 59 ++++++++++++++++ .../analysis/notifications/data/Data.scala | 26 +------ .../analysis/service/AggregateType.scala | 26 +++++++ .../analysis/service/InfluxDbCo2Service.scala | 68 ++++++++++++++++++- .../test/notifications/data/DataSpec.scala | 10 +-- .../config/reader/DevicesConfigReader.scala | 2 +- .../ambient7/core/util/DateTimeIterator.scala | 6 ++ .../maizy/influxdbclient/InfluxDbClient.scala | 9 ++- .../ru/maizy/influxdbclient/util/Dates.scala | 3 +- 10 files changed, 205 insertions(+), 48 deletions(-) create mode 100644 ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Co2Data.scala create mode 100644 ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/NotificationsExecutor.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/NotificationsExecutor.scala index aaf4c4e..a8a8a49 100644 --- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/NotificationsExecutor.scala +++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/NotificationsExecutor.scala @@ -11,20 +11,24 @@ import scala.concurrent.duration.DurationInt import scala.util.Success import ru.maizy.ambient7.core.config.Ambient7Options import ru.maizy.ambient7.analysis.influxdb -import ru.maizy.ambient7.analysis.notifications.data.Data +import ru.maizy.ambient7.analysis.notifications.data.{ Co2Data, Data } +import ru.maizy.ambient7.core.data.Device class NotificationsExecutor(opts: Ambient7Options) { + private case class WatchedDevice(device: Device, data: Data) + private val logger = notificationsLogger - var data: Option[Data] = None - val finishPromise: Option[Future[Unit]] = None + private var watchedDevices: List[WatchedDevice] = List.empty + private var finishPromise: Option[Promise[Unit]] = None private implicit val notificationContext = ExecutionContext.global def start(): Future[Unit] = { - val finishPromise = Promise[Unit]() + val promise = Promise[Unit]() + finishPromise = Some(promise) def onFailureProxyToPromise[T](future: Future[T]): Future[T] = { - future onFailure { case error: Throwable => finishPromise.failure(error) } + future onFailure { case error: Throwable => promise.failure(error) } future } @@ -32,7 +36,7 @@ class NotificationsExecutor(opts: Ambient7Options) { onFailureProxyToPromise(sheduleDataUpdate()) } - finishPromise.future + promise.future } private def init(): Future[Unit] = { @@ -43,8 +47,12 @@ class NotificationsExecutor(opts: Ambient7Options) { // TODO: init watchers // FIXME: tmp val maxRequestedDuration = 20.minutes - - data = Some(Data(influxDbClient, refreshRate, maxRequestedDuration)) + val device = opts.co2Devices.head + watchedDevices = + WatchedDevice( + device, + new Co2Data(device.agent, influxDbClient, refreshRate, maxRequestedDuration) + ) :: watchedDevices Future.successful(()) case _ => Future.failed(new Error("Some options required for notifications not provided")) } @@ -52,14 +60,14 @@ class NotificationsExecutor(opts: Ambient7Options) { // FIXME: add scheduler based on ScheduledExecutorService, current solution blocks one of EC thread private def sheduleDataUpdate(step: Long = 0): Future[Unit] = { - (opts.notifications.map(_.refreshRate), data) match { + opts.notifications.map(_.refreshRate) match { - case (Some(refreshRate), Some(dataContainer)) => + case Some(refreshRate) => val start = ZonedDateTime.now() val nextUpdate = start.plusSeconds(refreshRate.toSeconds) logger.debug(s"Updating notification data, step #$step") - val dataUpdateFuture = dataUpdate(dataContainer) + val dataUpdateFuture = updateData() def checkAndScheduleNext(): Unit = { val end = ZonedDateTime.now() @@ -78,7 +86,7 @@ class NotificationsExecutor(opts: Ambient7Options) { dataUpdateFuture onComplete { case Success(_) => // FIXME: tmp, remove $data - logger.debug(s"Notification data has updated successful, step #$step: $data") + logger.debug(s"Notification data has updated successful, step #$step") checkAndScheduleNext() case util.Failure(e) => @@ -88,9 +96,17 @@ class NotificationsExecutor(opts: Ambient7Options) { Future.successful(()) - case _ => Future.failed(new Error("Requirements for date update not defined")) + case _ => Future.failed(new Error("Requirements for date update isn't defined")) } } - private def dataUpdate(dataContainer: Data): Future[Unit] = dataContainer.update() + private def updateData(): Future[Unit] = { + Future + .sequence(watchedDevices.map(_.data.update())) + .map { _ => + // FIXME tmp + println(watchedDevices.map(_.data)) + () + } + } } diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Co2Data.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Co2Data.scala new file mode 100644 index 0000000..12a5c53 --- /dev/null +++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Co2Data.scala @@ -0,0 +1,59 @@ +package ru.maizy.ambient7.analysis.notifications.data + +/** + * Copyright (c) Nikita Kovaliov, maizy.ru, 2017 + * See LICENSE.txt for details. + */ + +import java.time.temporal.ChronoUnit +import java.time.{ ZoneId, ZonedDateTime } +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.Duration +import scala.concurrent.duration.DurationInt +import ru.maizy.ambient7.analysis.service.InfluxDbCo2Service +import ru.maizy.ambient7.core.data.Co2Agent +import ru.maizy.influxdbclient.InfluxDbClient + +class Co2Data( + val agent: Co2Agent, + influxDbClient: InfluxDbClient, + val refreshRate: Duration, + val storeDuration: Duration) extends Data +{ + + private val limit = Co2Data.computeLimit(storeDuration, refreshRate) + val co2 = new DataPoints[Option[Int]](limit) + val temp = new DataPoints[Option[Float]](limit) + val availability = new DataPoints[Boolean](limit) + + def update(): Future[Unit] = { + implicit val ec = ExecutionContext.Implicits.global + + // FIXME tmp + val now = ZonedDateTime.of(2017, 2, 12, 12, 0, 0, 0, ZoneId.systemDefault()) + val from = now.minus(storeDuration.toMillis, ChronoUnit.MILLIS) + + InfluxDbCo2Service.getValuesForTimePeriod( + influxDbClient, + agent, + from, + now, + segment = 5.seconds min refreshRate + ) map { res => + res.foreach { level => + co2.appendPoint(level.from, level.ppm) + } + () + } + + // TODO: temp + // TODO: availability + + } +} + +object Co2Data { + + def computeLimit(refreshRate: Duration, storeDuration: Duration): Int = + (storeDuration.toSeconds.toDouble / refreshRate.toSeconds).floor.toInt + 1 +} diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Data.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Data.scala index f38bb19..ab2e4bb 100644 --- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Data.scala +++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/notifications/data/Data.scala @@ -5,29 +5,9 @@ package ru.maizy.ambient7.analysis.notifications.data * See LICENSE.txt for details. */ -import scala.concurrent.{ ExecutionContext, Future } -import scala.concurrent.duration.Duration -import ru.maizy.influxdbclient.InfluxDbClient +import scala.concurrent.Future -class Data private (influxDbClient: InfluxDbClient, val limit: Int) { - val co2 = new DataPoints[Option[Int]](limit) - val temp = new DataPoints[Option[Float]](limit) - val availability = new DataPoints[Boolean](limit) - - def update(): Future[Unit] = { - implicit val ec = ExecutionContext.Implicits.global - influxDbClient.query("select ppm from co2 limit 10").map { res => - println(res) - () - } - } -} - -object Data { - def apply(influxDbClient: InfluxDbClient, refreshRate: Duration, storeDuration: Duration): Data = - new Data(influxDbClient, computeLimit(refreshRate, storeDuration)) - - def computeLimit(refreshRate: Duration, storeDuration: Duration): Int = - (storeDuration.toSeconds.toDouble / refreshRate.toSeconds).floor.toInt + 1 +trait Data { + def update(): Future[Unit] } diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala new file mode 100644 index 0000000..bbf82ec --- /dev/null +++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/AggregateType.scala @@ -0,0 +1,26 @@ +package ru.maizy.ambient7.analysis.service + +/** + * Copyright (c) Nikita Kovaliov, maizy.ru, 2017 + * See LICENSE.txt for details. + */ + +sealed trait AggregateType { + def code: String + def apply(value: String): String +} + +object MAX extends AggregateType { + override def code: String = "max" + override def apply(value: String): String = s"max($value)" +} + +object MIN extends AggregateType { + override def code: String = "min" + override def apply(value: String): String = s"min($value)" +} + +object MEAN extends AggregateType { + override def code: String = "mean" + override def apply(value: String): String = s"mean($value)" +} diff --git a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala index bd9ef03..de64c37 100644 --- a/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala +++ b/ambient7-analysis/src/main/scala/ru/maizy/ambient7/analysis/service/InfluxDbCo2Service.scala @@ -11,14 +11,24 @@ import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import scala.annotation.tailrec import scala.collection.immutable.ListMap +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.Duration +import scala.concurrent.duration.DurationInt import scala.util.{ Failure, Success } import com.typesafe.scalalogging.LazyLogging -import ru.maizy.ambient7.core.data.{ Co2AggregatedLevels, Co2Agent } -import ru.maizy.influxdbclient.data.{ NullValue, SeriesItem, StringValue } +import ru.maizy.ambient7.analysis.notifications.Error +import ru.maizy.ambient7.core.data.{ Co2Agent, Co2AggregatedLevels } +import ru.maizy.ambient7.core.util.DateTimeIterator +import ru.maizy.influxdbclient.data.{ NullValue, NumberValue, SeriesItem, StringValue } import ru.maizy.influxdbclient.util.Dates import ru.maizy.influxdbclient.util.Escape.{ escapeValue, tagsToQueryCondition } import ru.maizy.influxdbclient.InfluxDbClient +case class Co2Level(ppm: Option[Int], from: ZonedDateTime, duration: Duration, aggregateType: AggregateType) { + def until: ZonedDateTime = from.plus(duration.toMillis, ChronoUnit.SECONDS) +} + + object InfluxDbCo2Service extends LazyLogging { val CO2_OK = 800 @@ -189,4 +199,58 @@ object InfluxDbCo2Service extends LazyLogging { iter(startDate, ListMap.empty) } + + def getValuesForTimePeriod( + client: InfluxDbClient, + agent: Co2Agent, + from: ZonedDateTime, + until: ZonedDateTime, + segment: Duration = 10.seconds, + aggregate: AggregateType = MAX)(implicit ex: ExecutionContext): Future[List[Co2Level]] = + { + require(from.compareTo(until) < 0) + require(segment.isFinite() && segment.toSeconds >= 1) + + val dateFrom = Dates.toInfluxDbFormat(from) + val dateUntil = Dates.toInfluxDbFormat(until) + + val query = + s"select ${aggregate("ppm")} as ppm " + + "from co2 " + + s"where time >= ${escapeValue(dateFrom)} and time < ${escapeValue(dateUntil)} " + + s"and agent = ${escapeValue(agent.agentName)} and ${tagsToQueryCondition(agent.tags.asPairs)} " + + s"group by time(${segment.toSeconds}s)" + + // little bit shitty code + client.query(query) map { res => + val timeToPpm: Map[ZonedDateTime, Option[Int]] = res.firstSeriesItems.map { seriesItem => + (seriesItem.findColumnIndex("time"), seriesItem.findColumnIndex("ppm")) match { + case (Some(timeIndex), Some(ppmIndex)) => + seriesItem.values.zipWithIndex.map { case (row, rowNum) => + val rawTime = row(timeIndex) + rawTime match { + case timeValue: StringValue => + Dates.fromInfluxDbToZonedDateTime(timeValue.value) match { + case Success(time) => + row(ppmIndex) match { + case ppm: NumberValue => (time, Some(ppm.value.toInt)) + case _ => (time, None) + } + + case Failure(e) => throw new Error(s"unparsed time $rawTime", e) + } + case _=> throw new Error(s"time not found in row #$rowNum") + } + }.toMap + case _ => throw new Error(s"time & ppm columns not found in results") + } + }.getOrElse(Map.empty) + + DateTimeIterator(from, until, segment).toList.map { time => + Co2Level(timeToPpm.get(time).flatten, time, segment, aggregate) + } + + } + + } } diff --git a/ambient7-analysis/src/test/scala/ru/maizy/ambient7/analysis/test/notifications/data/DataSpec.scala b/ambient7-analysis/src/test/scala/ru/maizy/ambient7/analysis/test/notifications/data/DataSpec.scala index eeaaeb4..4309918 100644 --- a/ambient7-analysis/src/test/scala/ru/maizy/ambient7/analysis/test/notifications/data/DataSpec.scala +++ b/ambient7-analysis/src/test/scala/ru/maizy/ambient7/analysis/test/notifications/data/DataSpec.scala @@ -6,16 +6,16 @@ package ru.maizy.ambient7.analysis.test.notifications.data */ import scala.concurrent.duration.DurationInt -import ru.maizy.ambient7.analysis.notifications.data.Data +import ru.maizy.ambient7.analysis.notifications.data.Co2Data import ru.maizy.ambient7.analysis.test.BaseSpec class DataSpec extends BaseSpec { "Data.computeLimit" should "works" in { - Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.minute ) shouldBe 13 - Data.computeLimit(refreshRate = 5.seconds, storeDuration = 10.seconds ) shouldBe 3 - Data.computeLimit(refreshRate = 5.seconds, storeDuration = 5.seconds ) shouldBe 2 - Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.seconds ) shouldBe 1 + Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.minute ) shouldBe 13 + Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 10.seconds ) shouldBe 3 + Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 5.seconds ) shouldBe 2 + Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.seconds ) shouldBe 1 } } diff --git a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala index a15e4d7..2e6ba73 100644 --- a/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala +++ b/core/src/main/scala/ru/maizy/ambient7/core/config/reader/DevicesConfigReader.scala @@ -278,7 +278,7 @@ trait DevicesConfigReader extends UniversalConfigReader { watcherSpec <- device.watchersSpecs; actionId <- watcherSpec.actionsIds if !availableActionsIds.contains(actionId) - ) yield s"watcher '${watcherSpec.watcherType}' of device '${device.id}' contains unknown action $actionId" + ) yield s"watcher '${watcherSpec.watcherType}' of device '${device.id}' contains unknown action '$actionId'" } } .getOrElse(List.empty) diff --git a/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala b/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala index 838ee0d..7fa1027 100644 --- a/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala +++ b/core/src/main/scala/ru/maizy/ambient7/core/util/DateTimeIterator.scala @@ -7,6 +7,7 @@ package ru.maizy.ambient7.core.util import java.time.ZonedDateTime import java.time.temporal.ChronoUnit +import scala.concurrent.duration.Duration // TODO: generalize for any date order past->future, future->past class DateTimeIterator private (from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit) @@ -26,4 +27,9 @@ object DateTimeIterator { def apply(from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit): DateTimeIterator = { new DateTimeIterator(from, to, step, stepUnit) } + + def apply(from: ZonedDateTime, to: ZonedDateTime, stepDuration: Duration): DateTimeIterator = { + require(stepDuration.isFinite) + new DateTimeIterator(from, to, stepDuration.toMicros, ChronoUnit.MICROS) + } } diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala index 6778bfb..80125a3 100644 --- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala +++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/InfluxDbClient.scala @@ -42,9 +42,10 @@ class InfluxDbClient( ) + def rawDataQuery(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[RawHttpResponse] = { val settings = if (readOnly) influxDbReadonlySettings else influxDbSettings - logger.debug(s"${settings.user}@${settings.db}: $query") + logQuery(settings, query) val request = buildBaseRequest("query", settings) .addQueryParameter("db", settings.db) .addQueryParameter("q", query) @@ -76,7 +77,7 @@ class InfluxDbClient( @deprecated("use async methods", "0.4") def syncRawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = { - logger.debug(s"db: ${influxDbReadonlySettings.db}, query: $query") + logQuery(influxDbReadonlySettings, query) val request = buildBaseSyncRequest("query", influxDbReadonlySettings) .param("db", influxDbReadonlySettings.db) .param("q", query) @@ -136,4 +137,8 @@ class InfluxDbClient( settings.baseUrl.stripSuffix("/") + "/" + urlencode(method) private def urlencode(value: String) = URLEncoder.encode(value, "UTF-8") + + private def logQuery(settings: InfluxDbConnectionSettings, query: String): Unit = { + logger.debug(s"${settings.user.map( _ + "@").getOrElse("")}${settings.baseUrl}/${settings.db}: $query") + } } diff --git a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala index 0ca0db3..ecc84b8 100644 --- a/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala +++ b/influxdb-client/src/main/scala/ru/maizy/influxdbclient/util/Dates.scala @@ -13,11 +13,12 @@ object Dates { val INFLUXDB_TIMEZONE = ZoneOffset.UTC val INFLUXDB_DATETIME_FORMAT = DateTimeFormatter.ISO_INSTANT + private val SYSTEM_TIMEZONE = ZoneId.systemDefault() def toInfluxDbFormat(date: ZonedDateTime): String = date.withZoneSameInstant(INFLUXDB_TIMEZONE).format(INFLUXDB_DATETIME_FORMAT) - def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = ZoneId.systemDefault()): Try[ZonedDateTime] = + def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = Dates.SYSTEM_TIMEZONE): Try[ZonedDateTime] = Try(INFLUXDB_DATETIME_FORMAT.parse(dateTime)) .map(Instant.from(_).atZone(INFLUXDB_TIMEZONE).withZoneSameInstant(timeZone)) }