Skip to content

Commit

Permalink
🌟 #640 Create the pubsub module (#641)
Browse files Browse the repository at this point in the history
* #640 Be able to start backend services with no dependencies

* #640 Bootstrap the pubsub module
  • Loading branch information
chamerling committed Dec 8, 2020
1 parent 5781170 commit 49799fb
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 10 deletions.
6 changes: 6 additions & 0 deletions twake/backend/node/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"webserver",
"websocket",
"database",
"pubsub",
"realtime",
"phpnode"
],
Expand Down Expand Up @@ -56,5 +57,10 @@
"retries": 10,
"delay": 200
}
},
"pubsub": {
"urls": [
"amqp://guest:guest@localhost:5672"
]
}
}
138 changes: 138 additions & 0 deletions twake/backend/node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion twake/backend/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"@babel/plugin-proposal-decorators": "^7.10.5",
"@babel/preset-env": "^7.11.5",
"@babel/preset-typescript": "^7.10.4",
"@types/amqp-connection-manager": "^2.0.10",
"@types/chai": "^4.2.12",
"@types/config": "0.0.36",
"@types/eslint": "^7.2.3",
Expand Down Expand Up @@ -74,6 +75,8 @@
"typescript": "^4.0.3"
},
"dependencies": {
"amqp-connection-manager": "^3.2.1",
"amqplib": "^0.6.0",
"cassandra-driver": "^4.6.0",
"class-transformer": "^0.3.1",
"config": "^3.3.2",
Expand All @@ -94,4 +97,4 @@
"socketio-jwt": "^4.6.2",
"uuid": "^8.3.1"
}
}
}
24 changes: 16 additions & 8 deletions twake/backend/node/src/core/platform/framework/api/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,30 @@ export class TwakeComponent {
component => component.instance.state,
);

this.components.forEach(component => {
state === TwakeServiceState.Initialized && component.instance.init();
state === TwakeServiceState.Started && component.instance.start();
state === TwakeServiceState.Stopped && component.instance.stop();
});
if (!states.length) {
logger.info(`${this.name} does not have children`);
_switchServiceToState(this.instance);

return;
}

this.components.forEach(component => _switchServiceToState(component.instance));

const subscription = combineLatest(states)
.pipe(filter((value: Array<TwakeServiceState>) => value.every(v => v === state)))
.subscribe(() => {
logger.info(`Children of ${this.name} are all in ${state} state`);
logger.info(this.getStateTree());
state === TwakeServiceState.Initialized && this.instance.init();
state === TwakeServiceState.Started && this.instance.start();
state === TwakeServiceState.Stopped && this.instance.stop();

_switchServiceToState(this.instance);
subscription.unsubscribe();
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function _switchServiceToState(service: TwakeService<any>) {
state === TwakeServiceState.Initialized && service.init();
state === TwakeServiceState.Started && service.start();
state === TwakeServiceState.Stopped && service.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export abstract class TwakeService<T extends TwakeServiceProvider>
this.state.next(TwakeServiceState.Initializing);
await this.doInit();
this.state.next(TwakeServiceState.Initialized);
logger.info("Service %s is initialized", this.name);

return this;
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export class Loader {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async load(componentName: string): Promise<any> {
logger.info(`Loading ${componentName}`);
const modulesPaths = this.paths.map(path => `${path}/${componentName}`);

logger.debug(`Loading ${componentName} from ${modulesPaths.join(" - ")}`);
Expand All @@ -14,7 +15,7 @@ export class Loader {
try {
return await import(modulePath);
} catch (err) {
logger.debug(`${modulePath} not found`);
logger.debug({ err }, `${modulePath} can not be loaded`);
}
}),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { logger } from "../../../framework/logger";
import { constants as CONSTANTS } from "./constants";
import { ConfirmChannel, Replies, Message, Options, ConsumeMessage } from "amqplib";

export type AmqpCallbackType = (
err: Error,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
parsedMessage: any,
originalMessage: ConsumeMessage,
) => void;

// see http://www.squaremobius.net/amqp.node/ for the amqp documentation
export class AmqpClient {
protected _subscribeCallbackToConsumerTags: Map<AmqpCallbackType, string[]>;

constructor(protected channel: ConfirmChannel) {
this._subscribeCallbackToConsumerTags = new Map();
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
dispose(): Promise<void> {
logger.info("AMQP: closing the connection");

return this.channel.close();
}

assertExchange(
exchange: string,
type = CONSTANTS.EXCHANGE_TYPES.topic,
): Promise<Replies.AssertExchange> {
return this.channel.assertExchange(exchange, type);
}

ack(message: Message, allUpTo = false): void {
this.channel.ack(message, allUpTo);
}

assertQueue(name: string, options: Options.AssertQueue): Promise<Replies.AssertQueue> {
return this.channel.assertQueue(name, options);
}

assertBinding(queue: string, exchange: string, routingPattern?: string): Promise<Replies.Empty> {
return this.channel.bindQueue(queue, exchange, routingPattern);
}

send(exchange: string, data: unknown, routingKey = ""): boolean {
return this.channel.publish(exchange, routingKey, dataAsBuffer(data));
}

consume(queue: string, options: Options.Consume, callback: AmqpCallbackType): Promise<void> {
return this.channel
.consume(queue, onMessage, options)
.then(res => this._registerNewConsumerTag(callback, res.consumerTag));

function onMessage(originalMessage: ConsumeMessage) {
try {
const message = JSON.parse(originalMessage.content.toString());
callback(null, message, originalMessage);
} catch (err) {
logger.warn({ err }, "Can not parse the incoming message");
callback(err, null, originalMessage);
}
}
}

_registerNewConsumerTag(callback: AmqpCallbackType, consumerTag: string): void {
const sameCallbackTags = this._subscribeCallbackToConsumerTags.get(callback) || [];

sameCallbackTags.push(consumerTag);
this._subscribeCallbackToConsumerTags.set(callback, sameCallbackTags);

logger.info(`AMQP: A new consumer has been created: ${consumerTag}`);
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function dataAsBuffer(data: any): Buffer {
return Buffer.from(JSON.stringify(data));
}
Loading

0 comments on commit 49799fb

Please sign in to comment.