Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
2 contributors

Users who have contributed to this file

@kamilmysliwiec @jbpionnier
65 lines (54 sloc) 1.8 KB
import { Controller, Get, Inject, OnModuleInit, Param } from '@nestjs/common';
import {
ClientGrpc,
GrpcMethod,
GrpcStreamMethod,
} from '@nestjs/microservices';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { toArray } from 'rxjs/operators';
import { HeroById } from './interfaces/hero-by-id.interface';
import { Hero } from './interfaces/hero.interface';
interface HeroService {
findOne(data: HeroById): Observable<Hero>;
findMany(upstream: Observable<HeroById>): Observable<Hero>;
}
@Controller('hero')
export class HeroController implements OnModuleInit {
private readonly items: Hero[] = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
private heroService: HeroService;
constructor(@Inject('HERO_PACKAGE') private readonly client: ClientGrpc) {}
onModuleInit() {
this.heroService = this.client.getService<HeroService>('HeroService');
}
@Get()
getMany(): Observable<Hero[]> {
const ids$ = new ReplaySubject<HeroById>();
ids$.next({ id: 1 });
ids$.next({ id: 2 });
ids$.complete();
const stream = this.heroService.findMany(ids$.asObservable());
return stream.pipe(toArray());
}
@Get(':id')
getById(@Param('id') id: string): Observable<Hero> {
return this.heroService.findOne({ id: +id });
}
@GrpcMethod('HeroService')
findOne(data: HeroById): Hero {
return this.items.find(({ id }) => id === data.id);
}
@GrpcStreamMethod('HeroService')
findMany(data$: Observable<HeroById>): Observable<Hero> {
const hero$ = new Subject<Hero>();
const onNext = (heroById: HeroById) => {
const item = this.items.find(({ id }) => id === heroById.id);
hero$.next(item);
};
const onComplete = () => hero$.complete();
data$.subscribe(onNext, null, onComplete);
return hero$.asObservable();
}
}
You can’t perform that action at this time.