Skip to content

Commit

Permalink
fix multiple one time schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrz committed Sep 14, 2021
1 parent 0c760b1 commit c792e23
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 28 deletions.
Expand Up @@ -113,7 +113,7 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager,
delegateDeploymentManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state =>
handleFinishedAction(deployedProcess, state)
.flatMap { needsReschedule =>
if (needsReschedule) reschedule(deployedProcess, state) else scheduledProcessesRepository.monad.pure(()).emptyCallback
if (needsReschedule) reschedule(deployedProcess) else scheduledProcessesRepository.monad.pure(()).emptyCallback
}.runWithCallbacks
}
}
Expand All @@ -139,7 +139,7 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager,
}

//Mark process as Finished and reschedule - we do it transactionally
private def reschedule(deployment: PeriodicProcessDeployment, state: Option[ProcessState]): RepositoryAction[Callback] = {
private def reschedule(deployment: PeriodicProcessDeployment): RepositoryAction[Callback] = {
logger.info(s"Rescheduling ${deployment.display}")
val process = deployment.periodicProcess
for {
Expand All @@ -150,8 +150,15 @@ class PeriodicProcessService(delegateDeploymentManager: DeploymentManager,
handleEvent(ScheduledEvent(data, firstSchedule = false))
}.emptyCallback
case Right(None) =>
logger.info(s"No next run of ${deployment.display}. Deactivating")
deactivateAction(process.processVersion.processName)
scheduledProcessesRepository.findScheduled(deployment.periodicProcess.id).flatMap { scheduledDeployments =>
if (scheduledDeployments.isEmpty) {
logger.info(s"No next run of ${deployment.display}. Deactivating")
deactivateAction(process.processVersion.processName)
} else {
logger.info(s"No next run of ${deployment.display} but there are still ${scheduledDeployments.size} scheduled deployments: ${scheduledDeployments.map(_.display)}")
scheduledProcessesRepository.monad.pure(()).emptyCallback
}
}
case Left(error) =>
// This case should not happen. It would mean periodic property was valid when scheduling a process
// but was made invalid when rescheduling again.
Expand Down
Expand Up @@ -66,6 +66,8 @@ trait PeriodicProcessesRepository {

def findDeployed: Action[Seq[PeriodicProcessDeployment]]

def findScheduled(id: PeriodicProcessId): Action[Seq[PeriodicProcessDeployment]]

def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeployment]

def findProcessData(processName: ProcessName): Action[Seq[PeriodicProcess]]
Expand Down Expand Up @@ -115,8 +117,7 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef,
private def now(): LocalDateTime = LocalDateTime.now(clock)

override def findToBeDeployed: Action[Seq[PeriodicProcessDeployment]] = {
val active = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
.filter { case (p, _) => p.active === true }
val active = activePeriodicProcessWithDeploymentQuery
.filter { case (_, d) => d.runAt <= now && d.status === (PeriodicProcessDeploymentStatus.Scheduled: PeriodicProcessDeploymentStatus) }
active
.result
Expand Down Expand Up @@ -162,8 +163,8 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef,
}

override def getLatestDeploymentForEachSchedule(processName: ProcessName): Action[Seq[PeriodicProcessDeployment]] = {
val activeDeployments = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
.filter { case (p, _) => (p.active === true) && (p.processName === processName.value) }
val activeDeployments = activePeriodicProcessWithDeploymentQuery
.filter { case (p, _) => p.processName === processName.value }
val latestRunAtForEachDeployment = activeDeployments
.groupBy { case (_, deployment) => deployment.scheduleName }
.map { case (scheduleName, group) =>
Expand Down Expand Up @@ -202,13 +203,24 @@ class SlickPeriodicProcessesRepository(db: JdbcBackend.DatabaseDef,
}

override def findDeployed: Action[Seq[PeriodicProcessDeployment]] = {
val processWithDeployment = (PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
.filter { case (p, d) => (p.active === true) && (d.status === (PeriodicProcessDeploymentStatus.Deployed: PeriodicProcessDeploymentStatus)) }
val processWithDeployment = activePeriodicProcessWithDeploymentQuery
.filter { case (p, d) => d.status === (PeriodicProcessDeploymentStatus.Deployed: PeriodicProcessDeploymentStatus) }
processWithDeployment
.result
.map(createPeriodicProcessDeployment)
}

override def findScheduled(id: PeriodicProcessId): Action[Seq[PeriodicProcessDeployment]] = {
activePeriodicProcessWithDeploymentQuery
.filter { case (p, d) => p.id === id && d.status === (PeriodicProcessDeploymentStatus.Scheduled: PeriodicProcessDeploymentStatus) }
.result
.map(createPeriodicProcessDeployment)
}

private def activePeriodicProcessWithDeploymentQuery = {
(PeriodicProcesses join PeriodicProcessDeployments on (_.id === _.periodicProcessId))
.filter { case (p, _) => p.active === true }
}

private def createPeriodicProcessDeployment(all: Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]): Seq[PeriodicProcessDeployment] =
all.map((PeriodicProcessesRepository.createPeriodicProcessDeployment _).tupled)
Expand Down
@@ -1,5 +1,10 @@
package pl.touk.nussknacker.engine.management.periodic

import com.cronutils.builder.CronBuilder
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.field.expression.FieldExpressionFactory.{on, questionMark}
import org.scalatest.LoneElement._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.scalatest.{FunSuite, Matchers, OptionValues}
Expand All @@ -10,9 +15,9 @@ import pl.touk.nussknacker.engine.management.periodic.db.HsqlProcessRepository
import pl.touk.nussknacker.engine.management.periodic.model.{PeriodicProcessDeploymentState, PeriodicProcessDeploymentStatus}
import pl.touk.nussknacker.engine.management.periodic.service._
import pl.touk.nussknacker.test.PatientScalaFutures
import java.time.temporal.ChronoUnit
import java.time.{Clock, Duration, Instant, LocalDateTime, ZoneOffset}

import java.time._
import java.time.temporal.ChronoUnit
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future

Expand Down Expand Up @@ -171,9 +176,56 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite
toDeployAfterFinish.head.scheduleName shouldBe Some("schedule2")
}

test("handle multiple one time schedules") {
var currentTime = startTime
val f = new Fixture
def service = f.periodicProcessService(currentTime)
val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS)
val timeToTriggerSchedule2 = startTime.plus(2, ChronoUnit.HOURS)

service.schedule(MultipleScheduleProperty(Map(
"schedule1" -> CronScheduleProperty(convertDateToCron(localTime(timeToTriggerSchedule1))),
"schedule2" -> CronScheduleProperty(convertDateToCron(localTime(timeToTriggerSchedule2))))),
ProcessVersion.empty.copy(processName = processName), "{}").futureValue

val latestDeploymentSchedule1 = service.getLatestDeployment(processName).futureValue.value
latestDeploymentSchedule1.scheduleName.value shouldBe "schedule1"
latestDeploymentSchedule1.runAt shouldBe localTime(timeToTriggerSchedule1)

currentTime = timeToTriggerSchedule1

val toDeployOnSchedule1 = service.findToBeDeployed.futureValue.loneElement
toDeployOnSchedule1.scheduleName.value shouldBe "schedule1"

service.deploy(toDeployOnSchedule1).futureValue

service.getLatestDeployment(processName).futureValue.value.state.status shouldBe PeriodicProcessDeploymentStatus.Deployed

service.handleFinished.futureValue

val toDeployAfterFinishSchedule1 = service.findToBeDeployed.futureValue
toDeployAfterFinishSchedule1 should have length 0
val latestDeploymentSchedule2 = service.getLatestDeployment(processName).futureValue.value
latestDeploymentSchedule2.scheduleName.value shouldBe "schedule2"
latestDeploymentSchedule2.runAt shouldBe localTime(timeToTriggerSchedule2)
latestDeploymentSchedule2.state.status shouldBe PeriodicProcessDeploymentStatus.Scheduled

currentTime = timeToTriggerSchedule2

val toDeployOnSchedule2 = service.findToBeDeployed.futureValue.loneElement
toDeployOnSchedule2.scheduleName.value shouldBe "schedule2"

service.deploy(toDeployOnSchedule2).futureValue

service.getLatestDeployment(processName).futureValue.value.state.status shouldBe PeriodicProcessDeploymentStatus.Deployed

service.handleFinished.futureValue

service.getLatestDeployment(processName).futureValue shouldBe None
}

test("Should handle failed event handler") {
val timeToTriggerCheck = startTime.plus(2, ChronoUnit.HOURS)
val expectedScheduleTime = startTime.plus(1, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS)

var currentTime = startTime

Expand Down Expand Up @@ -202,4 +254,17 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite
}

}

private def convertDateToCron(date: LocalDateTime): String = {
CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(on(date.getYear))
.withMonth(on(date.getMonth.getValue))
.withDoM(on(date.getDayOfMonth))
.withDoW(questionMark())
.withHour(on(date.getHour))
.withMinute(on(date.getMinute))
.withSecond(on(date.getSecond))
.instance()
.asString()
}
}
Expand Up @@ -103,6 +103,23 @@ class PeriodicProcessServiceTest extends FunSuite
f.events.toList shouldBe List(FinishedEvent(finished, f.delegateDeploymentManagerStub.jobStatus), ScheduledEvent(scheduled, firstSchedule = false))
}

