Skip to content

Commit

Permalink
feat: add basic support for last-event-id in Events API
Browse files Browse the repository at this point in the history
  • Loading branch information
relu91 committed Jun 21, 2022
1 parent ce18274 commit cabe601
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 19 deletions.
17 changes: 12 additions & 5 deletions src/api-reference/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Controller, MessageEvent, Param, Query, Sse } from '@nestjs/common';
import { Controller, Headers, MessageEvent, Param, Query, Sse } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
import { Observable } from 'rxjs';

Expand All @@ -11,12 +11,19 @@ export class EventsController {
public constructor(private readonly eventsService: EventsService) {}

@Sse()
public subscribeToAll(@Query() diff: boolean): Observable<MessageEvent> {
return this.eventsService.subscribeToAll(diff);
public async subscribeToAll(
@Query() diff: boolean,
@Headers('last-event-id') lastEvent?: string,
): Promise<Observable<MessageEvent>> {
return this.eventsService.subscribeToAll(diff, lastEvent);
}

@Sse(':type')
public subscribeTo(@Param('type') type: EventType, @Query() diff: boolean): Observable<MessageEvent> {
return this.eventsService.subscribeTo(type, diff);
public async subscribeTo(
@Param('type') type: EventType,
@Query() diff: boolean,
@Headers('last-event-id') lastEvent?: string,
): Promise<Observable<MessageEvent>> {
return this.eventsService.subscribeTo(type, diff, lastEvent);
}
}
2 changes: 2 additions & 0 deletions src/api-reference/events/events.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Module } from '@nestjs/common';

import { PersistenceModule } from '../../persistence/persistence.module';
import { EventsController } from './events.controller';
import { EventsService } from './events.service';

@Module({
imports: [PersistenceModule],
controllers: [EventsController],
providers: [EventsService],
exports: [EventsService],
Expand Down
55 changes: 43 additions & 12 deletions src/api-reference/events/events.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Injectable, MessageEvent } from '@nestjs/common';
import { Observable, Subject, filter, map } from 'rxjs';
import { Observable, Subject, concat, filter, map } from 'rxjs';

import { EventType, TDLifeCycleEvent } from '../../common/models/events';
import { ThingDescription } from '../../common/models/thing-description';
import { TDLifeCycleEventRepository } from '../../persistence/events.repository';

function mapEventsWithDiff(diff: boolean) {
return map((event: TDLifeCycleEvent): MessageEvent => {
Expand All @@ -17,25 +18,55 @@ function mapEventsWithDiff(diff: boolean) {
export class EventsService {
private thingLifeCycle: Subject<TDLifeCycleEvent>;

public constructor() {
public constructor(private readonly eventsRepository: TDLifeCycleEventRepository) {
this.thingLifeCycle = new Subject<TDLifeCycleEvent>();
}
public subscribeToAll(diff: boolean): Observable<MessageEvent> {
const s = this.thingLifeCycle.pipe(mapEventsWithDiff(diff));
return s;
public async subscribeToAll(diff: boolean, lastEvent?: string): Promise<Observable<MessageEvent>> {
if (lastEvent) {
const obs = await this.createObservableForPastEvents(diff, parseInt(lastEvent));
return concat(obs, this.thingLifeCycle.pipe(mapEventsWithDiff(diff)));
}
return this.thingLifeCycle.pipe(mapEventsWithDiff(diff));
}

public subscribeTo(type: EventType, diff: boolean): Observable<MessageEvent> {
public async subscribeTo(type: EventType, diff: boolean, lastEvent?: string): Promise<Observable<MessageEvent>> {
if (lastEvent) {
const obs = await this.createObservableForPastEvents(diff, parseInt(lastEvent), type);
return concat(
obs.pipe(),
this.thingLifeCycle.pipe(filter((event) => event.type === type)).pipe(mapEventsWithDiff(diff)),
);
}
return this.thingLifeCycle.pipe(filter((event) => event.type === type)).pipe(mapEventsWithDiff(diff));
}

public emitCreated(td: ThingDescription) {
this.thingLifeCycle.next({ id: 1, type: EventType.THING_CREATED, td });
public async emitCreated(td: ThingDescription) {
const event = await this.eventsRepository.create({ type: EventType.THING_CREATED, td });
this.thingLifeCycle.next(event);
}
public emitUpdated(td: Partial<ThingDescription>) {
this.thingLifeCycle.next({ id: 1, type: EventType.THING_UPDATED, td });
public async emitUpdated(td: Partial<ThingDescription>) {
const event = await this.eventsRepository.create({ type: EventType.THING_UPDATED, td });
this.thingLifeCycle.next(event);
}
public emitDeleted(id: ThingDescription['id']) {
this.thingLifeCycle.next({ id: 1, type: EventType.THING_DELETED, td: { id } });
public async emitDeleted(id: ThingDescription['id']) {
const event = await this.eventsRepository.create({ type: EventType.THING_DELETED, td: { id } });
this.thingLifeCycle.next(event);
}

private async createObservableForPastEvents(
diff: boolean,
id: number,
type?: EventType,
): Promise<Observable<MessageEvent>> {
const missedEvents = await this.eventsRepository.findAfter(id);
const obs = new Observable<TDLifeCycleEvent>((subscriber) => {
missedEvents.forEach((event) => subscriber.next(event));
});

if (type) {
return obs.pipe(filter((event) => event.type === type)).pipe(mapEventsWithDiff(diff));
}

return obs.pipe(mapEventsWithDiff(diff));
}
}
4 changes: 4 additions & 0 deletions src/config/config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export class ConfigService {
return this.nestConfigService.get('DB_PORT', 5432);
}

public get maxEvents(): number {
return this.nestConfigService.get('MAX_EVENTS', 100);
}

public get dbUser(): string {
return this.nestConfigService.get('DB_USER', 'postgres');
}
Expand Down
59 changes: 59 additions & 0 deletions src/persistence/events.repository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { TDLifeCycleEvent } from './../common/models/events';
import { ConfigService } from '../config/config.service';
import {
DeleteArgs,
ExistArgs,
FindArgs,
FindFirstArgs,
FindOneArgs,
Repository,
UpdateArgs,
} from './repository.interface';

let ids = 0;
/**
* In memory implementation of an Event Repository.
* TODO: we should store events in a database like Redis.
*/
export class TDLifeCycleEventRepository implements Repository<TDLifeCycleEvent> {
private events: TDLifeCycleEvent[];
private maxEvents: number;

//TODO: check injection of ConfigService
public constructor(config: ConfigService) {
this.events = [];
this.maxEvents = config?.maxEvents || 100;
}

public async create(item: Omit<TDLifeCycleEvent, 'id'>): Promise<TDLifeCycleEvent> {
if (this.events.length >= this.maxEvents) {
this.events.shift();
}
const event = { ...item, id: ids++ };
this.events.push(event);
return event;
}
public update(args: UpdateArgs<TDLifeCycleEvent>): Promise<void> {
throw new Error('Method not implemented.');
}
public delete(args: DeleteArgs<TDLifeCycleEvent>): Promise<void> {
throw new Error('Method not implemented.');
}
public find(args?: FindArgs<TDLifeCycleEvent> | undefined): Promise<TDLifeCycleEvent[]> {
throw new Error('Method not implemented.');
}
public findOne(args: FindOneArgs<TDLifeCycleEvent>): Promise<TDLifeCycleEvent> {
throw new Error('Method not implemented.');
}
public findFirst(args: FindFirstArgs<TDLifeCycleEvent>): Promise<TDLifeCycleEvent> {
throw new Error('Method not implemented.');
}
public exist(args: ExistArgs<TDLifeCycleEvent>): Promise<boolean> {
throw new Error('Method not implemented.');
}

public async findAfter(id: TDLifeCycleEvent['id']): Promise<TDLifeCycleEvent[]> {
const index = this.events.findIndex((event) => event.id >= id);
return this.events.slice(index + 1);
}
}
6 changes: 4 additions & 2 deletions src/persistence/persistence.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import { InjectKnex, Knex, KnexModule } from 'nestjs-knex';

import { ConfigModule } from '../config/config.module';
import { ConfigService } from '../config/config.service';
import { TDLifeCycleEventRepository } from './events.repository';
import { ThingDescriptionRepository } from './thing-description.repository';
import { UserRepository } from './user.repository';

@Module({
imports: [
ConfigModule,
KnexModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
Expand All @@ -25,8 +27,8 @@ import { UserRepository } from './user.repository';
inject: [ConfigService],
}),
],
providers: [UserRepository, ThingDescriptionRepository],
exports: [UserRepository, ThingDescriptionRepository],
providers: [UserRepository, ThingDescriptionRepository, TDLifeCycleEventRepository],
exports: [UserRepository, ThingDescriptionRepository, TDLifeCycleEventRepository],
})
export class PersistenceModule implements OnApplicationShutdown {
public constructor(@InjectKnex() private readonly knex: Knex) {}
Expand Down
42 changes: 42 additions & 0 deletions test/e2e/events.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,48 @@ describe('/events', () => {
expect(collected[0].data).toBeDefined();
expect(collected[0].id).toBeDefined();
});

it('should support last-event-id', async () => {
const { status, headers, data } = await axios.get('/events/thing_created', { responseType: 'stream' });

expect(status).toBe(200);
expect(headers['content-type']).toContain('text/event-stream');

const collectPromise = collectMessages(data, 3);

for (let i = 0; i < 3; i++) {
const createRequest = await axios.post('/things', validThingDescription, {
headers: { Authorization: `Bearer ${defaultAccessToken}` },
});
expect(createRequest.status).toBe(201);
const modifiedParts = { title: 'New Title' };

await axios.patch(createRequest.headers['location'], modifiedParts, {
headers: { Authorization: `Bearer ${defaultAccessToken}`, 'Content-Type': 'application/merge-patch+json' },
});
}

const messages = await collectPromise;
data.destroy();

expect(messages.length).toBeGreaterThan(0);
expect(messages[0].id).toBeDefined();

// reconnect with last-event-id
const reconnectResponse = await axios.get('/events/thing_created', {
responseType: 'stream',
headers: { 'Last-Event-ID': `${messages[0].id}` },
});

expect(reconnectResponse.status).toBe(200);
expect(reconnectResponse.headers['content-type']).toContain('text/event-stream');

const nextMessages = await collectMessages(reconnectResponse.data, 2);
reconnectResponse.data.destroy();

expect(nextMessages.length).toBeGreaterThan(0);
expect(nextMessages).toEqual(messages.slice(1));
});
});
});

Expand Down

0 comments on commit cabe601

Please sign in to comment.