Skip to content
This repository has been archived by the owner on Nov 7, 2020. It is now read-only.

Commit

Permalink
feat(googlePubSub): initial implementation of googlePubSub
Browse files Browse the repository at this point in the history
  • Loading branch information
mentos1386 committed Jul 11, 2018
1 parent 34f71bf commit b82b842
Show file tree
Hide file tree
Showing 13 changed files with 1,356 additions and 173 deletions.
8 changes: 6 additions & 2 deletions Dockerfile
Expand Up @@ -28,7 +28,11 @@ WORKDIR /app/src
# Copy app source
COPY . .

# Delete dependencies possibly got from local dev env
RUN rm -rf node_modules

# Build app
RUN yarn build && mv dist /app/dist
RUN yarn build

CMD [ "node", "/app/dist/api.js" ]
# Default run prodction
CMD [ "yarn", "prod" ]
4 changes: 0 additions & 4 deletions example.env
Expand Up @@ -22,10 +22,6 @@ DB_SYNC=true
# Log
LOG_LEVEL=debug

# Redis
REDIS_HOST=redis
REDIS_PORT=6379

# ESI
ESI_CLIENT=7sdf771c2casc8das8va78vasd
ESI_SECRET=Jb1b3j12jaskjdkjJjhk3jHJhs6Jkj456sdh3
Expand Down
6 changes: 0 additions & 6 deletions nodemon.json

This file was deleted.

