Skip to content

Commit

Permalink
Add Kron extension (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
justwrote committed Nov 3, 2020
1 parent c241517 commit 99036f7
Show file tree
Hide file tree
Showing 51 changed files with 974 additions and 304 deletions.
59 changes: 50 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
[![GitHub Build Status](https://img.shields.io/github/workflow/status/justwrote/kjob/CI/master?style=flat)](https://github.com/justwrote/kjob/actions?query=workflow%3ACI)
[![Coverage Status](https://coveralls.io/repos/github/justwrote/kjob/badge.svg)](https://coveralls.io/github/justwrote/kjob)

A coroutine based persistent background scheduler written in Kotlin.
A coroutine based persistent background (cron) scheduler written in Kotlin.

## Features

* Persist scheduled jobs (e.g. mongoDB)
* [Cron](#cron) jobs
* Nice DSL for registering and scheduling jobs
* Multiple instances possible
* Failed jobs will be rescheduled
Expand Down Expand Up @@ -67,6 +68,12 @@ kjob.register(OrderCreatedEmail) {
kjob.schedule(OrderCreatedEmail) {
props[it.recipient] = "customer@example.com"
}

// or provide some delay for the scheduling
kjob.schedule(OrderCreatedEmail, 5.seconds) {
props[it.recipient] = "customer@example.com"
}
// this runs the job not immediately but - you may guess it already - in 5 seconds!
```

For more details please take a look at the [examples](https://github.com/justwrote/kjob/blob/master/kjob-example/src/main/kotlin)
Expand Down Expand Up @@ -146,27 +153,61 @@ object ShowIdModule : ExtensionModule<ShowIdEx, ShowIdEx.Configuration, BaseKJob
}
}

fun main() {
val kjob = kjob(InMem) {
extension(ShowIdModule) // register our extension and bind it to the kjob life cycle
}.start()
// ...

kjob(ShowIdExtension).showId() // access our new extension method

kjob.shutdown()
val kjob = kjob(InMem) {
extension(ShowIdModule) // register our extension and bind it to the kjob life cycle
}

// ...

kjob(ShowIdExtension).showId() // access our new extension method
```

To see a more advanced version take a look at this [example](https://github.com/justwrote/kjob/blob/master/kjob-example/src/main/kotlin/Example_Extension.kt)

## Cron

With kjob you are also able to schedule jobs with the familiar cron expression. To get Kron - the name of the extension to enable Cron scheduling in kjob - you need to add the following dependency:

```groovy
dependencies {
implementation "it.justwrote:kjob-kron:<version>"
}
```

After that you can schedule cron jobs as easy as every other job with kjob.

```kotlin
// define a Kron job with a name and a cron expression (e.g. 5 seconds)
object PrintStuff : KronJob("print-stuff", "*/5 * * ? * * *")

// ...

val kjob = kjob(InMem) {
extension(KronModule) // enable the Kron extension
}

// ...

// define the executed code
kjob(Kron).kron(PrintStuff) {
maxRetries = 3 // and you can access the already familiar settings you are used to
execute {
println("${Instant.now()}: executing kron task '${it.name}' with jobId '$jobId'")
}
}
```

You can find more in this [example](https://github.com/justwrote/kjob/blob/master/kjob-example/src/main/kotlin/Example_Kron.kt)


## Roadmap

Here is an unordered list of features that I would like to see in kjob. If you
consider one of them important please open an issue.

- Priority support
- Cron features
- Backoff algorithm for failed jobs
- REST API
- Dashboard
Expand Down
31 changes: 28 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
id "com.github.ben-manes.versions" version "$versions_version"
id "nebula.release" version "$nebula_version"
id "jacoco"
id 'com.github.kt3k.coveralls' version "2.10.1"
id 'com.github.kt3k.coveralls' version "2.10.2"
}

allprojects {
Expand Down Expand Up @@ -57,7 +57,6 @@ subprojects {
html.enabled = true
csv.enabled = false

// xml.destination file("${buildDir}/reports/jacoco/test/jacocoTestReport.xml")
html.destination file("${buildDir}/reports/jacoco/test/html")
}

Expand All @@ -67,7 +66,7 @@ subprojects {
violationRules {
rule {
limit {
minimum = 0.9
minimum = 0.8
}
}
}
Expand Down Expand Up @@ -115,14 +114,20 @@ task codeCoverageReport(type: JacocoReport) {
project(':kjob-example') {
dependencies {
implementation project(":kjob-core")
implementation project(":kjob-kron")
implementation project(":kjob-mongo")
implementation project(":kjob-inmem")

implementation "com.cronutils:cron-utils:$cronutils_version", {
exclude group: "org.slf4j", module: "slf4j-simple"
}

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "ch.qos.logback:logback-classic:$logback_version"
}
}


project(':kjob-mongo') {
apply plugin: "published"
dependencies {
Expand Down Expand Up @@ -156,6 +161,26 @@ project(':kjob-inmem') {
}
}

project(':kjob-kron') {
apply plugin: "published"
dependencies {
implementation project(":kjob-core")
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "com.cronutils:cron-utils:$cronutils_version", {
exclude group: "org.slf4j", module: "slf4j-simple"
}
api "org.slf4j:slf4j-api:$slf4j_version"

testImplementation project(":kjob-inmem")
testImplementation "io.kotest:kotest-runner-junit5:$kotest_version"
testImplementation "io.kotest:kotest-assertions-core:$kotest_version"
testImplementation "io.mockk:mockk:$mockk_version"
testImplementation project(path: ":kjob-core", configuration: "testArtifacts")

testRuntimeOnly "ch.qos.logback:logback-classic:$logback_version"
}
}

project(':kjob-core') {
apply plugin: "published"
dependencies {
Expand Down
19 changes: 10 additions & 9 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
coroutines_version=1.3.6
embed_mongo_version=2.2.0
coroutines_version=1.4.0
embed_mongo_version=3.0.0
jacoco_version=0.8.5
kotest_version=4.0.0-BETA3
kotlin_version=1.3.72
kotest_version=4.3.1
kotlin_version=1.4.10
logback_version=1.2.3
mockk_version=1.10.0
mongo_version=1.13.0
nebula_version=15.0.0
mockk_version=1.10.2
mongo_version=4.1.1
nebula_version=15.0.1
org.gradle.console=rich
rx_version=2.4.0
slf4j_version=1.7.30
testlogger_version=2.0.0
versions_version=0.28.0
testlogger_version=2.1.1
versions_version=0.34.0
cronutils_version=9.1.1
5 changes: 2 additions & 3 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Wed May 13 09:43:55 CEST 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
43 changes: 35 additions & 8 deletions kjob-core/src/main/kotlin/it/justwrote/kjob/BaseKJob.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package it.justwrote.kjob

import it.justwrote.kjob.dsl.KJobFunctions
import it.justwrote.kjob.dsl.RegisterContext
import it.justwrote.kjob.dsl.ScheduleContext
import it.justwrote.kjob.dsl.*
import it.justwrote.kjob.extension.Extension
import it.justwrote.kjob.extension.ExtensionId
import it.justwrote.kjob.extension.ExtensionModule
import it.justwrote.kjob.internal.*
import it.justwrote.kjob.internal.scheduler.JobCleanupScheduler
import it.justwrote.kjob.internal.scheduler.JobService
import it.justwrote.kjob.internal.scheduler.KeepAliveScheduler
import it.justwrote.kjob.job.JobSettings
import it.justwrote.kjob.repository.JobRepository
import it.justwrote.kjob.repository.LockRepository
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Instant
import java.util.*
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.toJavaDuration

abstract class BaseKJob<Config : BaseKJob.Configuration>(val config: Config) : KJob {
private val logger = LoggerFactory.getLogger(javaClass)
Expand All @@ -27,6 +31,7 @@ abstract class BaseKJob<Config : BaseKJob.Configuration>(val config: Config) : K

internal open val millis: Long = 1000 // allow override for testing

open val clock: Clock = Clock.systemUTC() // meant only for testing
val id: UUID = UUID.randomUUID()

open class Configuration : KJob.Configuration() {
Expand Down Expand Up @@ -82,8 +87,8 @@ abstract class BaseKJob<Config : BaseKJob.Configuration>(val config: Config) : K

internal open val jobExecutors: JobExecutors by lazy { DefaultJobExecutors(config) }
internal open val jobScheduler: JobScheduler by lazy { DefaultJobScheduler(jobRepository) }
internal open val jobRegister: JobRegister by lazy { DefaultJobRegister(config) }
internal open val jobExecutor: JobExecutor by lazy { DefaultJobExecutor(id, jobExecutors.dispatchers, kjobScope.coroutineContext) }
internal open val jobRegister: JobRegister by lazy { DefaultJobRegister() }
internal open val jobExecutor: JobExecutor by lazy { DefaultJobExecutor(id, jobExecutors.dispatchers, clock, kjobScope.coroutineContext) }

private val kjobScope: CoroutineScope by lazy {
CoroutineScope(SupervisorJob() + jobExecutors.executorService.asCoroutineDispatcher() + CoroutineName("kjob[$id]") + handler)
Expand Down Expand Up @@ -129,13 +134,35 @@ abstract class BaseKJob<Config : BaseKJob.Configuration>(val config: Config) : K
return this
}

override fun <J : Job> register(job: J, block: RegisterContext<J>.(J) -> KJobFunctions<J>): KJob {
jobRegister.register(job, block)
@Suppress("UNCHECKED_CAST")
override fun <J : Job> register(job: J, block: JobRegisterContext<J, JobContextWithProps<J>>.(J) -> KJobFunctions<J, JobContextWithProps<J>>): KJob {
val runnableJob = DefaultRunnableJob(job, config, block as JobRegisterContext<J, JobContext<J>>.(J) -> KJobFunctions<J, JobContext<J>>)
jobRegister.register(runnableJob)
return this
}

override suspend fun <J : Job> schedule(job: J, block: ScheduleContext<J>.(J) -> Unit): KJob {
jobScheduler.schedule(job, block)
val ctx = ScheduleContext<J>()
block(ctx, job)
val settings = JobSettings(ctx.jobId, job.name, ctx.props.props)
jobScheduler.schedule(settings)
return this
}

override suspend fun <J : Job> schedule(job: J, delay: java.time.Duration, block: ScheduleContext<J>.(J) -> Unit): KJob {
val ctx = ScheduleContext<J>()
block(ctx, job)
val settings = JobSettings(ctx.jobId, job.name, ctx.props.props)
jobScheduler.schedule(settings, Instant.now(clock).plus(delay))
return this
}

@ExperimentalTime
override suspend fun <J : Job> schedule(job: J, delay: Duration, block: ScheduleContext<J>.(J) -> Unit): KJob {
val ctx = ScheduleContext<J>()
block(ctx, job)
val settings = JobSettings(ctx.jobId, job.name, ctx.props.props)
jobScheduler.schedule(settings, Instant.now(clock).plus(delay.toJavaDuration()))
return this
}

Expand Down
8 changes: 7 additions & 1 deletion kjob-core/src/main/kotlin/it/justwrote/kjob/Job.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package it.justwrote.kjob

abstract class Job(val name: String) {
interface BaseJob {
val name: String
}

abstract class KronJob(override val name: String, val cronExpression: String): BaseJob

abstract class Job(override val name: String): BaseJob {
protected fun <J : Job> J.integer(name: String): Prop<J, Int> = Prop(name)
protected fun <J : Job> J.double(name: String): Prop<J, Double> = Prop(name)
protected fun <J : Job> J.long(name: String): Prop<J, Long> = Prop(name)
Expand Down
23 changes: 20 additions & 3 deletions kjob-core/src/main/kotlin/it/justwrote/kjob/KJob.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package it.justwrote.kjob

import it.justwrote.kjob.dsl.*
import it.justwrote.kjob.dsl.KJobFunctions
import it.justwrote.kjob.dsl.RegisterContext
import it.justwrote.kjob.dsl.ScheduleContext
import it.justwrote.kjob.extension.Extension
import it.justwrote.kjob.extension.ExtensionId
import it.justwrote.kjob.job.JobExecutionType
import kotlin.time.Duration
import kotlin.time.ExperimentalTime

interface KJob {
open class Configuration {
Expand Down Expand Up @@ -46,13 +48,28 @@ interface KJob {
*
* @param job the job to be registered
*/
fun <J : Job> register(job: J, block: RegisterContext<J>.(J) -> KJobFunctions<J>): KJob
fun <J : Job> register(job: J, block: JobRegisterContext<J, JobContextWithProps<J>>.(J) -> KJobFunctions<J, JobContextWithProps<J>>): KJob

/**
* Schedules a new job that will be processed in the background at some point.
* @param job the job that has been registered before
*/
suspend fun <J : Job> schedule(job: J, block: ScheduleContext<J>.(J) -> Unit = {}): KJob

operator fun <Ex : Extension, ExId : ExtensionId<Ex>> invoke(extensionId: ExId): Ex
/**
* Schedules a new job that will be processed in the background at some point.
* @param job the job that has been registered before
* @param delay time to wait until the job will be scheduled
*/
suspend fun <J : Job> schedule(job: J, delay: java.time.Duration, block: ScheduleContext<J>.(J) -> Unit = {}): KJob

/**
* Schedules a new job that will be processed in the background at some point.
* @param job the job that has been registered before
* @param delay time to wait until the job will be scheduled
*/
@ExperimentalTime
suspend fun <J : Job> schedule(job: J, delay: Duration, block: ScheduleContext<J>.(J) -> Unit = {}): KJob

operator fun <Ex: Extension, ExId: ExtensionId<Ex>> invoke(extensionId: ExId): Ex
}
Loading

0 comments on commit 99036f7

Please sign in to comment.