Skip to content

Commit

Permalink
Merge 2a1f310 into 72dc470
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromevalentin committed Jun 3, 2017
2 parents 72dc470 + 2a1f310 commit 80bd283
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 20 deletions.
26 changes: 19 additions & 7 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ exports.listen = function (options, transportUtil) {
var listener
var listenAttempts = 0
var listen_details = _.clone(msg)
var pins = transportUtil.resolve_pins(msg)

server.on('request', function (req, res) {
internals.timeout(listenOptions, req, res)
Expand All @@ -35,7 +36,7 @@ exports.listen = function (options, transportUtil) {
return res.end()
}

internals.trackHeaders(listenOptions, seneca, transportUtil, req, res)
internals.trackHeaders(listenOptions, seneca, transportUtil, req, res, pins)
})
})

Expand Down Expand Up @@ -206,7 +207,7 @@ internals.setBody = function (seneca, transportUtil, req, res, next) {
})
}

internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, res) {
internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, res, pins) {
if (req.url.indexOf(listenOptions.path) !== 0) {
return
}
Expand Down Expand Up @@ -238,6 +239,17 @@ internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, re
}
}

// Issue: #97
if (pins) {
// any topic is not pinned, verifying the calling one is pinned
var argspatrun = transportUtil.make_argspatrun(pins)
if (!argspatrun.find(req.body)) {
data.error = transportUtil.error('not_pinned', { act: req.body })
data.error.statusCode = 404
return internals.sendResponse(seneca, transportUtil, res, data, {})
}
}

transportUtil.handle_request(seneca, data, listenOptions, function (out) {
internals.sendResponse(seneca, transportUtil, res, out, data)
})
Expand All @@ -247,14 +259,14 @@ internals.sendResponse = function (seneca, transportUtil, res, out, data) {
var outJson = 'null'
var httpcode = 200

if (out && out.res) {
httpcode = out.res.statusCode || httpcode
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.res)
}
else if (out && out.error) {
if (out && out.error) {
httpcode = out.error.statusCode || 500
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.error)
}
else if (out && out.res) {
httpcode = out.res.statusCode || httpcode
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.res)
}

