Fix #2 cont.: remove byteLength event & promise
1. `byteLength` event is not necessary when you have standard
'finish'/'end'/'close'/'error' events + bytes/truncated getters
2. Pre-setting event handlers (especially on 'error') and
wrapping them into a Promise proved to cause lots of
peculiar bugs and race conditions.
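
For illustration, point 2 describes roughly the following anti-pattern (a reconstruction for this note, not the library's actual internals):

```js
const { PassThrough } = require('stream');

// Pre-wire handlers and hand out a Promise at parse time:
function preWiredByteLength(stream) {
  return new Promise((resolve, reject) => {
    let bytes = 0;
    stream
      .on('data', (chunk) => { bytes += chunk.length; }) // switches the stream to flowing mode!
      .on('end', () => resolve(bytes))
      .on('error', reject);
  });
}

const stream = new PassThrough();
const byteLength = preWiredByteLength(stream);

// If a consumer skips the file and never awaits `byteLength`, any stream
// error surfaces as an unhandled Promise rejection.
stream.destroy(new Error('aborted'));
```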
rafasofizada committed Jul 13, 2023
1 parent 5488d13 commit ab66b47
Showing 10 changed files with 224 additions and 337 deletions.
158 changes: 101 additions & 57 deletions README.md
@@ -53,12 +53,12 @@ const pechkin = require('pechkin');
const { parseFormData } = require('pechkin');
```

## [Essential: save to random temp location](./examples/basic-fs-temp.js)
## [Save to file system](./examples/fs.js)
**Files are processed sequentially.**

```js

// Full working example: `examples/basic-fs-temp.js`
// Full working example: `examples/fs.js`

http.createServer(async (req, res) => {
const { fields, files } = await pechkin.parseFormData(req, {
@@ -69,17 +69,29 @@ http.createServer(async (req, res) => {

const results = [];

for await (const { filename: originalFilename, byteLength, stream, ...file } of files) {
for await (const { filename: originalFilename, stream, ...file } of files) {
const newFilename = `${Math.round(Math.random() * 1000)}-${originalFilename}`;
const dest = path.join(os.tmpdir(), newFilename);

// Pipe the stream to a file
// The stream will start to be consumed after the current block of code
// finishes executing...
stream.pipe(fs.createWriteStream(dest));
/*
`byteLength` resolves only after the entire `file.stream` has been consumed.
You should `await byteLength` only AFTER the code that consumes the stream
(e.g. uploading to AWS S3, loading into memory, etc.)
*/
const length = await byteLength;

// ...which allows us to set up event handlers for the stream and wrap
// the whole thing in a Promise, so that we can get the stream's length.
const length = await new Promise((resolve, reject) => {
stream
// `stream` is an instance of Transform, which is a Duplex stream,
// which means you can listen to both 'end' (Readable side)
// and 'finish' (Writable side) events.
.on('end', () => resolve(stream.bytesWritten))
.on('finish', () => resolve(stream.bytesWritten))
// You can either reject the Promise and handle the rejection
// using .catch() or a try-catch around the await, or handle the
// error directly in the 'error' event handler.
.on('error', reject);
});

results.push({ ...file, dest, originalFilename, newFilename, length});
}
@@ -108,7 +120,7 @@ http.createServer(async (req, res) => {
});
```

## [Essential: processing files sequentially (get SHA-256 hash)](./examples/sequential.js)
## Processing files sequentially (get SHA-256 hash)

In this example, we iterate over all files sequentially, and process them one by one – the next file is accessed and processed only after the previous file is done.
Processing here will be calculating a SHA-256 hash from the stream.
@@ -125,7 +137,7 @@ Boilerplate code
*/
const fileHashes = [];

for await (const { stream, field, filename, byteLength, mimeType } of files) {
for await (const { stream, field, filename, mimeType } of files) {
// `Hash` class: https://nodejs.org/api/crypto.html#class-hash
const hash = createHash('sha256');

@@ -139,24 +151,51 @@ for await (const { stream, field, filename, byteLength, mimeType } of files) {
field,
filename,
mimeType,
// Remember to always `await` the `byteLength` promise AFTER the stream has been consumed!
length: await byteLength,
// Here, we expect the stream to be fully consumed by the `for-await-of` loop,
// so there's no need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream – bytesWritten already reached its final value.

// This is in contrast with the previous file system example, where we
// need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream.
length: stream.bytesWritten,
hash: hash.digest('hex'),
});
}
```
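
The hunk above elides the middle of the example, where the stream is actually consumed. Judging by the surrounding comments, it is presumably something along these lines (a sketch, not part of the diff):

```js
// Fully consume the stream, feeding every chunk into the hash.
// After this loop, stream.bytesWritten has reached its final value.
for await (const chunk of stream) {
  hash.update(chunk);
}
```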

## [Advanced: processing files in batches (upload to AWS S3)](./examples/batch-upload-s3.js)

In this example, we process files in batches of three – the next batch of files is accessed and processed only after the previous batch is done.
Processing here will be uploading the files to AWS S3.
## [Processing files in batches (upload to AWS S3)](./examples/s3.js)

```js
// FULL WORKING EXAMPLE: `examples/batch-upload-s3.js`
// Full working example: `examples/s3.js`

import { createHash } from 'crypto';
// S3 setup and upload utility function

// ... Boilerplate code ...
const { S3Client } = require("@aws-sdk/client-s3");
const { Upload } = require('@aws-sdk/lib-storage');

const s3Client = new S3Client({
credentials: {
accessKeyId: "PROVIDE_YOUR_OWN",
secretAccessKey: "PROVIDE_YOUR_OWN",
},
region: "us-east-1",
});

function uploadFileToS3(key, stream) {
const upload = new Upload({
client: s3Client,
params: {
Key: key,
Bucket: "vrjam-firstbridge",
Body: stream,
}
});

return upload.done();
}

// ...

const { fields, files } = await pechkin.parseFormData(req, {
maxTotalFileFieldCount: Infinity,
@@ -171,15 +210,23 @@ let i = 0;
for await (const { filename: originalFilename, stream, field } of files) {
const key = `${i}-${originalFilename}`;

batch.push(
uploadFileToS3(key, stream) // Check implementation in
.then(({ Location }) => ({ field, originalFilename, location: Location }))

results.push(
await uploadFileToS3(key, stream)
.then(({ Location }) => ({
field,
originalFilename,
location: Location,
// Here, we expect the stream to be fully consumed by `uploadFileToS3()`,
// so there's no need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream – bytesWritten already reached its final value.

// This is in contrast with the example in `examples/fs.js`, where we
// need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream.
length: stream.bytesWritten,
}))
);

if (batch.length === 3) {
results.push(await Promise.all(batch));
batch = []; // restart batch
}

i++;
}
@@ -242,12 +289,12 @@ app.post(
async (req, res) => {
const files = [];

for await (const { stream, field, filename, byteLength } of req.files) {
for await (const { stream, field, filename } of req.files) {
// Process files however you see fit...
// Here, streams are simply skipped
stream.resume();

files.push({ field, filename, length: await byteLength });
files.push({ field, filename });
}

return res.json({ fields: req.body, files });
@@ -404,7 +451,7 @@ type Files = {
>
> The `file.stream` should always be consumed, otherwise the request parsing will hang, and you might never get access to the next file. If you don't care about a particular file, you can simply do `file.stream.resume()`, but the stream should **always** be consumed.

### (Internal) error handling
### (Internal) Error handling inside `Pechkin::FileIterator`
This section is for those who want to know how errors are handled internally. This is not necessary to use `pechkin`.
@@ -424,38 +471,35 @@ This section is for those who want to know how errors are handled internally. Th
encoding: string;
mimeType: string;
field: string;
byteLength: Promise<FileByteLengthInfo>;
stream: stream.Readable;
stream: ByteLengthTruncateStream; // See below: "Type: ByteLengthTruncateStream"
}
```
- `filename`: The client-provided filename of the file.
- `encoding`: The encoding of the file. [List of encodings](https://nodejs.org/api/buffer.html#buffers-and-character-encodings) supported by Node.js.
- `mimeType`: The MIME type of the file. If the MIME type is crucial for your application, you should not trust the client-provided `mimeType` value – the client can easily lie about it (e.g. send an `.exe` file with `mimeType: "image/png"`). Instead, you should use a library like [`file-type`](https://github.com/sindresorhus/file-type) (see the sketch after this list).
- `field`: The name of the field the file was sent in.
- `byteLength`:
- If `maxFileByteLength` **is exceeded**:
- If `abortOnFileByteLengthLimit === true`: A `Promise` that **rejects** with a `FieldLimitError` error of type `maxFileByteLength`.
- If `abortOnFileByteLengthLimit === false`: A `Promise` that **resolves** with an object of type `FileByteLengthInfo`:
```ts
{
truncated: true;
readBytes: number; // always equal to `maxFileByteLength` for the field
}
```
The file stream will be truncated to `readBytes` bytes, so `readBytes` in this situation always equals the `maxFileByteLength` limit for the field.
- If `maxFileByteLength` is **not** exceeded:
```ts
{
truncated: false;
readBytes: number;
}
```
Where `readBytes` equals the actual number of bytes read from the stream.
> 📝 Note: `byteLength` is encoding-agnostic.
- `stream`: The file `Readable` stream. The stream should **always** be consumed, otherwise the request parsing will hang, and you might never get access to the next file. If you don't care about a particular file, you can simply do `file.stream.resume()`, but the stream should **always** be consumed.
> ❗️ **Very important note on `stream`:**
>
> The `file.stream` should always be consumed, otherwise the request parsing will hang, and you might never get access to the next file. If you don't care about a particular file, you can simply do `file.stream.resume()`, but the stream should **always** be consumed.
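
To illustrate the `mimeType` caveat above, here is a minimal sketch of magic-byte detection, assuming the CommonJS API of `file-type` v16 (`FileType.stream()` wraps the source in a stream that buffers the sniffed bytes, so the file can still be consumed in full afterwards):

```js
const FileType = require('file-type'); // v16.x, CommonJS

async function logMimeMismatches(files) {
  for await (const { stream, filename, mimeType } of files) {
    // Detect the actual type from magic bytes; `fileType` is
    // undefined when detection fails (e.g. for plain text).
    const sniffed = await FileType.stream(stream);
    const detected = sniffed.fileType;

    if (!detected || detected.mime !== mimeType) {
      console.warn(`${filename}: claimed ${mimeType}, detected ${detected && detected.mime}`);
    }

    // The wrapped stream must still be fully consumed, just like the original.
    sniffed.resume();
  }
}
```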
## Type: `ByteLengthTruncateStream`
A [`Transform`](https://nodejs.org/api/stream.html#stream_class_stream_transform) stream, which does the following to source streams piped into it:
- Does nothing, i.e. acts as a `PassThrough` stream, as long as the source stream hasn't reached the `maxFileByteLength` byte limit.
- As soon as the source stream reaches the `maxFileByteLength` byte limit:
- Sets the `truncated` property to `true`
- Throws if `abortOnFileByteLimit = true`
- Truncates the file if `abortOnFileByteLimit = false`
```ts
Transform & {
bytesRead: number;
bytesWritten: number;
truncated: boolean;
}
```
- `bytesRead`: The number of bytes read from the source stream.
- `bytesWritten`: The number of bytes written to the destination stream.
- `truncated`: Whether the file was truncated or not. Truncation only happens with `abortOnFileByteLimit = false`. `bytesRead - bytesWritten` is the number of bytes truncated, and is larger than `0` only if `truncated = true`, and `0` if `truncated = false`.
All of the above properties are updated in real time, as the stream is consumed. This means that you have to wait until the stream is fully consumed (i.e. `'finish'`/`'end'` events are emitted, after e.g. an upload to file system or S3) to get the final values of `bytesRead`, `bytesWritten` and `truncated`.
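
For example, to capture the final values after writing an upload to disk (a sketch; `file` stands for one entry of the `files` iterator, and the destination path is arbitrary):

```js
const fs = require('fs');

const dest = fs.createWriteStream('/tmp/upload.bin');
file.stream.pipe(dest);

// 'finish' fires on the destination only after `file.stream` has been
// fully consumed, so the counters below have reached their final values.
dest.on('finish', () => {
  console.log(file.stream.bytesRead);    // bytes read from the request
  console.log(file.stream.bytesWritten); // bytes passed through to `dest`
  console.log(file.stream.truncated);    // can only be true with `abortOnFileByteLimit = false`
});
```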
4 changes: 2 additions & 2 deletions examples/express.js
@@ -26,12 +26,12 @@ app.post(
async (req, res) => {
const files = [];

for await (const { stream, field, filename, byteLength } of req.files) {
for await (const { stream, field, filename } of req.files) {
// Process files however you see fit...
// Here, streams are simply skipped
stream.resume();

files.push({ field, filename, length: await byteLength });
files.push({ field, filename });
}

return res.json({ fields: req.body, files });
33 changes: 24 additions & 9 deletions examples/basic-fs-temp.js → examples/fs.js
@@ -3,8 +3,11 @@ const http = require('http');
const os = require('os');
const path = require('path');

// If 'Pechkin' is installed, you can simply "require('pechkin')"
// or import * as pechkin from 'pechkin';
// If 'pechkin' is installed as an NPM package,
// you can simply `const pechkin = require('pechkin')`
// or `import * as pechkin from 'pechkin';`

// Use the dist/esm distribution if you're using ESM modules (import)
const pechkin = require('../dist/cjs');

http
@@ -18,17 +21,29 @@ http

const results = [];

for await (const { filename: originalFilename, byteLength, stream, ...file } of files) {
for await (const { filename: originalFilename, stream, ...file } of files) {
const newFilename = `${Math.round(Math.random() * 1000)}-${originalFilename}`;
const dest = path.join(os.tmpdir(), newFilename);

// Pipe the stream to a file
// The stream will start to be consumed after the current block of code
// finishes executing...
stream.pipe(fs.createWriteStream(dest));
/*
`byteLength` resolves only after the entire `file.stream` has been consumed.
You should `await byteLength` only AFTER the code that consumes the stream
(e.g. uploading to AWS S3, loading into memory, etc.)
*/
const length = await byteLength;

// ...which allows us to set up event handlers for the stream and wrap
// the whole thing in a Promise, so that we can get the stream's length.
const length = await new Promise((resolve, reject) => {
stream
// `stream` is an instance of Transform, which is a Duplex stream,
// which means you can listen to both 'end' (Readable side)
// and 'finish' (Writable side) events.
.on('end', () => resolve(stream.bytesWritten))
.on('finish', () => resolve(stream.bytesWritten))
// You can either reject the Promise and handle the rejection
// using .catch() or a try-catch around the await, or handle the
// error directly in the 'error' event handler.
.on('error', reject);
});

results.push({ ...file, dest, originalFilename, newFilename, length});
}
30 changes: 20 additions & 10 deletions examples/batch-upload-s3.js → examples/s3.js
@@ -3,8 +3,11 @@ const http = require("http");
const { S3Client } = require("@aws-sdk/client-s3");
const { Upload } = require('@aws-sdk/lib-storage');

// If 'Pechkin' is installed, you can simply "require('pechkin')"
// or import * as pechkin from 'pechkin';
// If 'pechkin' is installed as an NPM package,
// you can simply `const pechkin = require('pechkin')`
// or `import * as pechkin from 'pechkin';`

// Use the dist/esm distribution if you're using ESM modules (import)
const pechkin = require("../dist/cjs");

const s3Client = new S3Client({
@@ -44,15 +47,22 @@ http
for await (const { filename: originalFilename, stream, field } of files) {
const key = `${i}-${originalFilename}`;

batch.push(
uploadFileToS3(key, stream)
.then(({ Location }) => ({ field, originalFilename, location: Location }))
);
results.push(
await uploadFileToS3(key, stream)
.then(({ Location }) => ({
field,
originalFilename,
location: Location,
// Here, we expect the stream to be fully consumed by `uploadFileToS3()`,
// so there's no need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream – bytesWritten already reached its final value.

if (batch.length === 3) {
results.push(await Promise.all(batch));
batch = []; // restart batch
}
// This is in contrast with the example in `examples/fs.js`, where we
// need to wait for the 'end'/'finish' events to obtain the correct
// byte length of the stream.
length: stream.bytesWritten,
}))
);

i++;
}
