Skip to content

Commit

Permalink
iss #26: poor man notifications scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy committed Mar 30, 2017
1 parent 4c8462c commit 7516d1b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@ package ru.maizy.ambient7.analysis.command
* See LICENSE.txt for details.
*/

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import ru.maizy.ambient7.analysis.notifications.NotificationsExecutor
import ru.maizy.ambient7.core.config.Ambient7Options

object NotificationCommand {

def run(opts: Ambient7Options): ReturnStatus = {
println(opts.notifications.get)
println(opts.devices.get)
ReturnStatus.success

val executor = new NotificationsExecutor(opts)
val executorFuture = executor.start()

// normally never ends
Await.ready(executorFuture, Duration.Inf)

ReturnStatus.computeError
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,92 @@ package ru.maizy.ambient7.analysis.notifications
* See LICENSE.txt for details.
*/

import scala.concurrent.Future
import java.time.ZonedDateTime
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.DurationInt
import scala.util.Success
import ru.maizy.ambient7.core.config.Ambient7Options
import ru.maizy.ambient7.analysis.influxdb
import ru.maizy.ambient7.analysis.notifications.data.Data

class NotificationsExecutor(opts: Ambient7Options) {
private val logger = notificationsLogger
var data: Option[Data] = None
val finishPromise: Option[Future[Unit]] = None

class NotificationsExecutor {
def start(opts: Ambient7Options): Future[Unit] = {
// val influxDbClient = influxdb.buildClient(opts)
private implicit val notificationContext = ExecutionContext.global

Future.failed(new Error("todo"))
def start(): Future[Unit] = {
val finishPromise = Promise[Unit]()

def onFailureProxyToPromise[T](future: Future[T]): Future[T] = {
future onFailure { case error: Throwable => finishPromise.failure(error) }
future
}

onFailureProxyToPromise(init()) onSuccess { case _ =>
onFailureProxyToPromise(sheduleDataUpdate())
}

finishPromise.future
}

private def init(): Future[Unit] = {
(opts.notifications, influxdb.buildClient(opts)) match {
case (Some(notificationsOptions), Some(influxDbClient)) =>

val refreshRate = notificationsOptions.refreshRate
// TODO: init watchers
// FIXME: tmp
val maxRequestedDuration = 20.minutes

data = Some(Data(influxDbClient, refreshRate, maxRequestedDuration))
Future.successful(())
case _ => Future.failed(new Error("Some options required for notifications not provided"))
}
}

// FIXME: add scheduler based on ScheduledExecutorService, current solution blocks one of EC thread
private def sheduleDataUpdate(step: Long = 0): Future[Unit] = {
(opts.notifications.map(_.refreshRate), data) match {

case (Some(refreshRate), Some(dataContainer)) =>
val start = ZonedDateTime.now()
val nextUpdate = start.plusSeconds(refreshRate.toSeconds)

logger.debug(s"Updating notification data, step #$step")
val dataUpdateFuture = dataUpdate(dataContainer)

def checkAndScheduleNext(): Unit = {
val end = ZonedDateTime.now()
if (end.compareTo(nextUpdate) < 0) {
val sleepTimeMillis = java.time.Duration.between(end, nextUpdate).toMillis
logger.debug(s"Call next updating after ${sleepTimeMillis / 1000} seconds")
Thread.sleep(sleepTimeMillis)
} else {
val updateTime = java.time.Duration.between(start, end).toMillis
logger.warn(s"Update cycle (${updateTime / 1000}s) longer than refresh rate (${refreshRate.toSeconds}s)")
}
sheduleDataUpdate(step + 1)
()
}

dataUpdateFuture onComplete {
case Success(_) =>
// FIXME: tmp, remove $data
logger.debug(s"Notification data has updated successful, step #$step: $data")
checkAndScheduleNext()

case util.Failure(e) =>
logger.error(s"Notification data hasn't updated, step #$step", e)
checkAndScheduleNext()
}

Future.successful(())

case _ => Future.failed(new Error("Requirements for date update not defined"))
}
}

// private def initDataBuffers(opts: Ambient7Options) = {}
private def dataUpdate(dataContainer: Data): Future[Unit] = dataContainer.update()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ru.maizy.ambient7.analysis.notifications.data
* See LICENSE.txt for details.
*/

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import ru.maizy.influxdbclient.InfluxDbClient

Expand All @@ -14,9 +15,9 @@ class Data private (influxDbClient: InfluxDbClient, val limit: Int) {
val temp = new DataPoints[Option[Float]](limit)
val availability = new DataPoints[Boolean](limit)

def update(): Either[Seq[String], Unit] = {
def update(): Future[Unit] = {
// FIXME
Left(Seq("todo"))
Future.failed(new Error("todo"))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.maizy.ambient7.analysis

import com.typesafe.scalalogging.Logger

/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2017
* See LICENSE.txt for details.
*/
package object notifications {
val notificationsLogger = Logger("ru.maizy.ambient7.analysis.notifications")
}

0 comments on commit 7516d1b

Please sign in to comment.