var headers = {
'Content-Type': 'application/json',
Expand Down
17 changes: 16 additions & 1 deletion lib/tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ exports.listen = function (options, transportUtil) {
var connections = []
var listenAttempts = 0

var pins = transportUtil.resolve_pins(args)

var listener = Net.createServer(function (connection) {
seneca.log.debug('listen', 'connection', listenOptions,
'remote', connection.remoteAddress, connection.remotePort)
Expand All @@ -41,7 +43,20 @@ exports.listen = function (options, transportUtil) {
return
}

transportUtil.handle_request(seneca, data, options, function (out) {
// Issue: #97
if (pins) {
// any topic is not pinned, verifying the calling one is pinned
var argspatrun = transportUtil.make_argspatrun(pins)
if (!argspatrun.find(data.act)) {
out = transportUtil.prepareResponse(seneca, data)
out.error = transportUtil.error('not_pinned', data)

stringifier.write(out)
return
}
}

transportUtil.handle_request(seneca, data, listenOptions, function (out) {
if (out === null || !out.sync) {
return
}
Expand Down
43 changes: 36 additions & 7 deletions lib/transport-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var internals = {
'message_loop': 'Inbound message rejected as looping back to this server.',
'data_error': 'Inbound message included an error description.',
'invalid_json': 'Invalid JSON: <%=input%>.',
'unexcepted_async_error': 'Unexcepted error response to asynchronous message.'
'unexcepted_async_error': 'Unexcepted error response to asynchronous message.',
'not_pinned': 'Inbound message <%=act%> rejected as not pinned'
},
override: true
})
Expand Down Expand Up @@ -99,7 +100,6 @@ internals.Utils.prototype.handle_response = function (seneca, data, client_optio
return false
}


var actinfo = {
id: data.id,
accept: data.accept,
Expand Down Expand Up @@ -248,14 +248,34 @@ internals.Utils.prototype.handle_request = function (seneca, data, listen_option

input.id$ = data.id

this.requestAct(seneca, input, output, respond)
this.requestAct(seneca, input, output, respond, listen_options.inward, listen_options.outward)
}

internals.Utils.prototype.requestAct = function (seneca, input, output, respond) {
internals.Utils.prototype.requestAct = function (seneca, input, output, respond, inward, outward) {
var self = this

try {
if (inward) {
inward({ seneca }, { msg: input })
}
seneca.act(input, function (err, out) {
if (outward) {
try {
var outward_data = {
err: err,
msg: input,
res: out
}
outward({ seneca }, outward_data)
err = outward_data.err
out = outward_data.res
}
catch (e) {
// outward failed, keep a trace of the original err if there was one
e.act_error = err
err = e
}
}
self.update_output(input, output, err, out)
respond(output)
})
Expand Down Expand Up @@ -525,16 +545,25 @@ internals.Utils.prototype.handle_entity = function (seneca, raw) {

raw = _.isObject(raw) ? raw : {}

// FIX #135 - entity is now optional in seneca, entity sent remotely may arrive
// in a seneca context that does not support entity.
// Don't try to "make" them as it is not supported and lead to:
// TypeError: seneca.make$ is not a function
function make (entity) {
if (seneca.make$) return seneca.make$(entity)
else seneca.log.warn('[', entity, '] cannot be used as entity. seneca-entity plugin is missing.')
return entity
}

if (raw.entity$) {
return seneca.make$(raw)
return make(raw)
}

_.each(raw, function (value, key) {
if (_.isObject(value) && value.entity$) {
raw[key] = seneca.make$(value)
raw[key] = make(value)
}
})

return raw
}

Expand Down
32 changes: 32 additions & 0 deletions test/entity.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,36 @@ describe('Transporting Entities', function () {
})
})
})

it('supports client that are not using entity (#135)', function (done) {
var server = CreateInstance()

if (server.version >= '2.0.0') {
server.use(Entity)
}

server.ready(function () {
server.add({cmd: 'test'}, function (args, cb) {
let entity = this.make$('test').data$(args.entity)
entity.save$(function (err, entitySaveResponse) {
if (err) return cb(err)
cb(null, entitySaveResponse)
})
})
.add({role: 'entity', cmd: 'save'}, function (args, cb) {
cb(null, { entity: args.ent, tx: args.tx$ })
})
.listen({ type: 'tcp', port: 20105 })

var client = CreateInstance()
client.client({ type: 'tcp', port: 20105 })
client.ready(function () {
this.act({cmd: 'test', entity: { name: 'bar' }}, function (err, res) {
Assert(!err)
Assert(res.entity.name === 'bar')
done()
})
})
})
})
})
101 changes: 100 additions & 1 deletion test/http.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ describe('Specific http', function () {
})
})


it('http-query', function (fin) {
CreateInstance({errhandler: fin})
.add('a:1', function (args, done) {
Expand Down Expand Up @@ -239,6 +238,106 @@ describe('Specific http', function () {
})
})
})

it('apply inward/outward listen options on HTTP remote act call', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18997',
inward: (context, data) => {
data.msg.bar += 1
data.msg.inward = 'INPUT UPGRADED'
},
outward: (context, data) => {
data.res.BAR += 10
data.res.inward = data.msg.inward
data.res.outward = 'OUTPUT UPGRADED'
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18997'})

siClient.act('foo:1,bar:2', function (err, out) {
if (err) return fin(err)
Assert.equal(out.BAR, 13)
Assert.equal(out.inward, 'INPUT UPGRADED')
Assert.equal(out.outward, 'OUTPUT UPGRADED')
fin()
})
})
})

it('reject HTTP remote act call in inward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18996',
inward: (context, data) => {
var e = new Error('HTTP inward rejected!')
e.error_code = 'inward_rejected'
throw e
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18996'})

siClient.act('foo:1,bar:2', function (err, out) {
Assert.equal(err.error_code, 'inward_rejected')
if (err) return fin()
fin(new Error('Inward does not reject remote call'))
})
})
})

it('reject HTTP remote act call in outward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18995',
outward: (context, data) => {
var e = new Error('HTTP outward rejected!')
e.error_code = 'outward_rejected'
throw e
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18995'})

siClient.act('foo:1,bar:2', function (err, out) {
Assert.equal(err.error_code, 'outward_rejected')
if (err) return fin()
fin(new Error('Outward does not reject remote call'))
})
})
})

it('catch HTTP remote act call error in outward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(new Error('Catchable failure'), {BAR: args.bar})
})
.listen({type: 'http', port: '18994',
outward: (context, data) => {
delete data.err
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18994'})

siClient.act('foo:1,bar:2', function (err, out) {
if (err) return fin(err)
Assert.equal(out.BAR, 2)
fin()
})
})
})
})

describe('Specific https', function () {
Expand Down
Loading

0 comments on commit 80bd283

Please sign in to comment.