-
Notifications
You must be signed in to change notification settings - Fork 568
/
framework.ts
127 lines (117 loc) · 3.54 KB
/
framework.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import {
getClassMetadata,
listModule,
listPropertyDataFromClass,
MidwayFrameworkType,
ConsumerMetadata,
MS_CONSUMER_KEY,
MSListenerType,
RabbitMQListenerOptions,
Framework,
BaseFramework,
MidwayInvokeForbiddenError,
} from '@midwayjs/core';
import {
IMidwayRabbitMQApplication,
IMidwayRabbitMQConfigurationOptions,
IMidwayRabbitMQContext,
} from './interface';
import { RabbitMQServer } from './mq';
import { ConsumeMessage } from 'amqplib';
/**
 * Midway framework adapter for RabbitMQ consumers.
 *
 * `run()` connects the `RabbitMQServer` application and then registers one
 * consumer for every listener configuration declared on classes that are
 * registered under `MS_CONSUMER_KEY` with the `MSListenerType.RABBITMQ` type.
 */
@Framework()
export class MidwayRabbitMQFramework extends BaseFramework<
  IMidwayRabbitMQApplication,
  IMidwayRabbitMQContext,
  IMidwayRabbitMQConfigurationOptions
> {
  /** The RabbitMQ application; assigned in `applicationInitialize()`. */
  public app: IMidwayRabbitMQApplication;
  /** Not read or written anywhere else in this class; kept for callers. */
  public consumerHandlerList = [];

  /**
   * @returns the `rabbitmq` section of the application configuration.
   */
  configure() {
    return this.configService.getConfiguration('rabbitmq');
  }

  /**
   * Creates the `RabbitMQServer` that backs `this.app`.
   *
   * @param options not used by this implementation.
   */
  async applicationInitialize(options) {
    // Create a connection manager. The configuration options are spread last,
    // so a `logger` entry in the configuration replaces the framework logger.
    this.app = new RabbitMQServer({
      logger: this.logger,
      ...this.configurationOptions,
    }) as unknown as IMidwayRabbitMQApplication;
  }

  /**
   * Connects to the broker and binds every declared consumer.
   *
   * @throws the error raised while connecting or binding consumers. The
   * application is closed before the error is rethrown.
   */
  public async run(): Promise<void> {
    try {
      // init connection
      await this.app.connect(
        this.configurationOptions.url,
        this.configurationOptions.socketOptions
      );
      await this.loadSubscriber();
      this.logger.info('Rabbitmq server start success');
    } catch (error) {
      // Wait for the cleanup to finish so it cannot leave a floating promise,
      // and never let a cleanup failure replace the original startup error.
      try {
        await this.app.close();
      } catch (closeError) {
        this.logger.error(closeError);
      }
      throw error;
    }
  }

  /** Closes the application before the framework stops. */
  protected async beforeStop(): Promise<void> {
    await this.app.close();
  }

  /**
   * @returns `MidwayFrameworkType.MS_RABBITMQ`.
   */
  public getFrameworkType(): MidwayFrameworkType {
    return MidwayFrameworkType.MS_RABBITMQ;
  }

  /**
   * Registers a consumer for each listener configuration found on the
   * RabbitMQ consumer classes.
   *
   * Consumers are created one at a time, in declaration order.
   */
  private async loadSubscriber() {
    // Keep only the consumer classes whose metadata targets RabbitMQ.
    const subscriberModules = listModule(MS_CONSUMER_KEY, module => {
      const metadata: ConsumerMetadata.ConsumerMetadata = getClassMetadata(
        MS_CONSUMER_KEY,
        module
      );
      return metadata.type === MSListenerType.RABBITMQ;
    });

    for (const module of subscriberModules) {
      const listenersByMethod: RabbitMQListenerOptions[][] =
        listPropertyDataFromClass(MS_CONSUMER_KEY, module);

      for (const methodBindListeners of listenersByMethod) {
        // Iterate over the listener configurations bound to each method.
        for (const listenerOptions of methodBindListeners) {
          await this.app.createConsumer(
            listenerOptions,
            async (message: ConsumeMessage, channel, channelWrapper) => {
              const ctx = {
                channel,
                queueName: listenerOptions.queueName,
                ack: ackMessage => {
                  return channelWrapper.ack(ackMessage);
                },
              } as IMidwayRabbitMQContext;
              // The return value is ignored; `ctx.requestContext` is read
              // below, so this call presumably populates `ctx` in place.
              this.app.createAnonymousContext(ctx);

              // Guard rejection happens outside the try block below, so the
              // error propagates to whoever invoked this callback.
              const isPassed = await this.app
                .getFramework()
                .runGuard(ctx, module, listenerOptions.propertyKey);
              if (!isPassed) {
                throw new MidwayInvokeForbiddenError(
                  listenerOptions.propertyKey,
                  module
                );
              }

              const ins = await ctx.requestContext.getAsync(module);
              // The user method is the innermost step of the middleware chain
              // and receives the raw message.
              const fn = await this.applyMiddleware(async _ctx => {
                return await ins[listenerOptions.propertyKey].call(
                  ins,
                  message
                );
              });

              try {
                const result = await fn(ctx);
                // Only a truthy handler result acknowledges the message.
                if (result) {
                  return channelWrapper.ack(message);
                }
              } catch (error) {
                // Handler failures are logged; the message is not acked here.
                this.logger.error(error);
              }
            }
          );
        }
      }
    }
  }

  /**
   * @returns the framework name, `'midway:rabbitmq'`.
   */
  public getFrameworkName() {
    return 'midway:rabbitmq';
  }
}