Skip to content

Commit

Permalink
Fixed #48
Browse files Browse the repository at this point in the history
  • Loading branch information
claustres committed Mar 12, 2020
1 parent acdf088 commit 5220bc9
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 188 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"compile": "shx rm -rf lib/ && babel -d lib/ src/ && cpx \"lib/**\" example/gateway/lib && cpx \"lib/**\" example/service/lib",
"watch": "babel --watch -d lib/ src/",
"lint": "standard src/**/*.js test/**/*.js --fix",
"mocha": "cross-env NODE_CONFIG_DIR=./test/config/ mocha --exit --require babel-core/register",
"mocha": "cross-env NODE_CONFIG_DIR=./test/config/ mocha --inspect --require babel-core/register",
"coverage": "cross-env NODE_CONFIG_DIR=./test/config/ istanbul cover node_modules/mocha/bin/_mocha -- --exit --require babel-core/register",
"test": "npm run compile && npm run lint && npm run coverage",
"start:gateway": "cd example/gateway && npm install && npm start",
Expand Down Expand Up @@ -85,6 +85,7 @@
"socket.io-client": "^2.0.3",
"standard": "^14.0.0",
"superagent": "^5.1.0",
"uuid": "^3.1.0"
"uuid": "^3.1.0",
"why-is-node-running": "^2.1.0"
}
}
222 changes: 180 additions & 42 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import makeCote from 'cote'
import uuid from 'uuid/v4'
import makeDebug from 'debug'
import portfinder from 'portfinder'
import { LocalService, RemoteService } from './service'
import RemoteService from './service'

const debug = makeDebug('feathers-distributed')
// Get the unique global symbol to store event listeners on a service object
const EVENT_LISTENER_KEY = Symbol.for('event-listener')