test("handleFinished - should deactivate process if there are no future schedules") {
val f = new Fixture
val yearNow = LocalDate.now().getYear
val cronInPast = CronScheduleProperty(s"0 0 6 6 9 ? ${yearNow - 1}")
f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed, scheduleProperty = cronInPast)
f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished)

f.periodicProcessService.handleFinished.futureValue

val processEntity = f.repository.processEntities.loneElement
processEntity.active shouldBe false
f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Finished
// TODO should be false
val event = createPeriodicProcessDeployment(processEntity.copy(active = true), f.repository.deploymentEntities.loneElement)
f.events.loneElement shouldBe FinishedEvent(event, f.delegateDeploymentManagerStub.jobStatus)
}

test("handle first schedule") {
val f = new Fixture

Expand Down Expand Up @@ -178,4 +195,8 @@ class PeriodicProcessServiceTest extends FunSuite
intercept[TestFailedException](tryToSchedule(cronInPast)).getCause shouldBe a[PeriodicProcessException]
intercept[TestFailedException](tryToSchedule(MultipleScheduleProperty(Map("s1" -> cronInPast, "s2" -> cronInPast)))).getCause shouldBe a[PeriodicProcessException]
}

test("getLatestDeployment - should return correct deployment for multiple schedules") {

}
}
Expand Up @@ -2,15 +2,13 @@ package pl.touk.nussknacker.engine.management.periodic.db

