Skip to content

Commit

Permalink
iss #41: async influxdb client, refactoring (backport #26)
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy committed Oct 4, 2017
1 parent 5c8cbf7 commit e440795
Show file tree
Hide file tree
Showing 37 changed files with 465 additions and 145 deletions.
1 change: 1 addition & 0 deletions ambient7-analysis/src/etc/logback.dev.xml
Expand Up @@ -11,6 +11,7 @@
<logger name="ru.maizy.ambient7" level="DEBUG"/>
<logger name="ru.maizy.influxdbclient" level="DEBUG"/>
<logger name="scalikejdbc" level="INFO"/>
<logger name="com.ning.http.client" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="CONSOLE"/>
Expand Down
@@ -1,13 +1,15 @@
package ru.maizy.ambient7.analysis

import ru.maizy.ambient7.core.config.{ Ambient7Options, AnalysisSpecificOptions }
import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, InfluxDbConfigReader, MainDbConfigReader }
import ru.maizy.ambient7.core.config.reader.UniversalConfigReader

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.core.config.options.AnalysisSpecificOptions
import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, InfluxDbConfigReader, MainDbConfigReader }
import ru.maizy.ambient7.core.config.reader.UniversalConfigReader

object AnalysisConfigReader
extends UniversalConfigReader
with DevicesConfigReader
Expand Down
Expand Up @@ -7,9 +7,9 @@ package ru.maizy.ambient7.analysis.command

