Skip to content

Commit

Permalink
fix: #1189 write prelude in streamifyResponse with string body
Browse files Browse the repository at this point in the history
  • Loading branch information
mdenkinger committed Mar 15, 2024
1 parent 5a6e7ed commit f1401eb
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 43 deletions.
145 changes: 103 additions & 42 deletions packages/core/__tests__/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,15 +661,63 @@ test('"onError" middleware should be able to short circuit response', async (t)
})

// streamifyResponse

// mock implementation awslambda.HttpResponseStream
const DELIMITER_LEN = 8
globalThis.awslambda = {
streamifyResponse: (cb) => cb,
HttpResponseStream: {
from: (responseStream, metadata) => {
return responseStream
from: (underlyingStream, prelude) => {
// https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/main/src/HttpResponseStream.js
// Wrap the underlyingStream to ensure _onBeforeFirstWrite is called before the first write operation
const wrapStream = () => {
let isFirstWrite = true
const originalWrite = underlyingStream.write
underlyingStream.write = (...args) => {
if (
isFirstWrite &&
typeof underlyingStream._onBeforeFirstWrite === 'function'
) {
isFirstWrite = false
underlyingStream._onBeforeFirstWrite()
}
return originalWrite.apply(underlyingStream, args)
}

return underlyingStream
}

// Execute _onBeforeFirstWrite before the first write operation
underlyingStream._onBeforeFirstWrite = () => {
const metadataPrelude = JSON.stringify(prelude)
underlyingStream.write(metadataPrelude)
underlyingStream.write(new Uint8Array(DELIMITER_LEN))
}
return wrapStream()
}
}
}

function createResponseStreamMockAndCapture () {
function processChunkResponse (chunkResponse) {
const indexOf = chunkResponse.indexOf(new Uint8Array(DELIMITER_LEN))
const prelude = chunkResponse.slice(0, indexOf)
const content = chunkResponse.slice(indexOf + DELIMITER_LEN * 2 - 1)
return { prelude, content }
}

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
return {
responseStream,
chunkResponse: () => chunkResponse,
prelude: () => processChunkResponse(chunkResponse).prelude,
content: () => processChunkResponse(chunkResponse).content
}
}

test('Should throw with streamifyResponse:true using object', async (t) => {
const input = {}
const handler = middy(
Expand All @@ -692,27 +740,28 @@ test('Should throw with streamifyResponse:true using object', async (t) => {

test('Should return with streamifyResponse:true using body undefined', async (t) => {
const input = ''
const metadata = {
statusCode: 200,
headers: {
'Content-Type': 'plain/text'
}
}
const handler = middy(
(event, context, { signal }) => {
return {
statusCode: 200,
headers: {
'Content-Type': 'plain/text'
}
}
return metadata
},
{
streamifyResponse: true
}
)

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, prelude, content } =
createResponseStreamMockAndCapture()

const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(prelude(), JSON.stringify(metadata))
t.is(content(), input)
})

test('Should return with streamifyResponse:true using string', async (t) => {
Expand All @@ -723,13 +772,11 @@ test('Should return with streamifyResponse:true using string', async (t) => {
return input
})

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, chunkResponse } = createResponseStreamMockAndCapture()

const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(chunkResponse(), input)
})

test('Should return with streamifyResponse:true using body string', async (t) => {
Expand All @@ -746,13 +793,39 @@ test('Should return with streamifyResponse:true using body string', async (t) =>
}
})

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
const { responseStream, content } = createResponseStreamMockAndCapture()
const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(content(), input)
})

test('Should return with streamifyResponse:true using empty body string and prelude', async (t) => {
const input = ''
const metadata = {
statusCode: 301,
headers: {
'Content-Type': 'plain/text',
Location: 'https://example.com'
}
}

const handler = middy({
streamifyResponse: true
}).handler((event, context, { signal }) => {
return {
...metadata,
body: input
}
})

const { responseStream, prelude, content } =
createResponseStreamMockAndCapture()

const response = await handler(event, responseStream, context)

t.is(response, undefined)
t.is(chunkResponse, input)
t.is(prelude(), JSON.stringify(metadata))
t.is(content(), input)
})

test('Should return with streamifyResponse:true using ReadableStream', async (t) => {
Expand All @@ -766,13 +839,10 @@ test('Should return with streamifyResponse:true using ReadableStream', async (t)
}
)

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, chunkResponse } = createResponseStreamMockAndCapture()
const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(chunkResponse(), input)
})

test('Should return with streamifyResponse:true using body ReadableStream', async (t) => {
Expand All @@ -792,13 +862,10 @@ test('Should return with streamifyResponse:true using body ReadableStream', asyn
}
)

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, content } = createResponseStreamMockAndCapture()
const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(content(), input)
})

test('Should return with streamifyResponse:true using ReadableStream.pipe(...)', async (t) => {
Expand All @@ -812,13 +879,10 @@ test('Should return with streamifyResponse:true using ReadableStream.pipe(...)',
}
)

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, chunkResponse } = createResponseStreamMockAndCapture()
const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(chunkResponse(), input)
})

test('Should return with streamifyResponse:true using body ReadableStream.pipe(...)', async (t) => {
Expand All @@ -838,13 +902,10 @@ test('Should return with streamifyResponse:true using body ReadableStream.pipe(.
}
)

let chunkResponse = ''
const responseStream = createWritableStream((chunk) => {
chunkResponse += chunk
})
const { responseStream, content } = createResponseStreamMockAndCapture()
const response = await handler(event, responseStream, context)
t.is(response, undefined)
t.is(chunkResponse, input)
t.is(content(), input)
})

// Plugin
Expand Down
2 changes: 1 addition & 1 deletion packages/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const middy = (lambdaHandler = defaultLambdaHandler, plugin = {}) => {
const size = 16384 // 16 * 1024 // Node.js default
let position = 0
const length = input.length
while (position < length) {
while (position <= length) {
yield input.substring(position, position + size)
position += size
}
Expand Down

0 comments on commit f1401eb

Please sign in to comment.