Skip to content

Commit

Permalink
v6.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rjrodger committed Aug 1, 2023
1 parent 75dea9b commit 80323be
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 14 deletions.
126 changes: 126 additions & 0 deletions bin/protocol-aws.js
@@ -0,0 +1,126 @@
const { Duplex } = require('stream')
const { LambdaClient, InvokeCommand } = require('@aws-sdk/client-lambda')

// seneca-repl aws://lambda/my-function-name?region=eu-west-1

class LambdaInvokeStream extends Duplex {
constructor(def, options) {
super(options)

let region = def.region || 'us-east-1'
let name = def.name

this.lambdaClient = new LambdaClient({ region })
this.lambdaFunctionName = name
this.buffer = []
this.processing = false
}

_write(chunk, encoding, done) {
// console.log('write', chunk)
this.processing = true
const cmd = chunk.toString()

if (!cmd.endsWith('\n')) {
cmd += '\n'
}

const params = {
FunctionName: this.lambdaFunctionName,
Payload: JSON.stringify({
body: {
sys: 'repl',
send: 'cmd',
id: 'invoke',
cmd,
},
}),
}

// console.log('params',params)

const command = new InvokeCommand(params)

this.lambdaClient.send(command).then(
(data) => {
// console.log('data', data)
if (200 === data.StatusCode) {
let json = Buffer.from(data.Payload).toString()
// console.log('json', json)
const res = JSON.parse(json)
// console.log('res', res)
const body = JSON.parse(res.body)
// console.log('body', body)

this.buffer.push(body.out + String.fromCharCode(0))
} else {
this.buffer.push(
'# ERROR: ' + JSON.stringify(data) + String.fromCharCode(0),
)
}

this.processing = false
this._read()
done()
},
(err) => {
// console.log('err', err)
this.buffer.push(
`# ERROR invoking Lambda function: ${err}` + String.fromCharCode(0),
)
this.processing = false
this._read()
done()
},
)
}

_read(size) {
// console.log('read', this.processing, this.buffer)

if (this.processing) {
return
}

let chunk
while ((chunk = this.buffer.shift())) {
if (!this.push(chunk)) {
break
}
}

// if (this.buffer.length === 0) {
// this.push(null)
// }
}
}

module.exports = function makeProtocol(spec) {
// console.log('MP AWS', spec)

let duplex = null

const service = spec.url.hostname
if ('lambda' === service) {
const name = spec.url.pathname.substring(1).split('/')[0]

const region = spec.url.searchParams.get('region')

duplex = new LambdaInvokeStream({
name,
region,
// region: 'eu-west-1'
})

setImmediate(() => {
duplex.emit('connect')
})

// console.log(duplex)
} else {
throw new Error('Unknown AWS service: ' + service)
}

// console.log('PAWS', !!duplex)
return duplex
}
33 changes: 22 additions & 11 deletions bin/seneca-repl-exec.js
Expand Up @@ -92,17 +92,12 @@ class RequestStream extends Duplex {
super(options)
this.spec = spec
this.buffer = []
// console.log('HTTP CTOR')
this.processing = false
}

_write(chunk, encoding, callback) {
this.processing = true
const cmd = chunk.toString().trim()
// console.log('HTTP WRITE', cmd)

// this.buffer.push('FOO'+String.fromCharCode(0))
// this._read()
// return callback()

const url = this.spec.url

// Determine whether to use http or https based on the URL
Expand Down Expand Up @@ -135,6 +130,7 @@ class RequestStream extends Duplex {
// console.log('HE', data, res)

this.buffer.push(res.out + String.fromCharCode(0))
this.processing = false
this._read()
callback()
})
Expand All @@ -152,13 +148,20 @@ class RequestStream extends Duplex {
}

_read(size) {
// console.log('H READ')
if (this.processing) {
return
}

let chunk
while ((chunk = this.buffer.shift())) {
if (!this.push(chunk)) {
break
}
}

if (this.buffer.length === 0) {
this.push(null)
}
}
}

Expand Down Expand Up @@ -334,9 +337,17 @@ function connect(spec) {
} else if ('http:' === protocol || 'https:' === protocol) {
duplex = makeHttpDuplex(spec)
} else {
throw new Error(
'unknown protocol: ' + protocol + ' for url: ' + spec.url.href,
)
try {
const makeProtocol = require('./protocol-' +
protocol.replace(/[^a-z0-9-_]/g, '') +
'.js')
return makeProtocol(spec)
} catch (e) {
// console.log(e)
throw new Error(
'unknown protocol: ' + protocol + ' for url: ' + spec.url.href,
)
}
}

return duplex
Expand Down
1 change: 1 addition & 0 deletions dist/repl.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dist/repl.js.map

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions package.json
@@ -1,7 +1,7 @@
{
"name": "@seneca/repl",
"description": "Provides a client and server REPL for Seneca microservice systems.",
"version": "6.0.0",
"version": "6.1.0",
"main": "dist/repl.js",
"license": "MIT",
"author": "Richard Rodger (https://github.com/rjrodger)",
Expand Down Expand Up @@ -44,8 +44,9 @@
"inks": "^2.0.0"
},
"devDependencies": {
"@seneca/maintain": "^0.1.0",
"@aws-sdk/client-lambda": "^3.379.1",
"@seneca/entity-util": "^1.4.0",
"@seneca/maintain": "^0.1.0",
"jest": "^29.6.2",
"prettier": "^3.0.0",
"seneca": "^3.32.0",
Expand Down
1 change: 1 addition & 0 deletions src/repl.ts
Expand Up @@ -160,6 +160,7 @@ function repl(this: any, options: any) {
})
}

// TODO: fix: append newline id needed, otherwise times out!
function send_cmd(this: any, msg: any, reply: any) {
let seneca = this

Expand Down

0 comments on commit 80323be

Please sign in to comment.