Skip to content

Commit

Permalink
iss #26: influxdb client: async methods
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy committed Apr 3, 2017
1 parent 7516d1b commit 64adbcc
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 27 deletions.
1 change: 1 addition & 0 deletions ambient7-analysis/src/etc/logback.dev.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<logger name="ru.maizy.ambient7" level="DEBUG"/>
<logger name="ru.maizy.influxdbclient" level="DEBUG"/>
<logger name="scalikejdbc" level="INFO"/>
<logger name="com.ning.http.client" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="CONSOLE"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package ru.maizy.ambient7.analysis.notifications.data
* See LICENSE.txt for details.
*/

import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import ru.maizy.influxdbclient.InfluxDbClient

Expand All @@ -16,8 +16,11 @@ class Data private (influxDbClient: InfluxDbClient, val limit: Int) {
val availability = new DataPoints[Boolean](limit)

def update(): Future[Unit] = {
// FIXME
Future.failed(new Error("todo"))
implicit val ec = ExecutionContext.Implicits.global
influxDbClient.query("select ppm from co2 limit 10").map { res =>
println(res)
()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object InfluxDbCo2Service extends LazyLogging {
"group by time(1m)"

influxDbClient
.query(query)
.syncQuery(query)
.left.map(e => s"error when requesting data from influxdb: $e")
.right.flatMap { results =>
val totalMinutes = time.Duration.between(from, until).toMinutes.toInt
Expand Down Expand Up @@ -111,7 +111,7 @@ object InfluxDbCo2Service extends LazyLogging {

val lowerBound = upperBound.minusDays(1).truncatedTo(ChronoUnit.DAYS)
val query = buildQuery(lowerBound, upperBound)
influxDbClient.query(query) match {
influxDbClient.syncQuery(query) match {
case Left(error) =>
Left(s"Unable to perform query for $lowerBound - $upperBound: ${error.message}")

Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ val scalacOpts = Seq(
"-unchecked",
"-feature",
"-explaintypes",
"-Xfatal-warnings",
// "-Xfatal-warnings",
"-Xlint:_",
"-Ywarn-dead-code",
"-Ywarn-inaccessible",
Expand Down Expand Up @@ -42,7 +42,8 @@ lazy val commonDependencies = Seq(

lazy val httpClientDependencies = Seq(
libraryDependencies ++= Seq(
"org.scalaj" %% "scalaj-http" % "2.3.0"
"org.scalaj" %% "scalaj-http" % "2.3.0", // TODO: iss #39: remove sync methods and this dependancy
"net.databinder.dispatch" %% "dispatch-core" % "0.11.2"
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.maizy.influxdbclient

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

class Error(message: String, cause: Option[Throwable] = None) extends Exception(message, cause.orNull)
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ package ru.maizy.influxdbclient
*/

import java.net.URLEncoder
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.collection.JavaConverters.{ collectionAsScalaIterableConverter, mapAsScalaMapConverter }
import scala.util.{ Failure, Success, Try }
import scalaj.http.{ BaseHttp, HttpOptions, HttpResponse }
import com.typesafe.scalalogging.LazyLogging
import dispatch.{ Http, Req, as, implyRequestHandlerTuple, url }
import ru.maizy.influxdbclient.data.{ ErrorDto, QueryResult }
import ru.maizy.influxdbclient.responses.QueryResultsProtocol._
import spray.json.{ DeserializationException, JsonParser, ParserInput }

// TODO: remove sync methods (iss #39)
class InfluxDbClient(
val influxDbSettings: InfluxDbConnectionSettings,
val userAgent: Option[String] = None,
Expand All @@ -37,9 +41,43 @@ class InfluxDbClient(
)
)

def rawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = {

def rawDataQuery(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[RawHttpResponse] = {
val settings = if (readOnly) influxDbReadonlySettings else influxDbSettings
logger.debug(s"${settings.user}@${settings.db}: $query")
val request = buildBaseRequest("query", settings)
.addQueryParameter("db", settings.db)
.addQueryParameter("q", query)

Http(request > as.Response { response =>
val headers = response.getHeaders.asScala.mapValues(_.asScala.toIndexedSeq).toMap
RawHttpResponse(response.getStatusCode, response.getResponseBodyAsBytes, headers)
})
}

def query(query: String, readOnly: Boolean = true)(implicit ec: ExecutionContext): Future[QueryResult] = {
rawDataQuery(query, readOnly) map { raw =>
parseQueryResult(raw) match {
case Right(r) => r
case Left(error) => throw new Error(error.message, error.cause)
}

}
}

// TODO: remove ErrorDto use Error instead
private def parseQueryResult(response: RawHttpResponse): Either[ErrorDto, QueryResult] = {
if (response.code != 200) {
Left(parserErrorResponse(response))
} else {
parserQueryResponse(response)
}
}

@deprecated("use async methods", "0.4")
def syncRawDataQuery(query: String): Either[ErrorDto, HttpResponse[Array[Byte]]] = {
logger.debug(s"db: ${influxDbReadonlySettings.db}, query: $query")
val request = buildBaseRequest("query", influxDbReadonlySettings)
val request = buildBaseSyncRequest("query", influxDbReadonlySettings)
.param("db", influxDbReadonlySettings.db)
.param("q", query)

Expand All @@ -49,17 +87,15 @@ class InfluxDbClient(
}
}

def query(query: String): Either[ErrorDto, QueryResult] = {
rawDataQuery(query).right.flatMap { response =>
if (response.code != 200) {
Left(parserErrorResponse(response))
} else {
parserQueryResponse(response)
}
@deprecated("use async methods", "0.4")
def syncQuery(query: String): Either[ErrorDto, QueryResult] = {
syncRawDataQuery(query).right.flatMap { response =>
parseQueryResult(RawHttpResponse(response.code, response.body, response.headers))
}
}

private def parserErrorResponse(response: HttpResponse[Array[Byte]]): ErrorDto =
// TODO: remove ErrorDto use Error instead
private def parserErrorResponse(response: RawHttpResponse): ErrorDto =
Try(JsonParser(ParserInput(response.body)).convertTo[ErrorDto]) match {
case Success(error) => error
case Failure(e: DeserializationException) =>
Expand All @@ -68,7 +104,8 @@ class InfluxDbClient(
ErrorDto(s"unknown response deserialization error (response code ${response.code})", Some(e))
}

private def parserQueryResponse(response: HttpResponse[Array[Byte]]): Either[ErrorDto, QueryResult] =
// TODO: remove ErrorDto use Error instead
private def parserQueryResponse(response: RawHttpResponse): Either[ErrorDto, QueryResult] =
Try(JsonParser(ParserInput(response.body)).convertTo[QueryResult]) match {
case Success(result) =>
Right(result)
Expand All @@ -78,14 +115,23 @@ class InfluxDbClient(
Left(ErrorDto("unknown response deserialization error", Some(e)))
}

private def buildBaseRequest(method: String, settings: InfluxDbConnectionSettings) = {
private def buildBaseSyncRequest(method: String, settings: InfluxDbConnectionSettings) = {
val request = httpClient.apply(buildUri(method, settings))
(settings.user, settings.password) match {
case (Some(user), Some(password)) => request.auth(user, password)
case _ => request
}
}

private def buildBaseRequest(method: String, settings: InfluxDbConnectionSettings): Req = {
val request = url(buildUri(method, settings))
(settings.user, settings.password) match {
case (Some(user), Some(password)) => request.as(user, password)
case _ => request
}
}

// TODO: use dispatch.url constructor instead of a string
private def buildUri(method: String, settings: InfluxDbConnectionSettings): String =
settings.baseUrl.stripSuffix("/") + "/" + urlencode(method)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.maizy.influxdbclient

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/

case class RawHttpResponse(code: Int, body: Array[Byte], headers: Map[String, IndexedSeq[String]])
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package ru.maizy.influxdbclient.data
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016-2017
* See LICENSE.txt for details.
*/
@deprecated("use Throwable", "0.4")
case class ErrorDto(message: String, cause: Option[Throwable] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,32 @@ package ru.maizy.influxdbclient.util
*/
object Escape {

def encodeKey(key: String): String = key.replace("\"", "\\\"") // " -> \"
def encodeIdentifier(key: String): String = key.replace("\"", "\\\"") // " -> \"

def escapeKey(key: String): String = "\"" + encodeKey(key) + "\""
def escapeIdentifier(key: String): String = "\"" + encodeIdentifier(key) + "\""

def decodeIdentifier(value: String): String = value.replace("\\\"", "\"")

def encodeTag(key: String): String = key.replace(",", "\\,")

def decodeTag(key: String): String = key.replace("\\,", ",")


// TODO: build right replacement based on spec
def encodeValue(value: String): String = value.replace("'", "\\'")

def decodeValue(value: String): String = value.replace("\\'", "'")

def escapeValue(value: String): String = "'" + encodeValue(value) + "'"


def encodeTag(key: String): String = key.replace(",", "\\,")

def decodeTag(key: String): String = key.replace("\\,", ",")

def tagsToQueryCondition(tags: IndexedSeq[(String, String)]): String = {
if (tags.isEmpty) {
"true"
} else {
tags
.map { pair =>
val (name, value) = pair
encodeKey(name) + " = " + encodeValue(value)
encodeIdentifier(name) + " = " + encodeValue(value)
}
.mkString(" and ")
}
Expand Down

0 comments on commit 64adbcc

Please sign in to comment.