-
Notifications
You must be signed in to change notification settings - Fork 2
/
InfluxDbCo2Service.scala
227 lines (193 loc) · 7.49 KB
/
InfluxDbCo2Service.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package ru.maizy.ambient7.analysis.service
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016-2017
* See LICENSE.txt for details.
*/
import java.time
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.{ Failure, Success }
import com.typesafe.scalalogging.LazyLogging
import ru.maizy.ambient7.core.data.{ Co2Agent, Co2AggregatedLevels }
import ru.maizy.influxdbclient.data.{ NullValue, SeriesItem, StringValue }
import ru.maizy.influxdbclient.util.Dates
import ru.maizy.influxdbclient.util.Escape.{ escapeValue, tagsToQueryCondition }
import ru.maizy.influxdbclient.InfluxDbClient
/**
 * A single aggregated CO2 measurement for one time segment.
 *
 * @param ppm aggregated ppm value for the segment, `None` when no data was recorded
 * @param from inclusive start of the segment
 * @param duration length of the segment
 * @param aggregateType which aggregate (e.g. MAX) produced `ppm`
 */
case class Co2LevelResult(ppm: Option[Int], from: ZonedDateTime, duration: Duration, aggregateType: AggregateType) {
  /** Exclusive end of the segment. */
  // bug fix: `duration.toMillis` was previously added with ChronoUnit.SECONDS,
  // which made the computed end 1000 times further away than intended
  def until: ZonedDateTime = from.plus(duration.toMillis, ChronoUnit.MILLIS)
}
/**
 * Queries and aggregates CO2 levels stored in InfluxDB
 * (measurement `co2`, field `ppm`, tag `agent` plus agent-specific tags).
 */
object InfluxDbCo2Service extends LazyLogging {

  /** ppm below this threshold is counted as a "low" (good) level. */
  val CO2_OK = 800
  /** ppm at or above this threshold is counted as a "high" (bad) level. */
  val CO2_NOT_OK = 1200
  /** how many consecutive day-sized empty query results [[detectStartDateTime]] tolerates before stopping */
  val DEFAULT_MAX_EMPTY_DURATION = 30

  /**
   * Bucket per-minute CO2 maximums within `[from, until)` into low/medium/high counters.
   *
   * Minutes with no data are accounted for in `unknownLevel`
   * (`totalMinutes - number of returned points`).
   *
   * @param influxDbClient client used for the synchronous query
   * @param from inclusive period start (must be strictly before `until`)
   * @param until exclusive period end
   * @param agentId agent whose measurements are selected (by name and tags)
   * @return aggregated level counters, or an error message if the query
   *         or result extraction failed
   */
  def computeLevels(
      influxDbClient: InfluxDbClient,
      from: ZonedDateTime,
      until: ZonedDateTime,
      agentId: Co2Agent): Either[String, Co2AggregatedLevels] = {

    require(from.compareTo(until) < 0)

    val dateFrom = Dates.toInfluxDbFormat(from)
    val dateTo = Dates.toInfluxDbFormat(until)

    // one max(ppm) point per minute within the requested period
    val query =
      "select max(ppm) as max_ppm " +
      "from co2 " +
      s"where time >= ${escapeValue(dateFrom)} and time < ${escapeValue(dateTo)} " +
      s"and agent = ${escapeValue(agentId.agentName)} and ${tagsToQueryCondition(agentId.tags.asPairs)} " +
      "group by time(1m)"

    influxDbClient
      .syncQuery(query)
      .left.map(e => s"error when requesting data from influxdb: $e")
      .right.flatMap { results =>
        val totalMinutes = time.Duration.between(from, until).toMinutes.toInt
        results.headOption.flatMap(_.headOption) match {
          case Some(series) =>
            series.getColumnNumberValues("max_ppm")
              .right.map { perMinuteLevels =>
                Co2AggregatedLevels(
                  lowLevel = perMinuteLevels.count(_.toInt < CO2_OK),
                  mediumLevel = perMinuteLevels.count(l => l.toInt >= CO2_OK && l.toInt < CO2_NOT_OK),
                  highLevel = perMinuteLevels.count(_.toInt >= CO2_NOT_OK),
                  unknownLevel = totalMinutes - perMinuteLevels.size,
                  from = from,
                  to = until,
                  agentId = agentId
                )
              }
          // no series at all: the whole period is unknown
          case _ => Right(
            Co2AggregatedLevels(
              lowLevel = 0,
              mediumLevel = 0,
              highLevel = 0,
              unknownLevel = totalMinutes,
              from = from,
              to = until,
              agentId = agentId
            )
          )
        }
      }
  }

