diff --git a/server/src/instance/instance-task.service.ts b/server/src/instance/instance-task.service.ts index e19a29b655..6e4b57fddc 100644 --- a/server/src/instance/instance-task.service.ts +++ b/server/src/instance/instance-task.service.ts @@ -3,7 +3,6 @@ import { Cron, CronExpression } from '@nestjs/schedule' import { Application, ApplicationPhase, ApplicationState } from '@prisma/client' import { isConditionTrue } from '../utils/getter' import { InstanceService } from './instance.service' -import { times } from 'lodash' import { ServerConfig, TASK_LOCK_INIT_TIME } from 'src/constants' import { SystemDatabase } from 'src/database/system-database' import { ClusterService } from 'src/region/cluster/cluster.service' @@ -11,7 +10,6 @@ import { ClusterService } from 'src/region/cluster/cluster.service' @Injectable() export class InstanceTaskService { readonly lockTimeout = 60 // in second - readonly concurrency = 1 // concurrency count private readonly logger = new Logger(InstanceTaskService.name) constructor( @@ -26,22 +24,40 @@ export class InstanceTaskService { } // Phase `Created` | `Stopped` -> `Starting` - times(this.concurrency, () => this.handleRunningState()) + this.handleRunningState().catch((err) => { + this.logger.error('handleRunningState error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) // Phase `Starting` -> `Started` - times(this.concurrency, () => this.handleStartingPhase()) + this.handleStartingPhase().catch((err) => { + this.logger.error('handleStartingPhase error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) // Phase `Started` -> `Stopping` - times(this.concurrency, () => this.handleStoppedState()) + this.handleStoppedState().catch((err) => { + this.logger.error('handleStoppedState error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) // Phase `Stopping` -> `Stopped` - times(this.concurrency, () => this.handleStoppingPhase()) 
+ this.handleStoppingPhase().catch((err) => { + this.logger.error('handleStoppingPhase error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) // Phase `Started` -> `Stopping` - times(this.concurrency, () => this.handleRestartingStateDown()) + this.handleRestartingStateDown().catch((err) => { + this.logger.error('handleRestartingStateDown error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) // Phase `Stopped` -> `Starting` - times(this.concurrency, () => this.handleRestartingStateUp()) + this.handleRestartingStateUp().catch((err) => { + this.logger.error('handleRestartingStateUp error', err) + err?.response && this.logger.debug(err?.response?.data || err?.response) + }) } /** @@ -57,49 +73,34 @@ export class InstanceTaskService { .findOneAndUpdate( { state: ApplicationState.Running, - phase: { - $in: [ApplicationPhase.Created, ApplicationPhase.Stopped], - }, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + phase: { $in: [ApplicationPhase.Created, ApplicationPhase.Stopped] }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return const app = res.value - try { - // create instance - await this.instanceService.create(app) - - // update phase to `Starting` - const updated = await db.collection('Application').updateOne( - { - appid: app.appid, - phase: { - $in: [ApplicationPhase.Created, ApplicationPhase.Stopped], - }, - }, - { - $set: { - phase: ApplicationPhase.Starting, - lockedAt: TASK_LOCK_INIT_TIME, - updatedAt: new Date(), - }, + // create instance + await this.instanceService.create(app) + + // update phase to `Starting` + await db.collection('Application').updateOne( + { + appid: app.appid, + phase: { $in: [ApplicationPhase.Created, ApplicationPhase.Stopped] }, + }, + { + $set: { + phase: ApplicationPhase.Starting, + lockedAt: 
TASK_LOCK_INIT_TIME, + updatedAt: new Date(), }, - ) + }, + ) - if (updated.modifiedCount > 0) - this.logger.log(`Application ${app.appid} updated to phase starting`) - } catch (error) { - this.logger.error(error, error.response?.body) - } + this.logger.log(`Application ${app.appid} updated to phase starting`) } /** @@ -115,13 +116,9 @@ export class InstanceTaskService { .findOneAndUpdate( { phase: ApplicationPhase.Starting, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { lockedAt: new Date() }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return @@ -129,38 +126,16 @@ export class InstanceTaskService { const waitingTime = Date.now() - app.updatedAt.getTime() - try { - const appid = app.appid - const instance = await this.instanceService.get(app) - const available = isConditionTrue( - 'Available', - instance.deployment.status?.conditions, - ) - if (!available) { - await this.relock(appid, waitingTime) - return - } - - if (!instance.service) { - await this.relock(appid, waitingTime) - return - } - - // if state is `Restarting`, update state to `Running` with phase `Started` - let toState = app.state - if (app.state === ApplicationState.Restarting) { - toState = ApplicationState.Running - } - - // update application state - const updated = await db.collection('Application').updateOne( + // if waiting time is more than 5 minutes, stop the application + if (waitingTime > 1000 * 60 * 5) { + await db.collection('Application').updateOne( { - appid, + appid: app.appid, phase: ApplicationPhase.Starting, }, { $set: { - state: toState, + state: ApplicationState.Stopped, phase: ApplicationPhase.Started, lockedAt: TASK_LOCK_INIT_TIME, updatedAt: new Date(), @@ -168,11 +143,48 @@ export class InstanceTaskService { }, ) - if (updated.modifiedCount > 0) - this.logger.debug(`Application ${app.appid} updated to phase started`) - } catch (error) { - 
this.logger.error(error) + this.logger.debug( + `Application ${app.appid} updated to state Stopped due to timeout`, + ) + return + } + + const appid = app.appid + const instance = await this.instanceService.get(app) + const available = isConditionTrue( + 'Available', + instance.deployment.status?.conditions, + ) + if (!available) { + await this.relock(appid, waitingTime) + return + } + + if (!instance.service) { + await this.relock(appid, waitingTime) + return + } + + // if state is `Restarting`, update state to `Running` with phase `Started` + let toState = app.state + if (app.state === ApplicationState.Restarting) { + toState = ApplicationState.Running } + + // update application state + await db.collection('Application').updateOne( + { appid, phase: ApplicationPhase.Starting }, + { + $set: { + state: toState, + phase: ApplicationPhase.Started, + lockedAt: TASK_LOCK_INIT_TIME, + updatedAt: new Date(), + }, + }, + ) + + this.logger.debug(`Application ${app.appid} updated to phase started`) } /** @@ -189,44 +201,33 @@ export class InstanceTaskService { { state: ApplicationState.Stopped, phase: ApplicationPhase.Started, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return const app = res.value - try { - // remove instance - await this.instanceService.remove(app) - - // update phase to `Stopping` - const updated = await db.collection('Application').updateOne( - { - appid: app.appid, - phase: ApplicationPhase.Started, - }, - { - $set: { - phase: ApplicationPhase.Stopping, - lockedAt: TASK_LOCK_INIT_TIME, - updatedAt: new Date(), - }, + // remove instance + await this.instanceService.remove(app) + + // update phase to `Stopping` + await db.collection('Application').updateOne( + { + appid: app.appid, + phase: ApplicationPhase.Started, + }, + { + $set: { + phase: 
ApplicationPhase.Stopping, + lockedAt: TASK_LOCK_INIT_TIME, + updatedAt: new Date(), }, - ) + }, + ) - if (updated.modifiedCount > 0) - this.logger.debug(`Application ${app.appid} updated to phase stopping`) - } catch (error) { - this.logger.error(error) - } + this.logger.log(`Application ${app.appid} updated to phase stopping`) } /** @@ -242,51 +243,37 @@ export class InstanceTaskService { .findOneAndUpdate( { phase: ApplicationPhase.Stopping, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return const app = res.value const appid = app.appid - try { - const waitingTime = Date.now() - app.updatedAt.getTime() + const waitingTime = Date.now() - app.updatedAt.getTime() - // check if the instance is removed - const instance = await this.instanceService.get(app) - if (instance.deployment) { - await this.relock(appid, waitingTime) - return - } + // check if the instance is removed + const instance = await this.instanceService.get(app) + if (instance.deployment) { + await this.relock(appid, waitingTime) + return + } - // update application phase to `Stopped` - const updated = await db.collection('Application').updateOne( - { - appid, - phase: ApplicationPhase.Stopping, - }, - { - $set: { - phase: ApplicationPhase.Stopped, - lockedAt: TASK_LOCK_INIT_TIME, - updatedAt: new Date(), - }, + // update application phase to `Stopped` + await db.collection('Application').updateOne( + { appid, phase: ApplicationPhase.Stopping }, + { + $set: { + phase: ApplicationPhase.Stopped, + lockedAt: TASK_LOCK_INIT_TIME, + updatedAt: new Date(), }, - ) + }, + ) - if (updated.modifiedCount > 0) - this.logger.debug(`Application ${app.appid} updated to phase stopped`) - } catch (error) { - this.logger.error(error) - } + this.logger.log(`Application ${app.appid} updated to phase Stopped`) } /** @@ 
-303,44 +290,30 @@ export class InstanceTaskService { { state: ApplicationState.Restarting, phase: ApplicationPhase.Started, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return const app = res.value - try { - // remove instance - await this.instanceService.remove(app) + // remove instance + await this.instanceService.remove(app) - // update phase to `Stopping` - const updated = await db.collection('Application').updateOne( - { - appid: app.appid, - phase: ApplicationPhase.Started, - }, - { - $set: { - phase: ApplicationPhase.Stopping, - lockedAt: TASK_LOCK_INIT_TIME, - updatedAt: new Date(), - }, + // update phase to `Stopping` + await db.collection('Application').updateOne( + { appid: app.appid, phase: ApplicationPhase.Started }, + { + $set: { + phase: ApplicationPhase.Stopping, + lockedAt: TASK_LOCK_INIT_TIME, + updatedAt: new Date(), }, - ) + }, + ) - if (updated.modifiedCount > 0) - this.logger.debug(`${app.appid} updated to stopping for restarting`) - } catch (error) { - this.logger.error(error) - } + this.logger.debug(`${app.appid} updated to stopping for restarting`) } /** @@ -357,50 +330,41 @@ export class InstanceTaskService { { state: ApplicationState.Restarting, phase: ApplicationPhase.Stopped, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return const app = res.value - try { - // create instance - await this.instanceService.create(app) + // create instance + await this.instanceService.create(app) - // update phase to `Starting` - const updated = await db.collection('Application').updateOne( - { - appid: app.appid, - phase: ApplicationPhase.Stopped, - 
}, - { - $set: { - phase: ApplicationPhase.Starting, - lockedAt: TASK_LOCK_INIT_TIME, - updatedAt: new Date(), - }, + // update phase to `Starting` + await db.collection('Application').updateOne( + { appid: app.appid, phase: ApplicationPhase.Stopped }, + { + $set: { + phase: ApplicationPhase.Starting, + lockedAt: TASK_LOCK_INIT_TIME, + updatedAt: new Date(), }, - ) + }, + ) - if (updated.modifiedCount > 0) - this.logger.debug(`${app.appid} updated starting for restarting`) - } catch (error) { - this.logger.error(error) - } + this.logger.debug(`${app.appid} updated starting for restarting`) } /** * Relock application by appid, lockedTime is in milliseconds */ async relock(appid: string, lockedTime = 0) { + // if lockedTime is greater than 5 minutes, cap it at 5 minutes + if (lockedTime > 5 * 60 * 1000) { + lockedTime = 5 * 60 * 1000 + } + const db = SystemDatabase.db const lockedAt = new Date(Date.now() - 1000 * this.lockTimeout + lockedTime) await db diff --git a/server/src/trigger/cron-job.service.ts b/server/src/trigger/cron-job.service.ts index 93502f1f70..47c8d37ed0 100644 --- a/server/src/trigger/cron-job.service.ts +++ b/server/src/trigger/cron-job.service.ts @@ -30,47 +30,42 @@ export class CronJobService { const batchApi = this.clusterService.makeBatchV1Api(region) const name = `cron-${trigger.id}` const command = await this.getTriggerCommand(trigger) - const res = await batchApi - .createNamespacedCronJob(ns, { - metadata: { - name, - labels: { - appid, - id: trigger.id, - }, + const res = await batchApi.createNamespacedCronJob(ns, { + metadata: { + name, + labels: { + appid, + id: trigger.id, }, - spec: { - schedule: trigger.cron, - successfulJobsHistoryLimit: 1, - failedJobsHistoryLimit: 1, - concurrencyPolicy: 'Allow', - startingDeadlineSeconds: 60, - jobTemplate: { - spec: { - activeDeadlineSeconds: 60, - template: { - spec: { - restartPolicy: 'Never', - terminationGracePeriodSeconds: 30, - automountServiceAccountToken: false, - containers: [ - { - name:
name, - image: 'curlimages/curl:7.87.0', - command: ['sh', '-c', command], - imagePullPolicy: 'IfNotPresent', - }, - ], - }, + }, + spec: { + schedule: trigger.cron, + successfulJobsHistoryLimit: 1, + failedJobsHistoryLimit: 1, + concurrencyPolicy: 'Allow', + startingDeadlineSeconds: 60, + jobTemplate: { + spec: { + activeDeadlineSeconds: 60, + template: { + spec: { + restartPolicy: 'Never', + terminationGracePeriodSeconds: 30, + automountServiceAccountToken: false, + containers: [ + { + name: name, + image: 'curlimages/curl:7.87.0', + command: ['sh', '-c', command], + imagePullPolicy: 'IfNotPresent', + }, + ], }, }, }, }, - }) - .catch((err) => { - this.logger.error(`create cronjob ${name} failed:`, err) - return null - }) + }, + }) this.logger.debug(`create cronjob ${name} success`) return res.body @@ -80,13 +75,17 @@ export class CronJobService { const appid = trigger.appid const ns = GetApplicationNamespaceByAppId(appid) const region = await this.regionService.findByAppId(appid) - const batchApi = this.clusterService.makeBatchV1Api(region) - const name = `cron-${trigger.id}` - const res = await batchApi.readNamespacedCronJob(name, ns).catch((err) => { - this.logger.error(`read cronjob ${name} failed:`, err) - return null - }) - return res.body + try { + const batchApi = this.clusterService.makeBatchV1Api(region) + const name = `cron-${trigger.id}` + const res = await batchApi.readNamespacedCronJob(name, ns) + return res.body + } catch (err) { + if (err?.response?.body?.reason === 'NotFound') return null + this.logger.error(err) + this.logger.error(err?.response?.body) + throw err + } } async delete(trigger: CronTrigger) { @@ -95,12 +94,7 @@ export class CronJobService { const region = await this.regionService.findByAppId(appid) const batchApi = this.clusterService.makeBatchV1Api(region) const name = `cron-${trigger.id}` - const res = await batchApi - .deleteNamespacedCronJob(name, ns) - .catch((err) => { - this.logger.error(`delete cronjob ${name} failed:`, 
err) - return null - }) + const res = await batchApi.deleteNamespacedCronJob(name, ns) return res.body } diff --git a/server/src/trigger/trigger-task.service.ts b/server/src/trigger/trigger-task.service.ts index be7ee1879b..23c70068f8 100644 --- a/server/src/trigger/trigger-task.service.ts +++ b/server/src/trigger/trigger-task.service.ts @@ -1,6 +1,5 @@ import { Injectable, Logger } from '@nestjs/common' import { Cron, CronExpression } from '@nestjs/schedule' -import { times } from 'lodash' import { TASK_LOCK_INIT_TIME } from 'src/constants' import { SystemDatabase } from 'src/database/system-database' import { CronJobService } from './cron-job.service' @@ -18,22 +17,29 @@ export class TriggerTaskService { @Cron(CronExpression.EVERY_SECOND) async tick() { // Phase `Creating` -> `Created` - times(this.concurrency, () => this.handleCreatingPhase()) + this.handleCreatingPhase().catch((err) => { + this.logger.error('handleCreatingPhase error: ' + err) + }) // Phase `Deleting` -> `Deleted` - times(this.concurrency, () => this.handleDeletingPhase()) + this.handleDeletingPhase().catch((err) => { + this.logger.error('handleDeletingPhase error: ' + err) + }) // Phase `Created` -> `Deleting` - this.handleInactiveState() + this.handleInactiveState().catch((err) => { + this.logger.error('handleInactiveState error: ' + err) + }) // Phase `Deleted` -> `Creating` - this.handleActiveState() + this.handleActiveState().catch((err) => { + this.logger.error('handleActiveState error: ' + err) + }) // Phase `Deleting` -> `Deleted` - this.handleDeletedState() - - // Clear timeout locks - this.clearTimeoutLocks() + this.handleDeletedState().catch((err) => { + this.logger.error('handleDeletedState error: ' + err) + }) } /** @@ -49,15 +55,9 @@ export class TriggerTaskService { .findOneAndUpdate( { phase: TriggerPhase.Creating, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 
1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return @@ -67,28 +67,22 @@ export class TriggerTaskService { id: res.value._id.toString(), } - // create cron job - const job = await this.cronService.create(doc) - if (!job) return - - this.logger.debug('cron job created: ' + doc._id) + // create cron job if not exists + const job = await this.cronService.findOne(doc) + if (!job) { + await this.cronService.create(doc) + this.logger.log('cron job created: ' + doc._id) + } // update phase to `Created` - const updated = await db.collection('CronTrigger').updateOne( - { - _id: doc._id, - phase: TriggerPhase.Creating, - }, + await db.collection('CronTrigger').updateOne( + { _id: doc._id, phase: TriggerPhase.Creating }, { - $set: { - phase: TriggerPhase.Created, - lockedAt: TASK_LOCK_INIT_TIME, - }, + $set: { phase: TriggerPhase.Created, lockedAt: TASK_LOCK_INIT_TIME }, }, ) - if (updated.modifiedCount > 0) - this.logger.debug('trigger phase updated to Created ', doc._id) + this.logger.log('trigger phase updated to Created: ' + doc._id) } /** @@ -104,15 +98,9 @@ export class TriggerTaskService { .findOneAndUpdate( { phase: TriggerPhase.Deleting, - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: new Date(), - }, + lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) }, }, + { $set: { lockedAt: new Date() } }, ) if (!res.value) return @@ -122,28 +110,22 @@ export class TriggerTaskService { id: res.value._id.toString(), } - // delete cron job - const ret = await this.cronService.delete(doc) - if (ret?.status !== 'Success') return - - this.logger.debug('cron job deleted:', doc._id) + // delete cron job if exists + const job = await this.cronService.findOne(doc) + if (job) { + await this.cronService.delete(doc) + this.logger.log('cron job deleted: ' + doc._id) + } // update phase to `Deleted` - const updated = await db.collection('CronTrigger').updateOne( - { - _id: doc._id, 
- phase: TriggerPhase.Deleting, - }, + await db.collection('CronTrigger').updateOne( + { _id: doc._id, phase: TriggerPhase.Deleting }, { - $set: { - phase: TriggerPhase.Deleted, - lockedAt: TASK_LOCK_INIT_TIME, - }, + $set: { phase: TriggerPhase.Deleted, lockedAt: TASK_LOCK_INIT_TIME }, }, ) - if (updated.modifiedCount > 0) - this.logger.debug('cron trigger phase updated to Deleted ' + doc._id) + this.logger.debug('cron trigger phase updated to Deleted: ' + doc._id) } /** @@ -154,15 +136,9 @@ export class TriggerTaskService { const db = SystemDatabase.db await db.collection('CronTrigger').updateMany( + { state: TriggerState.Active, phase: TriggerPhase.Deleted }, { - state: TriggerState.Active, - phase: TriggerPhase.Deleted, - }, - { - $set: { - phase: TriggerPhase.Creating, - lockedAt: TASK_LOCK_INIT_TIME, - }, + $set: { phase: TriggerPhase.Creating, lockedAt: TASK_LOCK_INIT_TIME }, }, ) } @@ -175,15 +151,9 @@ export class TriggerTaskService { const db = SystemDatabase.db await db.collection('CronTrigger').updateMany( + { state: TriggerState.Inactive, phase: TriggerPhase.Created }, { - state: TriggerState.Inactive, - phase: TriggerPhase.Created, - }, - { - $set: { - phase: TriggerPhase.Deleting, - lockedAt: TASK_LOCK_INIT_TIME, - }, + $set: { phase: TriggerPhase.Deleting, lockedAt: TASK_LOCK_INIT_TIME }, }, ) } @@ -197,41 +167,14 @@ export class TriggerTaskService { const db = SystemDatabase.db await db.collection('CronTrigger').updateMany( + { state: TriggerState.Deleted, phase: TriggerPhase.Created }, { - state: TriggerState.Deleted, - phase: TriggerPhase.Created, - }, - { - $set: { - phase: TriggerPhase.Deleting, - lockedAt: TASK_LOCK_INIT_TIME, - }, + $set: { phase: TriggerPhase.Deleting, lockedAt: TASK_LOCK_INIT_TIME }, }, ) - await db.collection('CronTrigger').deleteMany({ - state: TriggerState.Deleted, - phase: TriggerPhase.Deleted, - }) - } - - /** - * Clear timeout locks - */ - async clearTimeoutLocks() { - const db = SystemDatabase.db - - await 
db.collection('CronTrigger').updateMany( - { - lockedAt: { - $lt: new Date(Date.now() - 1000 * this.lockTimeout), - }, - }, - { - $set: { - lockedAt: TASK_LOCK_INIT_TIME, - }, - }, - ) + await db + .collection('CronTrigger') + .deleteMany({ state: TriggerState.Deleted, phase: TriggerPhase.Deleted }) } }