Skip to content

Commit

Permalink
fix: #1692 (#1696)
Browse files Browse the repository at this point in the history
  • Loading branch information
czy88840616 committed Feb 13, 2022
1 parent d93f774 commit a3ac74a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
14 changes: 12 additions & 2 deletions packages/task/src/configuration.ts
@@ -1,6 +1,7 @@
import { Configuration } from '@midwayjs/decorator';
import { Configuration, Inject } from '@midwayjs/decorator';
import { ILifeCycle } from '@midwayjs/core';
import * as DefaultConfig from './config/config.default';
import { TaskFramework } from './framework';

@Configuration({
namespace: 'task',
Expand All @@ -10,4 +11,13 @@ import * as DefaultConfig from './config/config.default';
},
],
})
export class TaskConfiguration implements ILifeCycle {}
export class TaskConfiguration implements ILifeCycle {
@Inject()
framework: TaskFramework;

async onReady() {
await this.framework.loadTask();
await this.framework.loadLocalTask();
await this.framework.loadQueue();
}
}
29 changes: 7 additions & 22 deletions packages/task/src/framework.ts
Expand Up @@ -2,7 +2,6 @@ import { BaseFramework, IMidwayBootstrapOptions } from '@midwayjs/core';
import {
Framework,
getClassMetadata,
Types,
listModule,
MidwayFrameworkType,
MODULE_TASK_KEY,
Expand All @@ -18,19 +17,6 @@ import { CronJob } from 'cron';
import { Application, Context, IQueue } from './interface';
import { deprecatedOutput } from '@midwayjs/core';

function wrapAsync(fn) {
return async function (...args) {
if (Types.isAsyncFunction(fn)) {
await fn.call(...args);
} else {
const result = fn.call(...args);
if (result && result.then) {
await result;
}
}
};
}

@Framework()
export class TaskFramework extends BaseFramework<Application, Context, any> {
queueList: any[] = [];
Expand All @@ -48,11 +34,7 @@ export class TaskFramework extends BaseFramework<Application, Context, any> {
return MidwayFrameworkType.TASK;
}

async run() {
await this.loadTask();
await this.loadLocalTask();
await this.loadQueue();
}
async run() {}

async loadTask() {
const legacyConfig = this.configService.getConfiguration('taskConfig');
Expand Down Expand Up @@ -81,7 +63,7 @@ export class TaskFramework extends BaseFramework<Application, Context, any> {
try {
logger.info('task start.');
const service = await ctx.requestContext.getAsync(module);
await wrapAsync(rule.value)(service, job.data);
await Utils.toAsyncFunction(rule.value.bind(service))(job.data);
} catch (e) {
logger.error(`${e.stack}`);
}
Expand Down Expand Up @@ -129,7 +111,7 @@ export class TaskFramework extends BaseFramework<Application, Context, any> {
try {
const service = await ctx.requestContext.getAsync(module);
logger.info('local task start.');
await wrapAsync(rule.value)(service);
await Utils.toAsyncFunction(rule.value.bind(service))();
} catch (err) {
logger.error(err);
}
Expand Down Expand Up @@ -167,7 +149,10 @@ export class TaskFramework extends BaseFramework<Application, Context, any> {
try {
logger.info('queue process start.');
const service = await ctx.requestContext.getAsync<IQueue>(module);
await wrapAsync(service.execute)(service, job.data, job);
await Utils.toAsyncFunction(service.execute.bind(service))(
job.data,
job
);
} catch (e) {
logger.error(`${e.stack}`);
}
Expand Down

0 comments on commit a3ac74a

Please sign in to comment.