import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.DurationInt
import com.typesafe.scalalogging.LazyLogging
import scalikejdbc._
import ru.maizy.ambient7.analysis.influxdb
import ru.maizy.ambient7.analysis.service.InfluxDbCo2Service
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.core.data.{ Co2Agent, Co2Device }
Expand All @@ -20,7 +20,7 @@ import ru.maizy.influxdbclient.InfluxDbClient
object AggregateCo2Command extends LazyLogging {

def run(opts: Ambient7Options): ReturnStatus = {
(initInfluxDbClient(opts), initDbSession(opts)) match {
(influxdb.buildClient(opts), initDbSession(opts)) match {
case (Some(influxDbClient), Some(dBSession)) =>
val results: List[Boolean] = opts.devices
.map(_.co2Devices).getOrElse(List.empty)
Expand Down Expand Up @@ -101,25 +101,6 @@ object AggregateCo2Command extends LazyLogging {
eitherStartDate
}

private def initInfluxDbClient(opts: Ambient7Options): Option[InfluxDbClient] = {
(
opts.influxDb.flatMap(_.clientConnectionSettings),
opts.influxDb.flatMap(_.readonlyClientConnectionSetting)
) match {
case (Some(setting), Some(readonlySettings)) =>
Some(
new InfluxDbClient(
influxDbSettings = setting,
_influxDbReadonlySettings = Some(readonlySettings),
userAgent = Some("ambient7-analysis"),
connectTimeout = 500.millis,
readTimeout = 10.seconds
)
)
case _ => None
}
}

private def initDbSession(opts: Ambient7Options): Option[DBSession] = {
opts.mainDb.flatMap { dbSetting =>
dbSetting.url.map { url =>
Expand Down
@@ -0,0 +1,32 @@
package ru.maizy.ambient7.analysis

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

import scala.concurrent.duration.DurationInt
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.influxdbclient.InfluxDbClient


package object influxdb {
def buildClient(opts: Ambient7Options): Option[InfluxDbClient] = {
(
opts.influxDb.flatMap(_.clientConnectionSettings),
opts.influxDb.flatMap(_.readonlyClientConnectionSetting)
) match {
case (Some(setting), Some(readonlySettings)) =>
Some(
new InfluxDbClient(
influxDbSettings = setting,
_influxDbReadonlySettings = Some(readonlySettings),
userAgent = Some("ambient7-analysis"),
connectTimeout = 500.millis,
readTimeout = 10.seconds
)
)
case _ => None
}
}
}
@@ -0,0 +1,26 @@
package ru.maizy.ambient7.analysis.service

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

sealed trait AggregateType {
def code: String
def apply(value: String): String
}

object MAX extends AggregateType {
override def code: String = "max"
override def apply(value: String): String = s"max($value)"
}

object MIN extends AggregateType {
override def code: String = "min"
override def apply(value: String): String = s"min($value)"
}

object MEAN extends AggregateType {
override def code: String = "mean"
override def apply(value: String): String = s"mean($value)"
}
Expand Up @@ -11,14 +11,22 @@ import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.{ Failure, Success }
import com.typesafe.scalalogging.LazyLogging
import ru.maizy.ambient7.core.data.{ Co2AggregatedLevels, Co2Agent }
import ru.maizy.ambient7.core.data.{ Co2Agent, Co2AggregatedLevels }
import ru.maizy.influxdbclient.data.{ NullValue, SeriesItem, StringValue }
import ru.maizy.influxdbclient.util.Dates
import ru.maizy.influxdbclient.util.Escape.{ escapeValue, tagsToQueryCondition }
import ru.maizy.influxdbclient.InfluxDbClient

case class Co2LevelResult(ppm: Option[Int], from: ZonedDateTime, duration: Duration, aggregateType: AggregateType) {
def until: ZonedDateTime = from.plus(duration.toMillis, ChronoUnit.SECONDS)
}


object InfluxDbCo2Service extends LazyLogging {

val CO2_OK = 800
Expand All @@ -45,7 +53,7 @@ object InfluxDbCo2Service extends LazyLogging {
"group by time(1m)"

influxDbClient
.query(query)
.syncQuery(query)
.left.map(e => s"error when requesting data from influxdb: $e")
.right.flatMap { results =>
val totalMinutes = time.Duration.between(from, until).toMinutes.toInt
Expand Down Expand Up @@ -93,6 +101,7 @@ object InfluxDbCo2Service extends LazyLogging {
val agentNameEscaped = escapeValue(agentId.agentName)
val untilTruncated = until.truncatedTo(ChronoUnit.DAYS)

// TODO: use InfluxDbUtils.getValuesForTimePeriod
def buildQuery(lowerBound: ZonedDateTime, upperBound: ZonedDateTime): String = {
"select time, max(ppm) as max_ppm " +
"from co2 " +
Expand All @@ -111,7 +120,7 @@ object InfluxDbCo2Service extends LazyLogging {

val lowerBound = upperBound.minusDays(1).truncatedTo(ChronoUnit.DAYS)
val query = buildQuery(lowerBound, upperBound)
influxDbClient.query(query) match {
influxDbClient.syncQuery(query) match {
case Left(error) =>
Left(s"Unable to perform query for $lowerBound - $upperBound: ${error.message}")

Expand Down Expand Up @@ -189,4 +198,30 @@ object InfluxDbCo2Service extends LazyLogging {

iter(startDate, ListMap.empty)
}

def getValuesForTimePeriod(
client: InfluxDbClient,
agent: Co2Agent,
from: ZonedDateTime,
until: ZonedDateTime,
segment: Duration = 10.seconds,
aggregate: AggregateType = MAX)(implicit ex: ExecutionContext): Future[List[Co2LevelResult]] =
{
InfluxDbUtils.getValuesForTimePeriod[Int](
client,
from,
until,
"co2",
"ppm",
InfluxDbUtils.intExtractor,
aggregate,
segment,
condition = s"agent = ${escapeValue(agent.agentName)} and ${tagsToQueryCondition(agent.tags.asPairs)}"
).map { results =>
results.map { case (time, mayBeValue) =>
Co2LevelResult(mayBeValue, time, segment, aggregate)
}
}

}
}
@@ -0,0 +1,93 @@
package ru.maizy.ambient7.analysis.service

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

import java.time.ZonedDateTime
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success, Try }
import ru.maizy.ambient7.core.util.DateTimeIterator
import ru.maizy.influxdbclient.InfluxDbClient
import ru.maizy.influxdbclient.data.{ NumberValue, StringValue, Value }
import ru.maizy.influxdbclient.util.Escape.{ escapeIdentifier, escapeValue }
import ru.maizy.influxdbclient.util.Dates

private[service] object InfluxDbUtils {

def getValuesForTimePeriod[T](
client: InfluxDbClient,
from: ZonedDateTime,
until: ZonedDateTime,
table: String,
valueColumn: String,
valueExtractor: Value => Option[T],
aggregate: AggregateType,
segment: Duration,
timeColumn: String = "time",
condition: String = "true"
)(implicit ex: ExecutionContext): Future[List[(ZonedDateTime, Option[T])]] =
{
require(from.compareTo(until) < 0)
require(segment.isFinite() && segment.toSeconds >= 1)

val dateFrom = Dates.toInfluxDbFormat(from)
val dateUntil = Dates.toInfluxDbFormat(until)

val query =
s"select ${aggregate(valueColumn)} as value " +
s"from ${escapeIdentifier(table)} " +
s"where ${escapeIdentifier(timeColumn)} >= ${escapeValue(dateFrom)} " +
s"and ${escapeIdentifier(timeColumn)} < ${escapeValue(dateUntil)} " +
s"and ($condition) " +
s"group by time(${segment.toSeconds}s)"

// TODO: little bit shitty code
client.query(query) map { res =>
val timeToValue: Map[ZonedDateTime, Option[T]] = res.firstSeriesItems.map { seriesItem =>
(seriesItem.findColumnIndex(timeColumn), seriesItem.findColumnIndex("value")) match {
case (Some(timeIndex), Some(valueIndex)) =>
seriesItem.values.zipWithIndex.map { case (row, rowNum) =>
val rawTime = row(timeIndex)
rawTime match {
case timeValue: StringValue =>
Dates.fromInfluxDbToZonedDateTime(timeValue.value) match {
case Success(time) =>
row(valueIndex) match {
case value: Value => (time, valueExtractor(value))
case _ => (time, None)
}

case Failure(e) => throw new Error(s"unparsed time $rawTime", e)
}
case _ => throw new Error(s"time not found in row #$rowNum")
}
}.toMap
case _ => throw new Error(s"time & value columns not found in results")
}
}.getOrElse(Map.empty)

DateTimeIterator(from, until, segment).toList.map { time =>
(time, timeToValue.getOrElse(time, None))
}
}
}

val intExtractor: (Value => Option[Int]) = {
case value: NumberValue => Try(value.value.toInt) match {
case Success(i) => Some(i)
case Failure(_) => None
}
case _ => None
}

val floatExtractor: (Value => Option[Float]) = {
case value: NumberValue => Try(value.value.toFloat) match {
case Success(i) => Some(i)
case Failure(_) => None
}
case _ => None
}
}
Expand Up @@ -5,7 +5,8 @@ package ru.maizy.ambient7.webapp
* See LICENSE.txt for details.
*/

import ru.maizy.ambient7.core.config.{ Ambient7Options, WebAppSpecificOptions }
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.core.config.options.WebAppSpecificOptions
import ru.maizy.ambient7.core.config.reader.{ DevicesConfigReader, MainDbConfigReader, UniversalConfigReader }

object WebAppConfigReader
Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Expand Up @@ -9,7 +9,7 @@ val scalacOpts = Seq(
"-unchecked",
"-feature",
"-explaintypes",
"-Xfatal-warnings",
// "-Xfatal-warnings",
"-Xlint:_",
"-Ywarn-dead-code",
"-Ywarn-inaccessible",
Expand Down Expand Up @@ -42,7 +42,8 @@ lazy val commonDependencies = Seq(

lazy val httpClientDependencies = Seq(
libraryDependencies ++= Seq(
"org.scalaj" %% "scalaj-http" % "2.3.0"
"org.scalaj" %% "scalaj-http" % "2.3.0", // TODO: iss #39: remove sync methods and this dependancy
"net.databinder.dispatch" %% "dispatch-core" % "0.11.2"
)
)

Expand Down
38 changes: 19 additions & 19 deletions config/ambient7.conf
@@ -1,42 +1,42 @@
// sample production config
db {
url = "jdbc:h2:file:/var/ambient7/analysis;AUTO_SERVER=TRUE"
user = "ambient7"
password = ""
url: "jdbc:h2:file:/var/ambient7/analysis;AUTO_SERVER=TRUE"
user: "ambient7"
password: ""
}

influxdb {
database = "ambient7"
database: "ambient7"

baseurl = "http://localhost:8086/"
user = "ambient7_rw"
password = ""
baseurl: "http://localhost:8086/"
user: "ambient7_rw"
password: ""

// optional readonly access
// readonly {
// baseurl = "http://localhost:8086/"
// user = "ambient7_ro"
// password = ""
// baseurl: "http://localhost:8086/"
// user: "ambient7_ro"
// password: ""
// }

}

devices = [
{
id = "main"
model = "mt8057"
agent-name = "main"
agent-tags = ""
id: "main"
model: "mt8057"
agent-name: "main"
agent-tags: ""
}

{
id = "additional-mt8057"
model = "mt8057"
agent-name = "additional"
agent-tags = "place=livingroom,altitude=200"
id: "additional-mt8057"
model: "mt8057"
agent-name: "additional"
agent-tags: "place=livingroom,altitude=200"
}
]

webapp {
port = 22480
port: 22480
}

0 comments on commit e440795

Please sign in to comment.