const isInternalService = (app, serviceDescriptor) => {
// Default is to expose all services
Expand All @@ -20,69 +22,140 @@ const isDiscoveredService = (app, serviceDescriptor) => {
else return app.distributionOptions.remoteServices.includes(serviceDescriptor.path)
}

function publishApplication (app) {
app.servicePublisher.publish('application', { uuid: app.uuid })
debug('Published local app with uuid ' + app.uuid)
}

function publishService (app, path) {
const service = app.service(path)
if (!service || (typeof service !== 'object')) return
if (service.remote) {
debug('Ignoring remote service publication on path ' + path + ' for app with uuid ' + app.uuid)
debug('Ignoring remote service publication on path ' + path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
return
}
const serviceDescriptor = {
uuid: app.uuid,
key: app.distributionKey,
path: stripSlashes(path),
events: service.distributedEvents || service._serviceEvents
}
// Skip internal services
if (isInternalService(app, serviceDescriptor)) {
debug('Ignoring local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Ignoring local service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
return
}
// Register the responder to handle remote calls to the service
if (!service.responder) service.responder = new LocalService(Object.assign({ app }, serviceDescriptor))
// Setup event listeners whenever required and if not already done
if (app.distributionOptions.publishEvents && serviceDescriptor.events.length && !service[EVENT_LISTENER_KEY]) {
serviceDescriptor.events.forEach(event => {
// Publish events whenever required
service.on(event, object => {
debug(`Publishing ${event} local service event on path ` + serviceDescriptor.path +
' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, object)
app.serviceEventsPublisher.publish(event, Object.assign({ path: serviceDescriptor.path }, object))
})
// Tag service so that we will not install listeners twice
service[EVENT_LISTENER_KEY] = true
})
debug('Publish callbacks registered for local service events on path ' + serviceDescriptor.path +
' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, serviceDescriptor.events)
}
// Publish new local service
app.servicePublisher.publish('service', serviceDescriptor)
debug('Published local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Published local service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
}

function publishServices (app) {
// Add a timeout so that the publisher/subscriber has been initialized on the node
if (app.applicationPublicationTimeout) return
app.applicationPublicationTimeout = setTimeout(_ => {
Object.getOwnPropertyNames(app.services).forEach(path => {
publishService(app, path)
})
// Reset timeout so that next queued publication will be scheduled
app.applicationPublicationTimeout = null
}, app.distributionOptions.publicationDelay)
debug('Publishing local services for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
}

function registerApplication (app, applicationDescriptor) {
// Create the request/events manager for remote services only
if (applicationDescriptor.uuid === app.uuid) {
debug('Ignoring service requester/events publisher creation for local app with uuid ' + app.uuid)
return
}

const key = applicationDescriptor.key
// Already registered
if (app.serviceRequesters[key]) {
debug('Ignoring already registered remote app with uuid ' + app.uuid + ' and key ' + key)
return
}
debug('Registering remote app with uuid ' + app.uuid + ' and key ' + key)
// Create the request manager to remote services
app.serviceRequesters[key] = new app.cote.Requester({
name: 'feathers services requester',
namespace: key,
key,
requests: ['find', 'get', 'create', 'update', 'patch', 'remove']
}, app.coteOptions)
debug('Service requester ready for remote app with uuid ' + applicationDescriptor.uuid + ' and key ' + key)
// Subscriber to listen to events from other nodes
const events = app.distributionOptions.distributedEvents
app.serviceEventsSubscribers[key] = new app.cote.Subscriber({
name: 'feathers services events subscriber',
namespace: key,
key,
subscribesTo: events
}, app.coteOptions)
events.forEach(event => {
app.serviceEventsSubscribers[key].on(event, object => {
debug(`Dispatching ${event} remote service event on path ` + object.path, object)
const service = app.service(object.path)
service.emit(event, object)
})
})
debug('Service events subscriber ready for remote app with uuid ' + applicationDescriptor.uuid + ' and key ' + key)
}

function registerService (app, serviceDescriptor) {
// Do not register our own services
if (serviceDescriptor.uuid === app.uuid) {
debug('Ignoring local service registration on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Ignoring local service registration on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
return
}
// Skip already registered services
const service = app.service(serviceDescriptor.path)
if (service) {
if (service instanceof RemoteService) {
debug('Already registered service as remote on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Already registered service as remote on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
} else {
debug('Already registered local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Already registered local service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
}
return
}
// Skip services we are not interested into
if (!isDiscoveredService(app, serviceDescriptor)) {
debug('Ignoring remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Ignoring remote service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
return
}
// Initialize our service by providing any required middleware
let args = [serviceDescriptor.path]
if (app.distributionOptions.middlewares.before) args = args.concat(app.distributionOptions.middlewares.before)
args.push(new RemoteService(serviceDescriptor))
args.push(new RemoteService(app, serviceDescriptor))
if (app.distributionOptions.middlewares.after) args = args.concat(app.distributionOptions.middlewares.after)
app.use(...args)
debug('Registered remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Registered remote service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)

// registering hook object on every remote service
if (app.distributionOptions.hooks) {
app.service(serviceDescriptor.path).hooks(app.distributionOptions.hooks)
}
debug('Registered hooks on remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
debug('Registered hooks on remote service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)

// Dispatch an event internally through node so that async processes can run
app.emit('service', serviceDescriptor)
Expand All @@ -92,59 +165,125 @@ export function initialize (app) {
debug('Initializing cote with options', app.coteOptions)
// Setup cote with options
app.cote = makeCote(app.coteOptions)
app.distributionKey = app.distributionOptions.key || app.uuid

// This subscriber listen to an event each time a remote app service has been registered
app.serviceSubscriber = new app.cote.Subscriber({
name: 'feathers services subscriber',
namespace: 'services',
key: 'services',
subscribesTo: ['application', 'service']
subscribesTo: ['service']
}, app.coteOptions)
debug('Services subscriber ready for app with uuid ' + app.uuid)
debug('Services subscriber ready for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', serviceDescriptor => {
// When a new app pops up create the required proxy to it first
registerApplication(app, serviceDescriptor)
registerService(app, serviceDescriptor)
})

// This publisher publishes an event each time a local app or service is registered
app.servicePublisher = new app.cote.Publisher({
name: 'feathers services publisher',
namespace: 'services',
key: 'services',
broadcasts: ['application', 'service']
broadcasts: ['service']
}, app.coteOptions)
debug('Services publisher ready for app with uuid ' + app.uuid)
// Also each time a new app pops up so that it does not depend of the initialization order of the apps
app.serviceSubscriber.on('application', applicationDescriptor => {
Object.getOwnPropertyNames(app.services).forEach(path => {
publishService(app, path)
})
// Each time a new app pops up we republish local services so that
// service distribution does not depend on the initialization order of the apps
app.servicePublisher.on('cote:added', (data) => { publishServices(app) })
// FIXME: we should manage apps going offline
app.servicePublisher.on('cote:removed', (data) => { })

// Create the response manager for local services
app.serviceResponder = new app.cote.Responder({
name: 'feathers services responder',
namespace: app.distributionKey,
key: app.distributionKey,
requests: ['find', 'get', 'create', 'update', 'patch', 'remove']
}, app.coteOptions)
debug('Service responder ready for local app with uuid ' + app.uuid)
// Answer requests from other nodes
app.serviceResponder.on('find', async (req) => {
const service = app.service(req.path)
debug('Responding find() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.find(Object.assign({ fromRemote: true }, req.params))
debug('Successfully find() local service on path ' + req.path + ' with key ' + req.key)
return result
})
app.serviceResponder.on('get', async (req) => {
const service = app.service(req.path)
debug('Responding get() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.get(req.id, Object.assign({ fromRemote: true }, req.params))
debug('Successfully get() local service on path ' + req.path + ' with key ' + req.key)
return result
})
app.serviceResponder.on('create', async (req) => {
const service = app.service(req.path)
debug('Responding create() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.create(req.data, Object.assign({ fromRemote: true }, req.params))
debug('Successfully create() local service on path ' + req.path + ' with key ' + req.key)
return result
})
app.serviceResponder.on('update', async (req) => {
const service = app.service(req.path)
debug('Responding update() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.update(req.id, req.data, Object.assign({ fromRemote: true }, req.params))
debug('Successfully update() local service on path ' + req.path + ' with key ' + req.key)
return result
})
app.serviceResponder.on('patch', async (req) => {
const service = app.service(req.path)
debug('Responding patch() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.patch(req.id, req.data, Object.assign({ fromRemote: true }, req.params))
debug('Successfully patch() local service on path ' + req.path + ' with key ' + req.key)
return result
})
app.serviceResponder.on('remove', async (req) => {
const service = app.service(req.path)
debug('Responding remove() local service on path ' + req.path + ' with key ' + req.key, req)
const result = await service.remove(req.id, Object.assign({ fromRemote: true }, req.params))
debug('Successfully remove() local service on path ' + req.path + ' with key ' + req.key)
return result
})

// Placeholder for request/events managers for remote services
app.serviceRequesters = {}
app.serviceEventsSubscribers = {}

// Dispatcher of service events to other nodes) {
if (app.distributionOptions.publishEvents) {
app.serviceEventsPublisher = new app.cote.Publisher({
name: 'feathers service events publisher',
namespace: app.distributionKey,
key: app.distributionKey,
broadcasts: app.distributionOptions.distributedEvents || ['created', 'updated', 'patched', 'removed']
}, app.coteOptions)
debug('Service events publisher ready for local app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
}

// Tell others apps I'm here
// Add a timeout so that the publisher/subscriber has been initialized on the node
app.applicationPublicationTimeout = setTimeout(_ => { publishApplication(app) }, app.distributionOptions.publicationDelay)
publishServices(app)
}

export function finalize (app) {
debug('Finalizing cote')
Object.getOwnPropertyNames(app.services).forEach(path => {
let service = app.service(path)
if (service) {
if (service.responder) {
service.responder.close()
if (service.responder.serviceEventsPublisher) service.responder.serviceEventsPublisher.close()
}
if (service.serviceEventsSubscriber) service.serviceEventsSubscriber.close()
}
})
if (app.serviceRequesters) Object.getOwnPropertyNames(app.serviceRequesters).forEach(key => app.serviceRequesters[key].close())
if (app.serviceEventsSubscribers) Object.getOwnPropertyNames(app.serviceEventsSubscribers).forEach(key => app.serviceEventsSubscribers[key].close())
if (app.serviceSubscriber) app.serviceSubscriber.close()
if (app.servicePublisher) app.servicePublisher.close()
if (app.serviceResponder) app.serviceResponder.close()
if (app.applicationPublicationTimeout) clearTimeout(app.applicationPublicationTimeout)
if (app.coteInitializationTimeout) clearTimeout(app.coteInitializationTimeout)
}

export default function init (options = {}) {
return function () {
const app = this
// We need to uniquely identify the app to avoid infinite loop by registering our own services
// This uuid is also used a partition key in cote unless provided
app.uuid = uuid()
app.coteOptions = Object.assign({
helloInterval: 10000,
checkInterval: 20000,
Expand All @@ -158,15 +297,15 @@ export default function init (options = {}) {
publicationDelay: (process.env.PUBLICATION_DELAY ? Number(process.env.PUBLICATION_DELAY) : 10000),
coteDelay: (process.env.COTE_DELAY ? Number(process.env.COTE_DELAY) : undefined),
middlewares: {},
publishEvents: true
publishEvents: true,
distributedEvents: ['created', 'updated', 'patched', 'removed']
}, options)

debug('Initializing feathers-distributed with options', app.distributionOptions)
// Change default base/highest port for automated port finding
portfinder.basePort = app.coteOptions.basePort
portfinder.highestPort = app.coteOptions.highestPort
// We need to uniquely identify the app to avoid infinite loop by registering our own services
app.uuid = uuid()

// Setup cote with options and required delay
if (app.distributionOptions.coteDelay) {
// -1 means the caller wants to initialize byitself
Expand Down Expand Up @@ -194,6 +333,5 @@ export default function init (options = {}) {
}

init.RemoteService = RemoteService
init.LocalService = LocalService
init.initialize = initialize
init.finalize = finalize
Loading

0 comments on commit 5220bc9

Please sign in to comment.