  /**
   * Detect the earliest hour with any CO2 data for the agent by scanning
   * backwards from `until` one day at a time.
   *
   * The scan stops after `maxEmptyDuration` consecutive empty days; the result
   * is then refined to the first non-empty hour of the earliest non-empty day.
   *
   * Note: the `until` date is truncated to the beginning of its day
   * (the previous doc comment said "hours", but `ChronoUnit.DAYS` is used).
   *
   * @param influxDbClient client used for the synchronous queries
   * @param until upper bound of the scan (truncated to days)
   * @param agentId agent whose measurements are selected
   * @param maxEmptyDuration how many consecutive empty days end the scan
   * @return the detected start date-time, or an error message on query failure
   */
  def detectStartDateTime(
      influxDbClient: InfluxDbClient,
      until: ZonedDateTime,
      agentId: Co2Agent,
      maxEmptyDuration: Int = DEFAULT_MAX_EMPTY_DURATION): Either[String, ZonedDateTime] = {

    val tagsConditions = tagsToQueryCondition(agentId.tags.asPairs)
    val agentNameEscaped = escapeValue(agentId.agentName)
    val untilTruncated = until.truncatedTo(ChronoUnit.DAYS)

    // TODO: use InfluxDbUtils.getValuesForTimePeriod
    // hourly max(ppm) for one [lowerBound, upperBound) window
    def buildQuery(lowerBound: ZonedDateTime, upperBound: ZonedDateTime): String = {
      "select time, max(ppm) as max_ppm " +
      "from co2 " +
      s"where time >= ${escapeValue(Dates.toInfluxDbFormat(lowerBound))} "+
      s"and time < ${escapeValue(Dates.toInfluxDbFormat(upperBound))} " +
      s"and agent = $agentNameEscaped and $tagsConditions " +
      "group by time(1h)"
    }

    // walk backwards day by day; `res` is the earliest day known to contain data,
    // `lastNonEmptyResults` keeps that day's hourly series for hour refinement
    @tailrec
    def find(
        upperBound: ZonedDateTime,
        res: ZonedDateTime = until,
        noDataCount: Int = 0,
        lastNonEmptyResults: Option[SeriesItem] = None): Either[String, ZonedDateTime] = {
      val lowerBound = upperBound.minusDays(1).truncatedTo(ChronoUnit.DAYS)
      val query = buildQuery(lowerBound, upperBound)
      influxDbClient.syncQuery(query) match {
        case Left(error) =>
          Left(s"Unable to perform query for $lowerBound - $upperBound: ${error.message}")
        case Right(result) =>
          result.firstSeriesItemIfNumberColumnNotEmpty("max_ppm") match {
            case Some(seriesItem) =>
              // data found: remember this day and keep scanning further back
              find(lowerBound, lowerBound, 0, Some(seriesItem))
            case None =>
              if (noDataCount >= maxEmptyDuration) {
                // too many empty days in a row: refine the last hit to its first hour
                Right(findStartHour(res, lastNonEmptyResults))
              } else {
                find(lowerBound, res, noDataCount + 1, lastNonEmptyResults)
              }
          }
      }
    }

    // given the earliest non-empty day and its hourly series,
    // return the first hour with a non-null value (falls back to `day`)
    def findStartHour(day: ZonedDateTime, mayBeDayResults: Option[SeriesItem]): ZonedDateTime = {
      mayBeDayResults match {
        case None => day
        case Some(dayResults) =>
          // results start from some hour, iterate from beginning of day & find first non empty hour
          val firstHour = dayResults.values
            .find { row =>
              row.size == 2 && row(1) != NullValue
            }
            .map(row => row(0))
          firstHour match {
            case Some(rawDateTime: StringValue) =>
              Dates.fromInfluxDbToZonedDateTime(rawDateTime.value, day.getZone) match {
                case Success(dateTime) =>
                  // sync timezones & replace hour
                  dateTime.truncatedTo(ChronoUnit.HOURS)
                case Failure(_) => day
              }
            case _ => day
          }
      }
    }

    find(untilTruncated)
  }

