Skip to content

Commit

Permalink
Merge 2d52bb0 into aa086d6
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-casarrubias committed May 12, 2018
2 parents aa086d6 + 2d52bb0 commit 59805a6
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 90 deletions.
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
{
"name": "@onixjs/core",
"version": "1.0.0-alpha.28",
"version": "1.0.0-alpha.29",
"description": "An Enterprise Grade NodeJS Platform that implements Industry Standards and Patterns in order to provide Connectivity, Stability, High-Availability and High-Performance.",
"main": "dist/src/index.js",
"scripts": {
"build": "npm run lint:fix && npm run build:dist && npm run build:dist6 && npm run build:docs",
"build": "npm run build:dist && npm run build:dist6",
"build:dist": "tsc --target es2017 --outDir dist",
"build:dist6": "tsc --target es2015 --outDir dist6",
"build:watch": "tsc --watch",
"build:docs": "typedoc --out ./documentation ./src",
"clean": "rm -rf dist dist6",
"lint": "npm run prettier:check && npm run tslint",
"lint:fix": "npm run prettier:fix && npm run tslint:fix",
"lint": "npm run prettier:check && npm run lint:fix && npm run tslint",
"lint:fix": "npm run prettier:fix",
"prettier:cli": "prettier \"**/*.ts\" \"**/*.js\"",
"prettier:check": "npm run prettier:cli -- -l",
"prettier:fix": "npm run prettier:cli -- --write",
"tslint": "tslint -c tslint.full.json --project tsconfig.json --type-check",
"tslint:fix": "npm run lint -- --fix",
"prepublish": "npm run build",
"pretest": "npm run lint:fix && npm run clean && npm run build",
"prepublish": "npm run lint:fix && npm run clean && npm run build",
"pretest": "npm run build",
"test": "nyc ava && nyc report --reporter=text-lcov | coveralls",
"posttest": "npm run lint",
"serve:docs": "npm run build && node ./dist/documentation"
Expand Down Expand Up @@ -49,7 +49,7 @@
},
"dependencies": {
"@onixjs/enumerable": "1.0.0-alpha.7",
"@onixjs/sdk": "^1.0.0-alpha.13.1",
"@onixjs/sdk": "^1.0.0-alpha.14",
"finalhandler": "^1.1.1",
"reflect-metadata": "^0.1.12",
"router": "^1.3.2"
Expand All @@ -73,7 +73,6 @@
"typedoc": "^0.10.0",
"typeorm": "^0.1.12",
"typescript": "^2.6.2",
"uws": "^9.148.0",
"ws": "^5.1.1"
},
"ava": {
Expand Down Expand Up @@ -104,6 +103,8 @@
"exclude": [
"dist/src/core/app.server.js",
"dist6/src/core/app.server.js",
"dist/src/core/call.connect.js",
"dist6/src/core/call.connect.js",
"coverage/**",
"dist/test/**",
"dist6/test/**",
Expand Down
220 changes: 220 additions & 0 deletions src/core/call.connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import {
IAppOperation,
OperationType,
IMetaData,
ICallConfig,
IOnixSchema,
} from '../interfaces';
import {Utils} from '@onixjs/sdk/dist/utils';
import {NodeJS} from '@onixjs/sdk/dist/adapters/node.adapters';
/**
* @class CallConnect
* @author Jonathan Casarrubias
* @license MIT
* @description This class will provide connectivity
* between soa services.
*
* It will provide connection throgh IO Streams to other
* services living in the same cluster.
*/
export class CallConnect {
/**
* @property client
* @description This property creates an HTTP client
* instance to call for the broker schema.
*/
private client: NodeJS.HTTP = new NodeJS.HTTP();
/**
* @property schema
* @description This property contains a reference
* of the onix schema
*/
private schema: IOnixSchema;
/**
* @constructor
* @param config
* @description Receives the ICallConfig in order
* to validate calls, streams and schema.
* Only valid calls and streams will be executed,
* of course avoiding sending falsy information to
* the broker.
*/
constructor(public config: ICallConfig) {}
/**
* @method getSchema
* @description This method assigns a singleton schema
* that will be pulled once from the current onix broker.
*/
private async getSchema() {
// Singleton schema
if (this.schema) {
return;
}
// If the user is sending a token then verify with the sso.
// TODO Add Result Interace from @onixjs/sdk
this.schema = <IOnixSchema>await this.client.get(
`http://${
this.config.broker.host ? this.config.broker.host : '127.0.0.1'
}:${this.config.broker.port}/.well-known/onixjs-schema`,
);
}
/**
* @method call
* @param payload
* @param metadata
* @description This method will coordinate with the OnixJS Broker to
* execute a RPC, that can be either on this SOA Service or within
* any other SOA Service living in the same cluster.
*/
async call<T>(payload: T, metadata: IMetaData = {stream: false}): Promise<T> {
// Hard copy the configuration
const config: ICallConfig = JSON.parse(JSON.stringify(this.config));
// Make sure everything is well configured in order to make this call
await this.validate('rpc', config);
// Ok cool, lets make the call to the broker
return new Promise<T>((resolve, reject) => {
// Set metadata for this call not being a stream
metadata.stream = false;
// Create App Operation
const operation: IAppOperation = {
uuid: Utils.uuid(),
type: OperationType.ONIX_REMOTE_CALL_PROCEDURE,
message: {
rpc: `${config.app}.${config.module}.${config.component}.${
config.method
}`,
request: {
metadata,
payload,
},
},
};
// Listen for broker response
process.on('message', (response: IAppOperation) => {
if (
(response.uuid === operation.uuid,
response.type === OperationType.ONIX_REMOTE_CALL_PROCEDURE_RESPONSE)
) {
if (
response.message.request.payload.code &&
response.message.request.payload.message
) {
reject(response.message.request.payload);
} else {
resolve(response.message.request.payload);
}
}
});
// Send result back to broker
if (process.send) process.send(operation);
});
}
/**
* @method stream
* @param handler
* @param metadata
* @description This method will register a stream connection
* to other service living in the same cluster of services.
*/
async stream<T>(
handler: (payload: T, metadata: IMetaData) => void,
metadata: IMetaData = {stream: true},
) {
// Hard copy the configuration
const config: ICallConfig = JSON.parse(JSON.stringify(this.config));
// Make sure everything is well configured in order to make this call
await this.validate('rpc', config);
// complete metadata in case is not existent
if (!metadata.stream) metadata.stream = true;
// Create App Operation
const operation: IAppOperation = {
uuid: Utils.uuid(),
type: OperationType.ONIX_REMOTE_CALL_STREAM,
message: {
rpc: `${config.app}.${config.module}.${config.component}.${
config.method
}`,
request: {
metadata,
payload: undefined,
},
},
};
// Listen for broker response
process.on('message', (response: IAppOperation) => {
if (
(response.uuid === operation.uuid,
response.type === OperationType.ONIX_REMOTE_CALL_PROCEDURE_RESPONSE)
) {
handler(
response.message.request.payload,
response.message.request.metadata,
);
}
});
// Send result back to broker
if (process.send) process.send(operation);
}
/**
* @method validate
* @param type
* @param config
* @description This method will validate if the current configuration
* is correct verifying with the OnixJS Schema.
*/
private async validate(type: string, config: ICallConfig) {
// Wait for schema to available
try {
await this.getSchema();
} catch (e) {
throw e;
}
// Verify the config makes sense
if (!this.schema[this.config.app]) {
throw new Error(
`ONIXJS: The app "${
this.config.app
}" is not hosted by the provided broker.`,
);
}
if (!this.schema[this.config.app].modules[this.config.module]) {
throw new Error(
`ONIXJS: The module "${this.config.module}" doesn't belongs to app "${
this.config.app
}".`,
);
}
if (
!this.schema[this.config.app].modules[this.config.module][
this.config.component
]
) {
throw new Error(
`ONIXJS: The component "${
this.config.component
}" doesn't belongs to module "${this.config.module}".`,
);
}
if (
!this.schema[this.config.app].modules[this.config.module][
this.config.component
][this.config.method]
) {
throw new Error(
`ONIXJS: The method "${
this.config.method
}" doesn't belongs to component "${this.config.component}".`,
);
}
if (
type !==
this.schema[this.config.app].modules[this.config.module][
this.config.component
][this.config.method]
) {
throw new Error(
`ONIXJS: The method ${this.config.method} is not type of ${type}.`,
);
}
}
}
56 changes: 51 additions & 5 deletions src/core/host.broker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import * as http from 'http';
import * as https from 'https';
import {IAppOperation, IAppDirectory, WebSocketAdapter} from '../interfaces';
import {
IAppOperation,
IAppDirectory,
WebSocketAdapter,
OperationType,
} from '../interfaces';
import {ChildProcess} from 'child_process';
import {Utils} from '@onixjs/sdk/dist/utils';
/**
* @class HostBroker
* @author Jonathan Casarrubias
Expand All @@ -24,16 +31,32 @@ export class HostBroker {
private websocket: WebSocketAdapter,
private apps: IAppDirectory,
) {
// Listen for WebSocket Requests
this.websocket.WebSocket(this.server).on('connection', ws => {
ws.on('message', (data: string) => this.handle(ws, JSON.parse(data)));
ws.on('message', (data: string) => {
this.wsHandler(ws, Utils.IsJsonString(data) ? JSON.parse(data) : data);
});
});
// Listen for IO STD Streams from any app
Object.keys(this.apps).forEach((app: string) => {
this.apps[app].process.on('message', (operation: IAppOperation) => {
// Verify the operation is some sort of communication (RPC or Stream)
if (
operation.type === OperationType.ONIX_REMOTE_CALL_PROCEDURE ||
operation.type === OperationType.ONIX_REMOTE_CALL_STREAM
) {
this.ioHandler(this.apps[app].process, operation);
}
});
});
}
/**
* @method handle
* @method wsHandler
* @param message
* @description This method will handle
* @description This method will handle web socket requests sending
* a valid AppOperation.
*/
handle(ws: WebSocket, operation: IAppOperation) {
wsHandler(ws: WebSocket, operation: IAppOperation) {
// Route Message to the right application
const callee: string = operation.message.rpc.split('.').shift() || '';
if (this.apps[callee]) {
Expand All @@ -50,4 +73,27 @@ export class HostBroker {
throw new Error('Unable to find callee application');
}
}
/**
* @method ioHandler
* @param message
* @description This method will handle web socket requests sending
* a valid AppOperation.
*/
ioHandler(process: ChildProcess, operation: IAppOperation) {
// Route Message to the right application
const callee: string = operation.message.rpc.split('.').shift() || '';
if (this.apps[callee]) {
this.apps[callee].process.on('message', (response: IAppOperation) => {
if (operation.uuid === response.uuid) {
// Send application response to websocket client
process.send(response);
}
});
// Route incoming message to the right application
// through std io stream
this.apps[callee].process.send(operation);
} else {
throw new Error('Unable to find callee application');
}
}
}
1 change: 1 addition & 0 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ export * from './host.boot';
export * from './app.server';
export * from './app.factory';
export * from './app.notifier';
export * from './call.connect';
export * from './acl.groups';
export * from './acl.everyone';
10 changes: 7 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class OnixJS {
* @description Current Onix Version.
*/
get version(): string {
return '1.0.0-alpha.28';
return '1.0.0-alpha.29';
}
/**
* @property router
Expand Down Expand Up @@ -103,7 +103,9 @@ export class OnixJS {
message: {
rpc: 'ping',
request: {
metadata: {},
metadata: {
stream: false,
},
payload: {},
},
},
Expand Down Expand Up @@ -140,7 +142,9 @@ export class OnixJS {
message: {
rpc: '[apps].isAlive', // [apps] will be overriden inside each app
request: {
metadata: {},
metadata: {
stream: false,
},
payload: apps,
},
},
Expand Down
Loading

0 comments on commit 59805a6

Please sign in to comment.