Skip to content

Commit

Permalink
iss #26: [wip] device data updater
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy committed Apr 5, 2017
1 parent 64adbcc commit f854106
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,32 @@ import scala.concurrent.duration.DurationInt
import scala.util.Success
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.analysis.influxdb
import ru.maizy.ambient7.analysis.notifications.data.Data
import ru.maizy.ambient7.analysis.notifications.data.{ Co2Data, Data }
import ru.maizy.ambient7.core.data.Device

class NotificationsExecutor(opts: Ambient7Options) {
private case class WatchedDevice(device: Device, data: Data)

private val logger = notificationsLogger
var data: Option[Data] = None
val finishPromise: Option[Future[Unit]] = None
private var watchedDevices: List[WatchedDevice] = List.empty
private var finishPromise: Option[Promise[Unit]] = None

private implicit val notificationContext = ExecutionContext.global

def start(): Future[Unit] = {
val finishPromise = Promise[Unit]()
val promise = Promise[Unit]()
finishPromise = Some(promise)

def onFailureProxyToPromise[T](future: Future[T]): Future[T] = {
future onFailure { case error: Throwable => finishPromise.failure(error) }
future onFailure { case error: Throwable => promise.failure(error) }
future
}

onFailureProxyToPromise(init()) onSuccess { case _ =>
onFailureProxyToPromise(sheduleDataUpdate())
}

finishPromise.future
promise.future
}

private def init(): Future[Unit] = {
Expand All @@ -43,23 +47,27 @@ class NotificationsExecutor(opts: Ambient7Options) {
// TODO: init watchers
// FIXME: tmp
val maxRequestedDuration = 20.minutes

data = Some(Data(influxDbClient, refreshRate, maxRequestedDuration))
val device = opts.co2Devices.head
watchedDevices =
WatchedDevice(
device,
new Co2Data(device.agent, influxDbClient, refreshRate, maxRequestedDuration)
) :: watchedDevices
Future.successful(())
case _ => Future.failed(new Error("Some options required for notifications not provided"))
}
}

// FIXME: add scheduler based on ScheduledExecutorService, current solution blocks one of EC thread
private def sheduleDataUpdate(step: Long = 0): Future[Unit] = {
(opts.notifications.map(_.refreshRate), data) match {
opts.notifications.map(_.refreshRate) match {

case (Some(refreshRate), Some(dataContainer)) =>
case Some(refreshRate) =>
val start = ZonedDateTime.now()
val nextUpdate = start.plusSeconds(refreshRate.toSeconds)

logger.debug(s"Updating notification data, step #$step")
val dataUpdateFuture = dataUpdate(dataContainer)
val dataUpdateFuture = updateData()

def checkAndScheduleNext(): Unit = {
val end = ZonedDateTime.now()
Expand All @@ -78,7 +86,7 @@ class NotificationsExecutor(opts: Ambient7Options) {
dataUpdateFuture onComplete {
case Success(_) =>
// FIXME: tmp, remove $data
logger.debug(s"Notification data has updated successful, step #$step: $data")
logger.debug(s"Notification data has updated successful, step #$step")
checkAndScheduleNext()

case util.Failure(e) =>
Expand All @@ -88,9 +96,17 @@ class NotificationsExecutor(opts: Ambient7Options) {

Future.successful(())

case _ => Future.failed(new Error("Requirements for date update not defined"))
case _ => Future.failed(new Error("Requirements for date update isn't defined"))
}
}

private def dataUpdate(dataContainer: Data): Future[Unit] = dataContainer.update()
private def updateData(): Future[Unit] = {
Future
.sequence(watchedDevices.map(_.data.update()))
.map { _ =>
// FIXME tmp
println(watchedDevices.map(_.data))
()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ru.maizy.ambient7.analysis.notifications.data

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

import java.time.temporal.ChronoUnit
import java.time.{ ZoneId, ZonedDateTime }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import ru.maizy.ambient7.analysis.service.InfluxDbCo2Service
import ru.maizy.ambient7.core.data.Co2Agent
import ru.maizy.influxdbclient.InfluxDbClient

class Co2Data(
val agent: Co2Agent,
influxDbClient: InfluxDbClient,
val refreshRate: Duration,
val storeDuration: Duration) extends Data
{

private val limit = Co2Data.computeLimit(storeDuration, refreshRate)
val co2 = new DataPoints[Option[Int]](limit)
val temp = new DataPoints[Option[Float]](limit)
val availability = new DataPoints[Boolean](limit)

def update(): Future[Unit] = {
implicit val ec = ExecutionContext.Implicits.global

// FIXME tmp
val now = ZonedDateTime.of(2017, 2, 12, 12, 0, 0, 0, ZoneId.systemDefault())
val from = now.minus(storeDuration.toMillis, ChronoUnit.MILLIS)

InfluxDbCo2Service.getValuesForTimePeriod(
influxDbClient,
agent,
from,
now,
segment = 5.seconds min refreshRate
) map { res =>
res.foreach { level =>
co2.appendPoint(level.from, level.ppm)
}
()
}

// TODO: temp
// TODO: availability

}
}

object Co2Data {

def computeLimit(refreshRate: Duration, storeDuration: Duration): Int =
(storeDuration.toSeconds.toDouble / refreshRate.toSeconds).floor.toInt + 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,9 @@ package ru.maizy.ambient7.analysis.notifications.data
* See LICENSE.txt for details.
*/

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import ru.maizy.influxdbclient.InfluxDbClient
import scala.concurrent.Future

class Data private (influxDbClient: InfluxDbClient, val limit: Int) {

val co2 = new DataPoints[Option[Int]](limit)
val temp = new DataPoints[Option[Float]](limit)
val availability = new DataPoints[Boolean](limit)

def update(): Future[Unit] = {
implicit val ec = ExecutionContext.Implicits.global
influxDbClient.query("select ppm from co2 limit 10").map { res =>
println(res)
()
}
}
}

object Data {
def apply(influxDbClient: InfluxDbClient, refreshRate: Duration, storeDuration: Duration): Data =
new Data(influxDbClient, computeLimit(refreshRate, storeDuration))

def computeLimit(refreshRate: Duration, storeDuration: Duration): Int =
(storeDuration.toSeconds.toDouble / refreshRate.toSeconds).floor.toInt + 1
trait Data {
def update(): Future[Unit]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ru.maizy.ambient7.analysis.service

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

sealed trait AggregateType {
def code: String
def apply(value: String): String
}

object MAX extends AggregateType {
override def code: String = "max"
override def apply(value: String): String = s"max($value)"
}

object MIN extends AggregateType {
override def code: String = "min"
override def apply(value: String): String = s"min($value)"
}

object MEAN extends AggregateType {
override def code: String = "mean"
override def apply(value: String): String = s"mean($value)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@ import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.{ Failure, Success }
import com.typesafe.scalalogging.LazyLogging
import ru.maizy.ambient7.core.data.{ Co2AggregatedLevels, Co2Agent }
import ru.maizy.influxdbclient.data.{ NullValue, SeriesItem, StringValue }
import ru.maizy.ambient7.analysis.notifications.Error
import ru.maizy.ambient7.core.data.{ Co2Agent, Co2AggregatedLevels }
import ru.maizy.ambient7.core.util.DateTimeIterator
import ru.maizy.influxdbclient.data.{ NullValue, NumberValue, SeriesItem, StringValue }
import ru.maizy.influxdbclient.util.Dates
import ru.maizy.influxdbclient.util.Escape.{ escapeValue, tagsToQueryCondition }
import ru.maizy.influxdbclient.InfluxDbClient

case class Co2Level(ppm: Option[Int], from: ZonedDateTime, duration: Duration, aggregateType: AggregateType) {
def until: ZonedDateTime = from.plus(duration.toMillis, ChronoUnit.SECONDS)
}


object InfluxDbCo2Service extends LazyLogging {

val CO2_OK = 800
Expand Down Expand Up @@ -189,4 +199,58 @@ object InfluxDbCo2Service extends LazyLogging {

iter(startDate, ListMap.empty)
}

def getValuesForTimePeriod(
client: InfluxDbClient,
agent: Co2Agent,
from: ZonedDateTime,
until: ZonedDateTime,
segment: Duration = 10.seconds,
aggregate: AggregateType = MAX)(implicit ex: ExecutionContext): Future[List[Co2Level]] =
{
require(from.compareTo(until) < 0)
require(segment.isFinite() && segment.toSeconds >= 1)

val dateFrom = Dates.toInfluxDbFormat(from)
val dateUntil = Dates.toInfluxDbFormat(until)

val query =
s"select ${aggregate("ppm")} as ppm " +
"from co2 " +
s"where time >= ${escapeValue(dateFrom)} and time < ${escapeValue(dateUntil)} " +
s"and agent = ${escapeValue(agent.agentName)} and ${tagsToQueryCondition(agent.tags.asPairs)} " +
s"group by time(${segment.toSeconds}s)"

// little bit shitty code
client.query(query) map { res =>
val timeToPpm: Map[ZonedDateTime, Option[Int]] = res.firstSeriesItems.map { seriesItem =>
(seriesItem.findColumnIndex("time"), seriesItem.findColumnIndex("ppm")) match {
case (Some(timeIndex), Some(ppmIndex)) =>
seriesItem.values.zipWithIndex.map { case (row, rowNum) =>
val rawTime = row(timeIndex)
rawTime match {
case timeValue: StringValue =>
Dates.fromInfluxDbToZonedDateTime(timeValue.value) match {
case Success(time) =>
row(ppmIndex) match {
case ppm: NumberValue => (time, Some(ppm.value.toInt))
case _ => (time, None)
}

case Failure(e) => throw new Error(s"unparsed time $rawTime", e)
}
case _=> throw new Error(s"time not found in row #$rowNum")
}
}.toMap
case _ => throw new Error(s"time & ppm columns not found in results")
}
}.getOrElse(Map.empty)

DateTimeIterator(from, until, segment).toList.map { time =>
Co2Level(timeToPpm.get(time).flatten, time, segment, aggregate)
}

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ package ru.maizy.ambient7.analysis.test.notifications.data
*/

import scala.concurrent.duration.DurationInt
import ru.maizy.ambient7.analysis.notifications.data.Data
import ru.maizy.ambient7.analysis.notifications.data.Co2Data
import ru.maizy.ambient7.analysis.test.BaseSpec

class DataSpec extends BaseSpec {

"Data.computeLimit" should "works" in {
Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.minute ) shouldBe 13
Data.computeLimit(refreshRate = 5.seconds, storeDuration = 10.seconds ) shouldBe 3
Data.computeLimit(refreshRate = 5.seconds, storeDuration = 5.seconds ) shouldBe 2
Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.seconds ) shouldBe 1
Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.minute ) shouldBe 13
Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 10.seconds ) shouldBe 3
Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 5.seconds ) shouldBe 2
Co2Data.computeLimit(refreshRate = 5.seconds, storeDuration = 1.seconds ) shouldBe 1
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ trait DevicesConfigReader extends UniversalConfigReader {
watcherSpec <- device.watchersSpecs;
actionId <- watcherSpec.actionsIds
if !availableActionsIds.contains(actionId)
) yield s"watcher '${watcherSpec.watcherType}' of device '${device.id}' contains unknown action $actionId"
) yield s"watcher '${watcherSpec.watcherType}' of device '${device.id}' contains unknown action '$actionId'"
}
}
.getOrElse(List.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package ru.maizy.ambient7.core.util

import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.Duration

// TODO: generalize for any date order past->future, future->past
class DateTimeIterator private (from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit)
Expand All @@ -26,4 +27,9 @@ object DateTimeIterator {
def apply(from: ZonedDateTime, to: ZonedDateTime, step: Long, stepUnit: ChronoUnit): DateTimeIterator = {
new DateTimeIterator(from, to, step, stepUnit)
}

def apply(from: ZonedDateTime, to: ZonedDateTime, stepDuration: Duration): DateTimeIterator = {
require(stepDuration.isFinite)
new DateTimeIterator(from, to, stepDuration.toMicros, ChronoUnit.MICROS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class InfluxDbClient(
)



def rawDataQuery(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[RawHttpResponse] = {
val settings = if (readOnly) influxDbReadonlySettings else influxDbSettings
logger.debug(s"${settings.user}@${settings.db}: $query")
logQuery(settings, query)
val request = buildBaseRequest("query", settings)
.addQueryParameter("db", settings.db)
.addQueryParameter("q", query)
Expand Down Expand Up @@ -76,7 +77,7 @@ class InfluxDbClient(

@deprecated("use async methods", "0.4")
def syncRawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = {
logger.debug(s"db: ${influxDbReadonlySettings.db}, query: $query")
logQuery(influxDbReadonlySettings, query)
val request = buildBaseSyncRequest("query", influxDbReadonlySettings)
.param("db", influxDbReadonlySettings.db)
.param("q", query)
Expand Down Expand Up @@ -136,4 +137,8 @@ class InfluxDbClient(
settings.baseUrl.stripSuffix("/") + "/" + urlencode(method)

private def urlencode(value: String) = URLEncoder.encode(value, "UTF-8")

private def logQuery(settings: InfluxDbConnectionSettings, query: String): Unit = {
logger.debug(s"${settings.user.map( _ + "@").getOrElse("")}${settings.baseUrl}/${settings.db}: $query")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ object Dates {

val INFLUXDB_TIMEZONE = ZoneOffset.UTC
val INFLUXDB_DATETIME_FORMAT = DateTimeFormatter.ISO_INSTANT
private val SYSTEM_TIMEZONE = ZoneId.systemDefault()

def toInfluxDbFormat(date: ZonedDateTime): String =
date.withZoneSameInstant(INFLUXDB_TIMEZONE).format(INFLUXDB_DATETIME_FORMAT)

def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = ZoneId.systemDefault()): Try[ZonedDateTime] =
def fromInfluxDbToZonedDateTime(dateTime: String, timeZone: ZoneId = Dates.SYSTEM_TIMEZONE): Try[ZonedDateTime] =
Try(INFLUXDB_DATETIME_FORMAT.parse(dateTime))
.map(Instant.from(_).atZone(INFLUXDB_TIMEZONE).withZoneSameInstant(timeZone))
}

0 comments on commit f854106

Please sign in to comment.