import cats.{Id, Monad}
import io.circe.syntax.EncoderOps
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.build.EspProcessBuilder
import pl.touk.nussknacker.engine.canonize.ProcessCanonizer
import pl.touk.nussknacker.engine.management.periodic
import pl.touk.nussknacker.engine.management.periodic._
import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeployment
import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.engine.management.periodic.model._
import pl.touk.nussknacker.engine.management.periodic.{model, _}
import pl.touk.nussknacker.engine.marshall.ProcessMarshaller

import java.time.chrono.ChronoLocalDateTime
Expand All @@ -33,7 +31,9 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository {

override def run[T](action: Id[T]): Future[T] = Future.successful(action)

def addActiveProcess(processName: ProcessName, deploymentStatus: PeriodicProcessDeploymentStatus): Unit = {
def addActiveProcess(processName: ProcessName,
deploymentStatus: PeriodicProcessDeploymentStatus,
scheduleProperty: SingleScheduleProperty = CronScheduleProperty("0 0 * * * ?")): Unit = {
import pl.touk.nussknacker.engine.spel.Implicits.asSpelExpression

val id = PeriodicProcessId(Random.nextLong())
Expand All @@ -50,7 +50,7 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository {
)).noSpaces,
inputConfigDuringExecutionJson = "{}",
jarFileName = "",
scheduleProperty = CronScheduleProperty("0 0 * * * ?").asInstanceOf[periodic.ScheduleProperty].asJson.noSpaces,
scheduleProperty = scheduleProperty.asJson.noSpaces,
active = true,
createdAt = LocalDateTime.now()
)
Expand Down Expand Up @@ -107,6 +107,13 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository {
d <- deploymentEntities if d.periodicProcessId == p.id && d.status == PeriodicProcessDeploymentStatus.Deployed
} yield createPeriodicProcessDeployment(p, d)

override def findScheduled(id: PeriodicProcessId): Seq[PeriodicProcessDeployment] = {
for {
p <- processEntities if p.active && p.id == id
d <- deploymentEntities if d.periodicProcessId == p.id && d.status == PeriodicProcessDeploymentStatus.Scheduled
} yield createPeriodicProcessDeployment(p, d)
}

override def findProcessData(id: PeriodicProcessDeploymentId): PeriodicProcessDeployment =
(for {
d <- deploymentEntities if d.id == id
Expand Down Expand Up @@ -145,16 +152,6 @@ class InMemPeriodicProcessesRepository extends PeriodicProcessesRepository {
createPeriodicProcessDeployment(processEntities.find(_.id == id).head ,deploymentEntity)
}

private def createDeploymentWithJarData(processEntity: PeriodicProcessEntity): DeploymentWithJarData = {
val processVersion = ProcessVersion.empty.copy(versionId = processEntity.processVersionId, processName = ProcessName(processEntity.processName))
model.DeploymentWithJarData(
processVersion = processVersion,
processJson = processEntity.processJson,
inputConfigDuringExecutionJson = processEntity.inputConfigDuringExecutionJson,
jarFileName = processEntity.jarFileName
)
}

private def update(id: PeriodicProcessDeploymentId)(action: PeriodicProcessDeploymentEntity => PeriodicProcessDeploymentEntity): Unit = {
deploymentEntities
.zipWithIndex
Expand Down

0 comments on commit c792e23

Please sign in to comment.