  /**
   * Compute [[Co2AggregatedLevels]] for every full hour in `[startDate, until)`.
   *
   * Hours that fail to query or contain no data are logged and skipped;
   * insertion order is preserved via [[ListMap]].
   *
   * @param startDate first hour to analyse
   * @param until exclusive end; the trailing partial hour is not analysed
   * @param influxDbClient client used for the synchronous queries
   * @param agentId agent whose measurements are selected
   * @return map from hour start to its aggregated levels
   */
  def analyseLevelsByHour(
      startDate: ZonedDateTime,
      until: ZonedDateTime,
      influxDbClient: InfluxDbClient,
      agentId: Co2Agent): Map[ZonedDateTime, Co2AggregatedLevels] = {

    @tailrec
    def iter(
        from: ZonedDateTime,
        res: Map[ZonedDateTime, Co2AggregatedLevels]): Map[ZonedDateTime, Co2AggregatedLevels] = {
      val to = from.plusHours(1).truncatedTo(ChronoUnit.HOURS)
      // fix: check the bound BEFORE querying — the original evaluated the bound as a
      // match guard after computeLevels ran, issuing one query whose result was discarded
      if (to.compareTo(until) >= 0) {
        res
      } else {
        computeLevels(influxDbClient, from, to, agentId) match {
          case Left(error) =>
            logger.warn(s"Unable to analyse ${from.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}, skipping: $error")
            iter(to, res)
          case Right(aggregate) if !aggregate.hasAnyResult =>
            logger.warn(s"There is no results for ${from.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}, skipping")
            iter(to, res)
          case Right(aggregate) =>
            iter(to, res + (from -> aggregate))
        }
      }
    }

    iter(startDate, ListMap.empty)
  }

  /**
   * Asynchronously fetch aggregated CO2 values for `[from, until)` split into
   * `segment`-sized intervals.
   *
   * Delegates to `InfluxDbUtils.getValuesForTimePeriod`; a segment with no data
   * yields a [[Co2LevelResult]] with `ppm = None`.
   *
   * @param client InfluxDB client
   * @param agent agent whose measurements are selected (by name and tags)
   * @param from inclusive period start
   * @param until exclusive period end
   * @param segment length of each aggregation interval
   * @param aggregate aggregate function applied within each segment
   * @param ex execution context for the future composition
   * @return one [[Co2LevelResult]] per segment
   */
  def getValuesForTimePeriod(
      client: InfluxDbClient,
      agent: Co2Agent,
      from: ZonedDateTime,
      until: ZonedDateTime,
      segment: Duration = 10.seconds,
      aggregate: AggregateType = MAX)(implicit ex: ExecutionContext): Future[List[Co2LevelResult]] =
  {
    InfluxDbUtils.getValuesForTimePeriod[Int](
      client,
      from,
      until,
      "co2",
      "ppm",
      InfluxDbUtils.intExtractor,
      aggregate,
      segment,
      condition = s"agent = ${escapeValue(agent.agentName)} and ${tagsToQueryCondition(agent.tags.asPairs)}"
    ).map { results =>
      results.map { case (time, mayBeValue) =>
        Co2LevelResult(mayBeValue, time, segment, aggregate)
      }
    }
  }
}