Skip to content

Commit

Permalink
fix(server): suspend cronjob after instance stopped (#1045)
Browse files Browse the repository at this point in the history
  • Loading branch information
maslow committed Apr 19, 2023
1 parent a099568 commit 8d63403
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 13 deletions.
1 change: 1 addition & 0 deletions packages/cloud-sdk/src/cloud.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface CloudSdkInterface {

/**
* Invoke cloud function
* @deprecated Just import the cloud function directly, and then call it
*/
invoke: InvokeFunctionType

Expand Down
28 changes: 17 additions & 11 deletions server/src/instance/instance-task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import { isConditionTrue } from '../utils/getter'
import { InstanceService } from './instance.service'
import { ServerConfig, TASK_LOCK_INIT_TIME } from 'src/constants'
import { SystemDatabase } from 'src/database/system-database'
import { ClusterService } from 'src/region/cluster/cluster.service'
import { CronJobService } from 'src/trigger/cron-job.service'

@Injectable()
export class InstanceTaskService {
readonly lockTimeout = 60 // in second
readonly lockTimeout = 10 // in second
private readonly logger = new Logger(InstanceTaskService.name)

constructor(
private readonly clusterService: ClusterService,
private readonly instanceService: InstanceService,
private readonly cronService: CronJobService,
) {}

@Cron(CronExpression.EVERY_SECOND)
Expand All @@ -26,37 +26,37 @@ export class InstanceTaskService {
// Phase `Created` | `Stopped` -> `Starting`
this.handleRunningState().catch((err) => {
this.logger.error('handleRunningState error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Starting` -> `Started`
this.handleStartingPhase().catch((err) => {
this.logger.error('handleStartingPhase error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Started` -> `Stopping`
this.handleStoppedState().catch((err) => {
this.logger.error('handleStoppedState error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Stopping` -> `Stopped`
this.handleStoppingPhase().catch((err) => {
this.logger.error('handleStoppingPhase error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Started` -> `Stopping`
this.handleRestartingStateDown().catch((err) => {
this.logger.error('handleRestartingStateDown error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Stopped` -> `Starting`
this.handleRestartingStateUp().catch((err) => {
this.logger.error('handleRestartingStateUp error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})
}

Expand Down Expand Up @@ -133,15 +133,18 @@ export class InstanceTaskService {
instance.deployment?.status?.conditions || [],
)
if (!available) {
await this.relock(appid, waitingTime)
await this.relock(appid)
return
}

if (!instance.service) {
await this.relock(appid, waitingTime)
await this.relock(appid)
return
}

// resume cronjobs if any
await this.cronService.resumeAll(app.appid)

// if state is `Restarting`, update state to `Running` with phase `Started`
let toState = app.state
if (app.state === ApplicationState.Restarting) {
Expand Down Expand Up @@ -226,6 +229,9 @@ export class InstanceTaskService {
return
}

// suspend cronjobs if any
await this.cronService.suspendAll(app.appid)

// update application phase to `Stopped`
await db.collection<Application>('Application').updateOne(
{ appid, phase: ApplicationPhase.Stopping },
Expand Down
3 changes: 2 additions & 1 deletion server/src/instance/instance.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { InstanceService } from './instance.service'
import { InstanceTaskService } from './instance-task.service'
import { StorageModule } from '../storage/storage.module'
import { DatabaseModule } from '../database/database.module'
import { TriggerModule } from 'src/trigger/trigger.module'

@Module({
imports: [StorageModule, DatabaseModule],
imports: [StorageModule, DatabaseModule, TriggerModule],
providers: [InstanceService, InstanceTaskService],
})
export class InstanceModule {}
60 changes: 59 additions & 1 deletion server/src/trigger/cron-job.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Injectable, Logger } from '@nestjs/common'
import { CronTrigger } from '@prisma/client'
import { CronTrigger, TriggerPhase } from '@prisma/client'
import { ClusterService } from 'src/region/cluster/cluster.service'
import * as assert from 'node:assert'
import { RegionService } from 'src/region/region.service'
import { GetApplicationNamespaceByAppId } from 'src/utils/getter'
import { FunctionService } from 'src/function/function.service'
import { FOREVER_IN_SECONDS, X_LAF_TRIGGER_TOKEN_KEY } from 'src/constants'
import { TriggerService } from './trigger.service'
import * as k8s from '@kubernetes/client-node'

@Injectable()
export class CronJobService {
Expand All @@ -15,6 +17,7 @@ export class CronJobService {
private readonly clusterService: ClusterService,
private readonly regionService: RegionService,
private readonly funcService: FunctionService,
private readonly triggerService: TriggerService,
) {}

async create(trigger: CronTrigger) {
Expand Down Expand Up @@ -42,6 +45,7 @@ export class CronJobService {
schedule: trigger.cron,
successfulJobsHistoryLimit: 1,
failedJobsHistoryLimit: 1,
suspend: false,
concurrencyPolicy: 'Allow',
startingDeadlineSeconds: 60,
jobTemplate: {
Expand Down Expand Up @@ -88,6 +92,32 @@ export class CronJobService {
}
}

async suspend(trigger: CronTrigger) {
return await this.patchSuspend(trigger, true)
}

async resume(trigger: CronTrigger) {
return await this.patchSuspend(trigger, false)
}

async suspendAll(appid: string) {
const triggers = await this.triggerService.findAll(appid)
for (const trigger of triggers) {
if (trigger.phase !== TriggerPhase.Created) continue
await this.suspend(trigger)
this.logger.log(`suspend cronjob ${trigger.id} success of ${appid}`)
}
}

async resumeAll(appid: string) {
const triggers = await this.triggerService.findAll(appid)
for (const trigger of triggers) {
if (trigger.phase !== TriggerPhase.Created) continue
await this.resume(trigger)
this.logger.log(`resume cronjob ${trigger.id} success of ${appid}`)
}
}

async delete(trigger: CronTrigger) {
const appid = trigger.appid
const ns = GetApplicationNamespaceByAppId(appid)
Expand All @@ -114,4 +144,32 @@ export class CronJobService {
const command = `curl -X POST -H "${X_LAF_TRIGGER_TOKEN_KEY}: ${token}" ${invokeUrl}`
return command
}

private async patchSuspend(trigger: CronTrigger, suspend: boolean) {
const appid = trigger.appid
const ns = GetApplicationNamespaceByAppId(appid)
const region = await this.regionService.findByAppId(appid)
const batchApi = this.clusterService.makeBatchV1Api(region)
const name = `cron-${trigger.id}`
const body = [{ op: 'replace', path: '/spec/suspend', value: suspend }]
try {
const res = await batchApi.patchNamespacedCronJob(
name,
ns,
body,
undefined,
undefined,
undefined,
undefined,
undefined,
{
headers: { 'Content-Type': k8s.PatchUtils.PATCH_FORMAT_JSON_PATCH },
},
)
return res.body
} catch (err) {
if (err?.response?.body?.reason === 'NotFound') return null
throw err
}
}
}
1 change: 1 addition & 0 deletions server/src/trigger/trigger.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ import { MongoService } from 'src/database/mongo.service'
DatabaseService,
MongoService,
],
exports: [TriggerService, CronJobService],
})
export class TriggerModule {}

0 comments on commit 8d63403

Please sign in to comment.