diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index 40f182806d2..a6c14dc1a54 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -113,7 +113,7 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager, delegateDeploymentManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state => handleFinishedAction(deployedProcess, state) .flatMap { needsReschedule => - if (needsReschedule) reschedule(deployedProcess, state) else scheduledProcessesRepository.monad.pure(()).emptyCallback + if (needsReschedule) reschedule(deployedProcess) else scheduledProcessesRepository.monad.pure(()).emptyCallback }.runWithCallbacks } } @@ -139,7 +139,7 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager, } //Mark process as Finished and reschedule - we do it transactionally - private def reschedule(deployment: PeriodicProcessDeployment, state: Option[ProcessState]): RepositoryAction[Callback] = { + private def reschedule(deployment: PeriodicProcessDeployment): RepositoryAction[Callback] = { logger.info(s"Rescheduling ${deployment.display}") val process = deployment.periodicProcess for { @@ -150,8 +150,15 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager, handleEvent(ScheduledEvent(data, firstSchedule = false)) }.emptyCallback case Right(None) => - logger.info(s"No next run of ${deployment.display}. Deactivating") - deactivateAction(process.processVersion.processName) + scheduledProcessesRepository.findScheduled(deployment.periodicProcess.id).flatMap { scheduledDeployments => + if (scheduledDeployments.isEmpty) { + logger.info(s"No next run of ${deployment.display}. Deactivating") + deactivateAction(process.processVersion.processName) + } else { + logger.info(s"No next run of ${deployment.display} but there are still ${scheduledDeployments.size} scheduled deployments: ${scheduledDeployments.map(_.display)}") + scheduledProcessesRepository.monad.pure(()).emptyCallback + } + } case Left(error) => // This case should not happen. It would mean periodic property was valid when scheduling a process // but was made invalid when rescheduling again. diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala index 1d2b6e039bd..80a21286012 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala @@ -66,6 +66,8 @@ trait PeriodicProcessesRepository { def findDeployed: Action[Seq[PeriodicProcessDeployment]] + def findScheduled(id: PeriodicProcessId): Action[Seq[PeriodicProcessDeployment]] + def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeployment] def findProcessData(processName: ProcessName): Action[Seq[PeriodicProcess]] @@ -115,8 +117,7 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef, private def now(): LocalDateTime = LocalDateTime.now(clock) override def findToBeDeployed: Action[Seq[PeriodicProcessDeployment]] = { - val active = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) - .filter { case (p, _) => p.active === true } + val active = activePeriodicProcessWithDeploymentQuery .filter { case (_, d) => d.runAt <= now && d.status === (PeriodicProcessDeploymentStatus.Scheduled: PeriodicProcessDeploymentStatus) } active .result @@ -162,8 +163,8 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef, } override def getLatestDeploymentForEachSchedule(processName: ProcessName): Action[Seq[PeriodicProcessDeployment]] = { - val activeDeployments = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) - .filter { case (p, _) => (p.active === true) && (p.processName === processName.value) } + val activeDeployments = activePeriodicProcessWithDeploymentQuery + .filter { case (p, _) => p.processName === processName.value } val latestRunAtForEachDeployment = activeDeployments .groupBy { case (_, deployment) => deployment.scheduleName } .map { case (scheduleName, group) => @@ -202,13 +203,24 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef, } override def findDeployed: Action[Seq[PeriodicProcessDeployment]] = { - val processWithDeployment = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) - .filter { case (p, d) => (p.active === true) && (d.status === (PeriodicProcessDeploymentStatus.Deployed: PeriodicProcessDeploymentStatus)) } + val processWithDeployment = activePeriodicProcessWithDeploymentQuery + .filter { case (p, d) => d.status === (PeriodicProcessDeploymentStatus.Deployed: PeriodicProcessDeploymentStatus) } processWithDeployment .result .map(createPeriodicProcessDeployment) } + override def findScheduled(id: PeriodicProcessId): Action[Seq[PeriodicProcessDeployment]] = { + activePeriodicProcessWithDeploymentQuery + .filter { case (p, d) => p.id === id && d.status === (PeriodicProcessDeploymentStatus.Scheduled: PeriodicProcessDeploymentStatus) } + .result + .map(createPeriodicProcessDeployment) + } + + private def activePeriodicProcessWithDeploymentQuery = { + (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) + .filter { case (p, _) => p.active === true } + } private def createPeriodicProcessDeployment(all: Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]): Seq[PeriodicProcessDeployment] = all.map((PeriodicProcessesRepository.createPeriodicProcessDeployment _).tupled) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 98a1358d044..34d4e1b6e50 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -1,5 +1,10 @@ package pl.touk.nussknacker.engine.management.periodic +import com.cronutils.builder.CronBuilder +import com.cronutils.model.CronType +import com.cronutils.model.definition.CronDefinitionBuilder +import com.cronutils.model.field.expression.FieldExpressionFactory.{on, questionMark} +import org.scalatest.LoneElement._ import org.scalatest.concurrent.ScalaFutures import org.scalatest.exceptions.TestFailedException import org.scalatest.{FunSuite, Matchers, OptionValues} @@ -10,9 +15,9 @@ import pl.touk.nussknacker.engine.management.periodic.db.HsqlProcessRepository import pl.touk.nussknacker.engine.management.periodic.model.{PeriodicProcessDeploymentState, PeriodicProcessDeploymentStatus} import pl.touk.nussknacker.engine.management.periodic.service._ import pl.touk.nussknacker.test.PatientScalaFutures -import java.time.temporal.ChronoUnit -import java.time.{Clock, Duration, Instant, LocalDateTime, ZoneOffset} +import java.time._ +import java.time.temporal.ChronoUnit import scala.collection.mutable.ArrayBuffer import scala.concurrent.Future @@ -171,9 +176,56 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite toDeployAfterFinish.head.scheduleName shouldBe Some("schedule2") } + test("handle multiple one time schedules") { + var currentTime = startTime + val f = new Fixture + def service = f.periodicProcessService(currentTime) + val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS) + val timeToTriggerSchedule2 = startTime.plus(2, ChronoUnit.HOURS) + + service.schedule(MultipleScheduleProperty(Map( + "schedule1" -> CronScheduleProperty(convertDateToCron(localTime(timeToTriggerSchedule1))), + "schedule2" -> CronScheduleProperty(convertDateToCron(localTime(timeToTriggerSchedule2))))), + ProcessVersion.empty.copy(processName = processName), "{}").futureValue + + val latestDeploymentSchedule1 = service.getLatestDeployment(processName).futureValue.value + latestDeploymentSchedule1.scheduleName.value shouldBe "schedule1" + latestDeploymentSchedule1.runAt shouldBe localTime(timeToTriggerSchedule1) + + currentTime = timeToTriggerSchedule1 + + val toDeployOnSchedule1 = service.findToBeDeployed.futureValue.loneElement + toDeployOnSchedule1.scheduleName.value shouldBe "schedule1" + + service.deploy(toDeployOnSchedule1).futureValue + + service.getLatestDeployment(processName).futureValue.value.state.status shouldBe PeriodicProcessDeploymentStatus.Deployed + + service.handleFinished.futureValue + + val toDeployAfterFinishSchedule1 = service.findToBeDeployed.futureValue + toDeployAfterFinishSchedule1 should have length 0 + val latestDeploymentSchedule2 = service.getLatestDeployment(processName).futureValue.value + latestDeploymentSchedule2.scheduleName.value shouldBe "schedule2" + latestDeploymentSchedule2.runAt shouldBe localTime(timeToTriggerSchedule2) + latestDeploymentSchedule2.state.status shouldBe PeriodicProcessDeploymentStatus.Scheduled + + currentTime = timeToTriggerSchedule2 + + val toDeployOnSchedule2 = service.findToBeDeployed.futureValue.loneElement + toDeployOnSchedule2.scheduleName.value shouldBe "schedule2" + + service.deploy(toDeployOnSchedule2).futureValue + + service.getLatestDeployment(processName).futureValue.value.state.status shouldBe PeriodicProcessDeploymentStatus.Deployed + + service.handleFinished.futureValue + + service.getLatestDeployment(processName).futureValue shouldBe None + } + test("Should handle failed event handler") { val timeToTriggerCheck = startTime.plus(2, ChronoUnit.HOURS) - val expectedScheduleTime = startTime.plus(1, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS) var currentTime = startTime @@ -202,4 +254,17 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite } } + + private def convertDateToCron(date: LocalDateTime): String = { + CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) + .withYear(on(date.getYear)) + .withMonth(on(date.getMonth.getValue)) + .withDoM(on(date.getDayOfMonth)) + .withDoW(questionMark()) + .withHour(on(date.getHour)) + .withMinute(on(date.getMinute)) + .withSecond(on(date.getSecond)) + .instance() + .asString() + } } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index f609d0eca2d..1e6981cb4f3 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -103,6 +103,23 @@ class PeriodicProcessServiceTest extends FunSuite f.events.toList shouldBe List(FinishedEvent(finished, f.delegateDeploymentManagerStub.jobStatus), ScheduledEvent(scheduled, firstSchedule = false)) } + test("handleFinished - should deactivate process if there are no future schedules") { + val f = new Fixture + val yearNow = LocalDate.now().getYear + val cronInPast = CronScheduleProperty(s"0 0 6 6 9 ? ${yearNow - 1}") + f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed, scheduleProperty = cronInPast) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished) + + f.periodicProcessService.handleFinished.futureValue + + val processEntity = f.repository.processEntities.loneElement + processEntity.active shouldBe false + f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Finished + // TODO should be false + val event = createPeriodicProcessDeployment(processEntity.copy(active = true), f.repository.deploymentEntities.loneElement) + f.events.loneElement shouldBe FinishedEvent(event, f.delegateDeploymentManagerStub.jobStatus) + } + test("handle first schedule") { val f = new Fixture @@ -178,4 +195,8 @@ class PeriodicProcessServiceTest extends FunSuite intercept[TestFailedException](tryToSchedule(cronInPast)).getCause shouldBe a[PeriodicProcessException] intercept[TestFailedException](tryToSchedule(MultipleScheduleProperty(Map("s1" -> cronInPast, "s2" -> cronInPast)))).getCause shouldBe a[PeriodicProcessException] } + + test("getLatestDeployment - should return correct deployment for multiple schedules") { + + } } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala index ee662796b74..7866ebfd4a6 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala @@ -2,15 +2,13 @@ package pl.touk.nussknacker.engine.management.periodic.db import cats.{Id, Monad} import io.circe.syntax.EncoderOps -import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.build.EspProcessBuilder import pl.touk.nussknacker.engine.canonize.ProcessCanonizer -import pl.touk.nussknacker.engine.management.periodic +import pl.touk.nussknacker.engine.management.periodic._ import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeployment import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus import pl.touk.nussknacker.engine.management.periodic.model._ -import pl.touk.nussknacker.engine.management.periodic.{model, _} import pl.touk.nussknacker.engine.marshall.ProcessMarshaller import java.time.chrono.ChronoLocalDateTime @@ -33,7 +31,9 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository { override def run[T](action: Id[T]): Future[T] = Future.successful(action) - def addActiveProcess(processName: ProcessName, deploymentStatus: PeriodicProcessDeploymentStatus): Unit = { + def addActiveProcess(processName: ProcessName, + deploymentStatus: PeriodicProcessDeploymentStatus, + scheduleProperty: SingleScheduleProperty = CronScheduleProperty("0 0 * * * ?")): Unit = { import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression val id = PeriodicProcessId(Random.nextLong()) @@ -50,7 +50,7 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository { )).noSpaces, inputConfigDuringExecutionJson = "{}", jarFileName = "", - scheduleProperty = CronScheduleProperty("0 0 * * * ?").asInstanceOf[periodic.ScheduleProperty].asJson.noSpaces, + scheduleProperty = scheduleProperty.asJson.noSpaces, active = true, createdAt = LocalDateTime.now() ) @@ -107,6 +107,13 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository { d <- deploymentEntities if d.periodicProcessId == p.id && d.status == PeriodicProcessDeploymentStatus.Deployed } yield createPeriodicProcessDeployment(p, d) + override def findScheduled(id: PeriodicProcessId): Seq[PeriodicProcessDeployment] = { + for { + p <- processEntities if p.active && p.id == id + d <- deploymentEntities if d.periodicProcessId == p.id && d.status == PeriodicProcessDeploymentStatus.Scheduled + } yield createPeriodicProcessDeployment(p, d) + } + override def findProcessData(id: PeriodicProcessDeploymentId): PeriodicProcessDeployment = (for { d <- deploymentEntities if d.id == id @@ -145,16 +152,6 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository { createPeriodicProcessDeployment(processEntities.find(_.id == id).head ,deploymentEntity) } - private def createDeploymentWithJarData(processEntity: PeriodicProcessEntity): DeploymentWithJarData = { - val processVersion = ProcessVersion.empty.copy(versionId = processEntity.processVersionId, processName = ProcessName(processEntity.processName)) - model.DeploymentWithJarData( - processVersion = processVersion, - processJson = processEntity.processJson, - inputConfigDuringExecutionJson = processEntity.inputConfigDuringExecutionJson, - jarFileName = processEntity.jarFileName - ) - } - private def update(id: PeriodicProcessDeploymentId)(action: PeriodicProcessDeploymentEntity => PeriodicProcessDeploymentEntity): Unit = { deploymentEntities .zipWithIndex