10 changes: 6 additions & 4 deletions package.json
Expand Up @@ -7,8 +7,7 @@
"scripts": {
"start:killmails": "nodemon --watch src --ext ts --ignore 'src/**/*.spec.ts' --exec ts-node ./src/killmails.ts",
"start:updater": "nodemon --watch src --ext ts --ignore 'src/**/*.spec.ts' --exec ts-node ./src/updater.ts",
"start": "nodemon",
"seed": "node seed",
"start": "nodemon --watch src --ext ts --ignore 'src/**/*.spec.ts' --exec ts-node ./src/api.ts",
"lint": "tslint -c tslint.json -p tsconfig.json",
"build": "tsc",
"prod": "node dist/api.js",
Expand Down Expand Up @@ -41,6 +40,7 @@
"instrument": true
},
"dependencies": {
"@google-cloud/pubsub": "^0.19.0",
"@nestjs/common": "^5.1.0",
"@nestjs/core": "^5.1.0",
"@nestjs/cqrs": "^5.1.1",
Expand All @@ -50,20 +50,22 @@
"@nestjs/typeorm": "^5.0.2",
"@nestjs/websockets": "^5.1.0",
"@types/bluebird": "^3.5.21",
"@types/google-cloud__pubsub": "^0.18.0",
"@types/hashids": "^1.0.30",
"@types/ioredis": "^3.2.11",
"@types/node": "^10.5.2",
"@types/winston": "^2.3.9",
"@types/ws": "^5.1.2",
"axios": "^0.18.0",
"bluebird": "^3.5.1",
"body-parser": "^1.18.3",
"class-sanitizer": "^0.0.5",
"class-transformer": "^0.1.9",
"class-validator": "^0.8.5",
"dotenv": "^6.0.0",
"fastify": "^1.8.0",
"fastify-formbody": "^2.0.0",
"fastify-swagger": "^0.12.0",
"hashids": "^1.1.4",
"ioredis": "^3.2.2",
"logform": "^1.9.1",
"moment": "^2.22.2",
"pg": "^7.4.3",
Expand Down
9 changes: 2 additions & 7 deletions src/api.ts
@@ -1,6 +1,4 @@
import * as bodyParser from 'body-parser';
import * as express from 'express';
import { NestFactory } from '@nestjs/core';
import { NestFactory, FastifyAdapter } from '@nestjs/core';
import { ApiModule } from './modules/api.module';
import { ValidatorPipe } from './modules/core/validation/validator.pipe';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
Expand All @@ -17,10 +15,7 @@ import 'zone.js/dist/long-stack-trace-zone.js';
async function bootstrap() {
config();

const instance = express();
instance.use(bodyParser.json());

const nestApp = await NestFactory.create(ApiModule, instance);
const nestApp = await NestFactory.create(ApiModule, new FastifyAdapter());
nestApp.enableCors();
nestApp.useGlobalPipes(new ValidatorPipe());
nestApp.useGlobalInterceptors(
Expand Down
3 changes: 3 additions & 0 deletions src/modules/api.module.ts
Expand Up @@ -15,6 +15,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { NotificationModule } from './notification/notification.module';
import { AuthMiddleware } from './authentication/authentication.middleware';
import { WebsocketModule } from './websocket/websocket.module';
import { GooglePubSubModule } from './core/googlePubSub/googlePubSub.module';

@Module({
imports: [
Expand All @@ -34,6 +35,8 @@ import { WebsocketModule } from './websocket/websocket.module';
synchronize: process.env.DB_SYNC === 'true',
}),

GooglePubSubModule.forRoot(),

AuthenticationModule,
SearchModule,
AllianceModule,
Expand Down
10 changes: 10 additions & 0 deletions src/modules/core/googlePubSub/googlePubSub.constants.ts
@@ -0,0 +1,10 @@
export const GOOGLE_PUB_SUB_PROVIDER = 'GOOGLE_PUB_SUB_PROVIDER';
export const GOOGLE_PUB_SUB_CONFIG = 'GOOGLE_PUB_SUB_CONFIG';

export const GOOGLE_PUB_SUB_TOPIC = 'GOOGLE_PUB_SUB_TOPIC';
export const GOOGLE_PUB_SUB_SUBSCRIPTION = 'GOOGLE_PUB_SUB_SUBSCRIPTION';

export const GOOGLE_PUB_SUB_PUBLISHER_OPTIONS = 'GOOGLE_PUB_SUB_PUBLISHER_OPTIONS';
export const GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS = 'GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS';

export const GOOGLE_PUB_SUB_CONFIG_USE_ENV = Symbol('GOOGLE_PUB_SUB_CONFIG_USE_ENV');
23 changes: 23 additions & 0 deletions src/modules/core/googlePubSub/googlePubSub.interface.ts
@@ -0,0 +1,23 @@
export interface GooglePubSubMessage {
id: string;
ackId: string;
data: string;
attributes: any;
timestamp: Date;

ack();
nack();

// message.id = ID of the message.
// message.ackId = ID used to acknowledge the message receival.
// message.data = Contents of the message.
// message.attributes = Attributes of the message.
// message.timestamp = Timestamp when Pub/Sub received the message.

// Ack the message:
// message.ack();

// This doesn't ack the message, but allows more messages to be retrieved
// if your limit was hit or if you don't want to ack the message.
// message.nack();
}
92 changes: 92 additions & 0 deletions src/modules/core/googlePubSub/googlePubSub.module.ts
@@ -0,0 +1,92 @@
import { Global, Module, DynamicModule } from '@nestjs/common';
import { googlePubSubProviders } from './googlePubSub.providers';
import * as PubSub from '@google-cloud/pubsub';
import {
GOOGLE_PUB_SUB_CONFIG,
GOOGLE_PUB_SUB_TOPIC,
GOOGLE_PUB_SUB_PUBLISHER_OPTIONS,
GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS,
GOOGLE_PUB_SUB_SUBSCRIPTION,
GOOGLE_PUB_SUB_CONFIG_USE_ENV,
} from './googlePubSub.constants';
import { GooglePubSub } from './googlePubSub';

@Global()
@Module({
providers: [
...googlePubSubProviders,
{
provide: GOOGLE_PUB_SUB_CONFIG,
useValue: GOOGLE_PUB_SUB_CONFIG_USE_ENV,
},
],
exports: [
...googlePubSubProviders,
{
provide: GOOGLE_PUB_SUB_CONFIG,
useValue: GOOGLE_PUB_SUB_CONFIG_USE_ENV,
},
],
})
export class GooglePubSubModule {
static forRoot(
config?: PubSub.GCloudConfiguration,
): DynamicModule {
return {
module: GooglePubSubModule,
providers: [
{
provide: GOOGLE_PUB_SUB_CONFIG,
useValue: config,
},
],
exports: [
{
provide: GOOGLE_PUB_SUB_CONFIG,
useValue: config,
},
],
};
}

static forFeature(
topic: string,
subscription: string,
publisherSubscriberConfig: {
publisherOptions?: PubSub.Topic.PublisherOptions,
subscriptionOptions?: PubSub.Topic.SubscriptionOptions,
} = {},
): DynamicModule {
if (!publisherSubscriberConfig.publisherOptions) {
publisherSubscriberConfig.publisherOptions = {};
}
if (!publisherSubscriberConfig.subscriptionOptions) {
publisherSubscriberConfig.subscriptionOptions = {};
}
return {
module: GooglePubSubModule,
providers: [
{
provide: GOOGLE_PUB_SUB_TOPIC,
useValue: topic,
},
{
provide: GOOGLE_PUB_SUB_SUBSCRIPTION,
useValue: subscription,
},
{
provide: GOOGLE_PUB_SUB_PUBLISHER_OPTIONS,
useValue: publisherSubscriberConfig.publisherOptions,
},
{
provide: GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS,
useValue: publisherSubscriberConfig.subscriptionOptions,
},
GooglePubSub,
],
exports: [
GooglePubSub,
],
};
}
}
21 changes: 21 additions & 0 deletions src/modules/core/googlePubSub/googlePubSub.providers.ts
@@ -0,0 +1,21 @@
import {
GOOGLE_PUB_SUB_PROVIDER,
GOOGLE_PUB_SUB_CONFIG,
GOOGLE_PUB_SUB_CONFIG_USE_ENV,
} from './googlePubSub.constants';
import * as PubSub from '@google-cloud/pubsub';

export const googlePubSubProviders = [
{
provide: GOOGLE_PUB_SUB_PROVIDER,
useFactory: (
googlePubSubConfig?: PubSub.GCloudConfiguration,
) : PubSub.PubSub => {
if (googlePubSubConfig === GOOGLE_PUB_SUB_CONFIG_USE_ENV) {
return PubSub();
}
return PubSub(googlePubSubConfig);
},
inject: [GOOGLE_PUB_SUB_CONFIG],
},
];
83 changes: 83 additions & 0 deletions src/modules/core/googlePubSub/googlePubSub.ts
@@ -0,0 +1,83 @@
import { Injectable, Inject, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import {
GOOGLE_PUB_SUB_PROVIDER,
GOOGLE_PUB_SUB_TOPIC,
GOOGLE_PUB_SUB_PUBLISHER_OPTIONS,
GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS,
GOOGLE_PUB_SUB_SUBSCRIPTION,
} from './googlePubSub.constants';
import { IEventPublisher } from '@nestjs/cqrs/dist/interfaces/events/event-publisher.interface';
import { IMessageSource } from '@nestjs/cqrs/dist/interfaces/events/message-source.interface';
import { IEvent } from '@nestjs/cqrs/dist/interfaces/events/event.interface';
import { Subject } from 'rxjs/Subject';
import * as PubSub from '@google-cloud/pubsub';
import { fromEvent } from 'rxjs';
import { GooglePubSubMessage } from './googlePubSub.interface';

@Injectable()
export class GooglePubSub implements IEventPublisher, IMessageSource, OnModuleInit, OnModuleDestroy
{

private events: PubSub.Subscription;

constructor(
@Inject(GOOGLE_PUB_SUB_PROVIDER)
private googlePubSub: PubSub.PubSub,
@Inject(GOOGLE_PUB_SUB_TOPIC)
private topic: string,
@Inject(GOOGLE_PUB_SUB_SUBSCRIPTION)
private subscription: string,
@Inject(GOOGLE_PUB_SUB_PUBLISHER_OPTIONS)
private publisherOptions: PubSub.Topic.PublisherOptions,
@Inject(GOOGLE_PUB_SUB_SUBSCRIBER_OPTIONS)
private subscriberOptions: PubSub.Topic.SubscriptionOptions,
) {
}

async onModuleInit() {
console.log('GooglePubSub Module initializing!');
console.log(`Setting up topic: ${this.topic}`);
await this.googlePubSub.createTopic(this.topic);
console.log(`Setting up subscription: ${this.subscription}`);
await this.googlePubSub.createSubscription(this.topic, this.subscription);
}

onModuleDestroy() {
console.log('GooglePubSub Module destroying!');
this.events.removeAllListeners();
}

// Publisher
publish<T extends IEvent>(event: T) {
const message = Buffer.from(JSON.stringify(event));

this.googlePubSub
.topic(this.topic)
.publisher(this.publisherOptions)
.publish(message)
.then((messageId) => {
console.log(`Published message: ${messageId}`);
})
.catch((err) => {
console.error('Error publishing message!', err);
});
}

bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
// Subscriber
this.events = this.googlePubSub
.topic(this.topic)
.subscription(this.subscription, this.subscriberOptions);

fromEvent<GooglePubSubMessage>(this.events, 'message')
.subscribe((message) => {
console.log(`Recived message: ${message.id}`);
const event = JSON.parse(message.data);
subject.next(event);
message.ack();
});

fromEvent(this.events, 'error')
.subscribe(error => console.error('Recived error message', error));
}
}
9 changes: 9 additions & 0 deletions src/modules/post/post.module.ts
Expand Up @@ -14,12 +14,16 @@ import { commandHandlers } from './commands/handlers';
import { eventHandlers } from './events/handlers';
import { ModuleRef } from '@nestjs/core';
import { NotificationModule } from '../notification/notification.module';
import { GooglePubSubModule } from '../core/googlePubSub/googlePubSub.module';
import { GooglePubSub } from '../core/googlePubSub/googlePubSub';

@Module({
imports: [
CQRSModule,
TypeOrmModule.forFeature([PostRepository]),

GooglePubSubModule.forFeature('test-topic', 'test-subscription'),

AuthenticationModule,
CharacterModule,
CorporationModule,
Expand All @@ -45,6 +49,7 @@ export class PostModule implements OnModuleInit {
private readonly moduleRef: ModuleRef,
private readonly command$: CommandBus,
private readonly event$: EventBus,
private readonly googlePubSub: GooglePubSub,
) {
// FIXME: Nasty hack, for some reason onModuleInit isn't executed
this.onModuleInit();
Expand All @@ -54,6 +59,10 @@ export class PostModule implements OnModuleInit {
this.command$.setModuleRef(this.moduleRef);
this.event$.setModuleRef(this.moduleRef);

// subject$ is protected and doesn't work :(
// this.googlePubSub.bridgeEventsTo(this.event$.subject$);
this.event$.publisher = this.googlePubSub;

this.event$.register(eventHandlers);
this.command$.register(commandHandlers);
}
Expand Down

0 comments on commit b82b842

Please sign in to comment.