Skip to content

Commit

Permalink
feat: add aws/lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
willfarrell committed May 5, 2023
1 parent 6ddc0fa commit 3328249
Show file tree
Hide file tree
Showing 39 changed files with 2,454 additions and 1,346 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"packages": ["packages/*"],
"useNx": false,
"version": "0.0.29"
"version": "0.0.30"
}
1,170 changes: 727 additions & 443 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@datastream/monorepo",
"version": "0.0.29",
"version": "0.0.30",
"description": "Streams made easy.",
"private": true,
"type": "module",
Expand Down
1 change: 1 addition & 0 deletions packages/aws/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from '@datastream/aws/dynamodb'
export * from '@datastream/aws/lambda'
export * from '@datastream/aws/s3'
export * from '@datastream/aws/sns'
export * from '@datastream/aws/sqs'
50 changes: 50 additions & 0 deletions packages/aws/lambda.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { createReadableStream } from '@datastream/core'
import {
LambdaClient,
InvokeWithResponseStreamCommand
} from '@aws-sdk/client-lambda'

const awsClientDefaults = {
// https://aws.amazon.com/compliance/fips/
useFipsEndpoint: [
'us-east-1',
'us-east-2',
'us-west-1',
'us-west-2',
'ca-central-1'
].includes(process.env.AWS_REGION)
}

let defaultClient = new LambdaClient(awsClientDefaults)
export const awsLambdaSetClient = (lambdaClient) => {
defaultClient = lambdaClient
}

export const awsLambdaReadableStream = async (lambdaOptions, streamOptions) => {
return createReadableStream(awsLambdaGenerator(lambdaOptions), streamOptions)
}
export const awsLambdaResponseStream = awsLambdaReadableStream

async function * awsLambdaGenerator (lambdaOptions, streamOptions) {
if (!Array.isArray(lambdaOptions)) lambdaOptions = [lambdaOptions]
for (const options of lambdaOptions) {
const response = await defaultClient.send(
new InvokeWithResponseStreamCommand(options)
)
for await (const chunk of response.EventStream) {
if (chunk?.PayloadChunk?.Payload) {
yield chunk.PayloadChunk.Payload
} else if (chunk?.InvokeComplete?.ErrorCode) {
throw new Error(chunk.InvokeComplete.ErrorCode, {
cause: chunk.InvokeComplete.ErrorDetails
})
}
}
}
}

export default {
setClient: awsLambdaSetClient,
readableStream: awsLambdaReadableStream,
responseStream: awsLambdaReadableStream
}
50 changes: 50 additions & 0 deletions packages/aws/lambda.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import test from 'node:test'
import { deepEqual } from 'node:assert'
// import sinon from 'sinon'
import { mockClient } from 'aws-sdk-client-mock'
import {
LambdaClient,
InvokeWithResponseStreamCommand
} from '@aws-sdk/client-lambda'

import { createReadableStream } from '@datastream/core'

import { awsLambdaSetClient, awsLambdaReadableStream } from '@datastream/aws'

let variant = 'unknown'
for (const execArgv of process.execArgv) {
const flag = '--conditions='
if (execArgv.includes(flag)) {
variant = execArgv.replace(flag, '')
}
}

test(`${variant}: awsLambdaReadableStream should return chunk`, async (t) => {
const client = mockClient(LambdaClient)
awsLambdaSetClient(client)

const encoder = new TextEncoder()
const decoder = new TextDecoder()

client.on(InvokeWithResponseStreamCommand, {}).resolves({
EventStream: createReadableStream([
{
PayloadChunk: {
Payload: encoder.encode('1')
}
},
{
PayloadChunk: {
Payload: encoder.encode('2')
}
}
])
})

let result = ''
for await (const chunk of await awsLambdaReadableStream({})) {
result += decoder.decode(chunk)
}

deepEqual(result, '12')
})
Loading

0 comments on commit 3328249

Please sign in to comment.