-
-
Notifications
You must be signed in to change notification settings - Fork 7.4k
/
rmq.controller.ts
89 lines (76 loc) · 2.16 KB
/
rmq.controller.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import { Body, Controller, HttpCode, Post, Query } from '@nestjs/common';
import {
ClientProxy,
ClientProxyFactory,
EventPattern,
MessagePattern,
Transport,
} from '@nestjs/microservices';
import { from, Observable, of } from 'rxjs';
import { scan } from 'rxjs/operators';
/**
 * HTTP controller that doubles as a RabbitMQ microservice handler.
 *
 * The HTTP routes forward requests through an RMQ `ClientProxy` (queue
 * `test` on `amqp://localhost:5672`), and the `@MessagePattern` /
 * `@EventPattern` methods are the handlers for those messages.
 */
@Controller()
export class RMQController {
  /** Last payload received by the `notification` event handler. */
  static IS_NOTIFIED = false;

  /** RMQ client used by the HTTP routes to reach the message handlers. */
  client: ClientProxy;

  constructor() {
    this.client = ClientProxyFactory.create({
      transport: Transport.RMQ,
      options: {
        urls: [`amqp://localhost:5672`],
        queue: 'test',
        queueOptions: { durable: false },
        socketOptions: { noDelay: true },
      },
    });
  }

  /**
   * Sends `data` to the handler registered for `{ cmd }`.
   *
   * @param cmd - Value of the `command` query parameter, used as the pattern's `cmd`.
   * @param data - Request body forwarded as the message payload.
   * @returns The observable produced by `ClientProxy.send`.
   */
  @Post()
  @HttpCode(200)
  call(
    @Query('command') cmd: string,
    @Body() data: number[],
  ): Observable<number> {
    return this.client.send<number>({ cmd }, data);
  }

  /**
   * Sends `data` to the `streaming` handler and emits the running total of
   * the values that come back.
   *
   * @param data - Request body forwarded as the message payload.
   */
  @Post('stream')
  @HttpCode(200)
  stream(@Body() data: number[]): Observable<number> {
    return this.client
      .send<number>({ cmd: 'streaming' }, data)
      .pipe(scan((total, value) => total + value));
  }

  /**
   * Sends every row of `data` to the `sum` handler at once and checks each
   * reply against a locally computed sum.
   *
   * @param data - Rows of numbers; each row becomes one `sum` request.
   * @returns `true` when every reply matches its expected sum. An empty or
   *   missing body resolves to `true` (nothing to contradict) instead of
   *   throwing.
   */
  @Post('concurrent')
  @HttpCode(200)
  async concurrent(@Body() data: number[][]): Promise<boolean> {
    const send = async (tab: number[]): Promise<boolean> => {
      // Seed with 0 so an empty row has a defined expected sum.
      const expected = tab.reduce((a, b) => a + b, 0);
      const result = await this.client
        .send<number>({ cmd: 'sum' }, tab)
        .toPromise();
      return result === expected;
    };

    // Promise.all observes every request, so a failed send rejects the
    // returned promise rather than surfacing as an unhandled rejection.
    const results = await Promise.all((data || []).map(tab => send(tab)));
    return results.every(ok => ok);
  }

  /**
   * Handler for `{ cmd: 'sum' }`.
   *
   * @returns The sum of `data`; `0` for an empty or missing payload.
   */
  @MessagePattern({ cmd: 'sum' })
  sum(data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  /**
   * Handler for `{ cmd: 'asyncSum' }`; same result as `sum`, delivered
   * through a promise.
   *
   * @returns The sum of `data`; `0` for an empty or missing payload.
   */
  @MessagePattern({ cmd: 'asyncSum' })
  async asyncSum(data: number[]): Promise<number> {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  /**
   * Handler for `{ cmd: 'streamSum' }`; same result as `sum`, delivered as a
   * single-value observable.
   *
   * @returns An observable emitting the sum of `data`; `0` for an empty or
   *   missing payload.
   */
  @MessagePattern({ cmd: 'streamSum' })
  streamSum(data: number[]): Observable<number> {
    return of((data || []).reduce((a, b) => a + b, 0));
  }

  /**
   * Handler for `{ cmd: 'streaming' }`.
   *
   * @returns An observable emitting each element of `data` in order. A
   *   missing payload yields an empty stream, matching the `data || []`
   *   fallback used by the sum handlers.
   */
  @MessagePattern({ cmd: 'streaming' })
  streaming(data: number[]): Observable<number> {
    return from(data || []);
  }

  /**
   * Emits a `notification` event carrying `true`.
   *
   * @returns A promise wrapping the observable returned by `ClientProxy.emit`.
   */
  @Post('notify')
  async sendNotification(): Promise<any> {
    return this.client.emit<number>('notification', true);
  }

  /**
   * Handler for the `notification` event; stores the payload in
   * `RMQController.IS_NOTIFIED`.
   */
  @EventPattern('notification')
  eventHandler(data: boolean) {
    RMQController.IS_NOTIFIED = data;
  }
}