Skip to content

Commit

Permalink
move protobuf verify to http
Browse files Browse the repository at this point in the history
  • Loading branch information
viclm committed Apr 25, 2019
1 parent 81b6434 commit 0c21e7b
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 145 deletions.
4 changes: 2 additions & 2 deletions example/gateway/endpoints/ip.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ exports.get = {
prefilter(request) {
return {
service: 'graphloc',
path: 'graphql',
pathname: 'graphql',
method: 'post',
datatype: 'json',
data: {
Expand Down Expand Up @@ -33,7 +33,7 @@ exports.post = {
prefilter(request) {
return {
service: 'graphloc',
path: 'graphql',
pathname: 'graphql',
method: 'post',
datatype: 'json',
data: request.body
Expand Down
2 changes: 1 addition & 1 deletion example/gateway/endpoints/news/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ exports.get = {
prefilter() {
return {
service: 'qq',
path: 'xw/topNews',
pathname: 'xw/topNews',
}
},
convert(result) {
Expand Down
4 changes: 2 additions & 2 deletions example/gateway/endpoints/news/soccer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ exports.get = {
prefilter() {
return {
service: 'hupu',
path: 'home/latest-news',
pathname: 'home/latest-news',
data: {
league: '意甲',
}
Expand All @@ -27,7 +27,7 @@ exports.get = {
return ['英超', '西甲', '德甲', '中超'].map(league => {
return {
service: 'hupu',
path: 'home/latest-news',
pathname: 'home/latest-news',
data: {
league,
}
Expand Down
2 changes: 1 addition & 1 deletion example/gateway/services/hupu/hupu.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module.exports = {
address: 'https://soccer.hupu.com/',
protocol: 'https',
idl: require('path').resolve(__dirname, 'hupu.proto')
protobuf: require('path').resolve(__dirname, 'hupu.proto')
}
2 changes: 1 addition & 1 deletion example/gateway/services/hupu/hupu.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ syntax = "proto3";
package hupu;

service Hupu {
rpc getHomeLatestNews(LNReq) returns (LNRes) {}
rpc HomeLatestNews(LNReq) returns (LNRes) {}
}

message LNReq {
Expand Down
157 changes: 102 additions & 55 deletions protocols/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,76 @@ const zlib = require('zlib')
const url = require('url')
const querystring = require('querystring')
const FormData = require('form-data')
const protobuf = require('protobufjs')
const debug = require('debug')('mgate:http')
const logger = require('../utils/logger')
const path = require('path')

const rhttp = /^https?:\/\//
const rjson = /^application\/json\b/
const rformdata = /^multipart\/form-data\b/
const rhump = /[\/\-_]+(.?)/g
const rnoword = /[\/\-_]/g

const Protobufs = new Map()

function loadProtobuf(filename) {
const lookup = (namespace, type, parentName) => {
let result = []
let fullname = parentName + namespace.name
if (namespace instanceof type) {
result.push([fullname, namespace])
}
if (namespace.hasOwnProperty('nested')) {
result = namespace.nestedArray.reduce((arr, child) => {
return arr.concat(lookup(child, type, fullname + '.'))
}, result)
}
return result
}

const root = protobuf.loadSync(filename)
const namespace = root.nestedArray[0]
const services = lookup(namespace, protobuf.Service, '')
const types = lookup(namespace, protobuf.Type, '').reduce((types, item) => {
types[item[0]] = item[1]
return types
}, {})

const resolved = services.reduce((resolved, item) => {
for (let name in item[1].methods) {
const method = item[1].methods[name]
const requestType = types[namespace.name + '.' + method.requestType]
const responseType = types[namespace.name + '.' + method.responseType]
const verify = {
request(data) {
let err
if (err = requestType.verify(data)) {
throw new Error(`request parameter for service ${namespace.name}.${name}() verification failed: ${err}`)
}
},
response(data) {
let err
if (err = responseType.verify(data)) {
throw new Error(`response result for service ${namespace.name}.${name}() verification failed: ${err}`)
}
}
}
resolved.set(new RegExp(name, 'i'), verify)
}
return resolved
}, new Map())

debug('resolved protos %O', resolved)
return resolved
}

function http(options, callback) {

let protocol = options.protocol || 'http'
let urls = url.parse(options.url)
let method = options.method ? options.method.toLowerCase() : 'get'
let datatype = options.datatype ? options.datatype.toLowerCase() : 'urlencoded'
let timeout = options.timeout
let http2 = options.http2
let headers = {}
let data, formdata

Expand All @@ -27,8 +82,7 @@ function http(options, callback) {
headers: headers,
data: options.data
}
logger.http({ err, req, res })
callback(err, res && res.body)
callback(err, res && res.body, req, res)
}

if (options.headers) {
Expand Down Expand Up @@ -95,19 +149,19 @@ function http(options, callback) {

debug('http request %O', reqOptions)

if (protocol === 'http2') {
const client = require(protocol).connect(url.format({
protocol: urls.protocol,
host: urls.hostname,
port: urls.port
}))
if (http2) {
const client = require('http2').connect(
url.format({
protocol: urls.protocol,
host: urls.hostname,
port: urls.port
})
)

req = client.request({
':path': urls.path
})
req = client.request(Object.assign({ ':method': method.toUpperCase(), ':path': urls.path }, headers))
}
else {
req = require(protocol).request(reqOptions)
req = require(urls.protocol.slice(0, -1)).request(reqOptions)
}

if (options.timeout) {
Expand All @@ -124,7 +178,7 @@ function http(options, callback) {
let timingStop = new Date()
let headers, status

if (protocol === 'http2') {
if (http2) {
headers = response
status = headers[':status']
response = req
Expand Down Expand Up @@ -200,57 +254,50 @@ function http(options, callback) {

}

['http', 'https', 'http2'].forEach(protocol => {
exports[protocol] = function (options) {
return new Promise((resolve, reject) => {
options.protocol = protocol
http(options, (err, result) => {
if (err) {
reject(err)
}
else {
resolve(result)
}
})
})
}
})
exports.http = http

exports.fetch = async function fetch(options) {
options.method = options.method || 'get'
options.path = url.parse(options.path).pathname
options.url = url.resolve(options.service.address, options.path)

const verify = options.service.verify
&& options.service.verify[`${options.method}-${options.path}`.replace(rhump, (s, p) => p.toUpperCase())]
let pathname = url.parse(options.url).pathname.replace(rnoword, '')
let protoFileName = options.protobuf
let protos, verify

if (protoFileName) {
if (Protobufs.get(protoFileName)) {
debug('load protos from cache')
protos = Protobufs.get(protoFileName)
}
else {
debug('load protos from file')
protos = loadProtobuf(protoFileName)
Protobufs.set(protoFileName, protos)
}
}

if (verify) {
const err = verify.request(options.data)
if (err) {
throw new Error(err)
if (protos) {
for (let [r, v] of protos) {
if (r.test(pathname)) {
debug('verify request')
verify = v
verify.request(options.data)
}
}
}

return await new Promise((resolve, reject) => {
options.protocol = options.service.protocol
http(options, (err, result) => {
return new Promise((resolve, reject) => {
http(options, (err, body, req, res) => {
logger.http({ err, req, res })
if (err) {
reject(err)
}
else {
if (verify) {
const err = verify.response(result)
if (err) {
reject(err)
}
else {
resolve(result)
}
}
else {
resolve(result)
}
resolve(body)
}
})
}).then(body => {
if (verify) {
debug('verify response')
verify.response(body)
}
return body
})
}
4 changes: 2 additions & 2 deletions proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class UnresolvedDependencyError extends Error {}

exports.proxy = async function proxy(graph, options) {
debug('proxy start')
debug('graph %o', graph)
debug('graph %O', graph)

const { services, protocols, request = null } = options

Expand Down Expand Up @@ -80,7 +80,7 @@ exports.proxy = async function proxy(graph, options) {
const checkRemains = () => Reflect.ownKeys(resolvedGraph).filter(key => resolvedGraph[key].resolved === undefined)
const remains = checkRemains()

debug('unresolved graph keys %o', remains)
debug('unresolved graph keys %O', remains)
await Promise.all(remains.map(key => resolveField(key, resolvedGraph[key])))

const rs = checkRemains()
Expand Down
61 changes: 13 additions & 48 deletions service.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,15 @@
const protobuf = require('protobufjs')
const url = require('url')
const debug = require('debug')('mgate:service')
const fsp = require('./utils/fsp')
const circuitbreaker = require('./circuitbreaker')

const rhttp = /^http[s2]?$/

function loadProtobuf(path) {
const lookup = (obj, type, parentName) => {
let result = []
let fullname = parentName + obj.name
if (obj instanceof type) {
result.push([fullname, obj])
}
if (obj.hasOwnProperty('nested')) {
result = obj.nestedArray.reduce((arr, child) => {
return arr.concat(lookup(child, type, fullname + '.'))
}, result)
}
return result
}

const root = protobuf.loadSync(path)
const namespace = root.nestedArray[0]
const services = lookup(namespace, protobuf.Service, '')
const types = lookup(namespace, protobuf.Type, '').reduce((obj, item) => {
obj[item[0]] = item[1]
return obj
}, {})

const resolved = services.reduce((obj, item) => {
const [name, service] = item
for (let name in service.methods) {
const requestType = types[namespace.name + '.' + service.methods[name].requestType]
const responseType = types[namespace.name + '.' + service.methods[name].responseType]
obj[name] = {
request: payload => requestType.verify(payload),
response: result => responseType.verify(result),
}
}
return obj
}, {})

return resolved
}

exports.parse = function parse(dir) {
const modules = fsp.findModules(dir)
debug('resolved service module files %O', modules)

const services = modules.reduce((services, { name, module }) => {
if (module.idl && rhttp.test(module.protocol)) {
module.verify = loadProtobuf(module.idl)
}
services[name] = module
return services
}, {})
Expand All @@ -65,12 +23,19 @@ exports.fetch = async function fetch(services, protocols, name, options) {
if (!service) {
throw new Error(`service ${name} isn't registered`)
}
options.service = service

let protocol = protocols[rhttp.test(service.protocol) ? 'http' : service.protocol]

if (!protocol) {
throw new Error(`protocol ${service.protocol} isn't supported`)
let protocol
if (rhttp.test(service.protocol)) {
options.url = url.resolve(service.address, options.pathname)
options.http2 = service.protocol === 'http2'
options.protobuf = service.protobuf
protocol = protocols.http
}
else {
protocol = protocols[service.protocol]
if (!protocol) {
throw new Error(`protocol ${service.protocol} isn't supported`)
}
}

if (service.ratelimiting && !service.ratelimiting.acquire()) {
Expand Down

0 comments on commit 0c21e7b

Please sign in to comment.