
Better KVS streaming support #2929

Open
@barjin


Which package is the feature request for? If unsure which one to select, leave blank

None

Feature

While the current KVS implementation can work with Node.js streams (e.g. in the `setValue` method), its interface leaves a lot to be desired.

Take this `FileDownload` example. Without KVS, the `streamHandler` can be neatly implemented with a single pipeline:

```javascript
import { pipeline } from 'node:stream/promises';
import { FileDownload } from '@crawlee/http';
import fs from 'node:fs';

function UpperCaseStream() {
    return new TransformStream({
        transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
        },
    });
}

const crawler = new FileDownload({
    async streamHandler({ stream }) {
        await pipeline(
            stream,
            new TextDecoderStream(),
            UpperCaseStream(),
            fs.createWriteStream('output.txt'),
        );
    },
});

await crawler.run(['https://example.com']);
```

If we want to write to the KVS instead of the filesystem, though, the API gets in the way:

```diff
+import { Transform } from 'node:stream';
+
 const crawler = new FileDownload({
     async streamHandler({ stream, getKeyValueStore }) {
+        const kvs = await getKeyValueStore();
+
+        // Pass-through step: the pipeline writes into it, setValue reads from it
+        const consumer = new Transform({
+            transform(chunk, _, callback) {
+                callback(null, chunk);
+            },
+        });
+
+        await Promise.all([
             pipeline(
                 stream,
                 new TextDecoderStream(),
                 UpperCaseStream(),
+                consumer,
             ),
+            kvs.setValue(
+                'test',
+                consumer,
+                { contentType: 'text/plain' },
+            ),
         ]);
     }
 });
```

Interleaving the async/await model with the stream interfaces simply feels unergonomic.

This is mostly because, **while `kvs.setValue` supports streams, it accepts a stream instance as a parameter** instead of providing a `Writable` that could be piped to or used as a pipeline step.
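The mismatch can be papered over in userland for now. The sketch below wraps `setValue` in a `Writable` that can sit at the end of a `pipeline`. It relies only on the behaviour described above, namely that `setValue(key, readableStream, { contentType })` consumes a readable stream and returns a promise. `toKvsWritable` is a hypothetical helper name, not a Crawlee API, and the error handling is deliberately minimal.

```javascript
import { PassThrough, Writable } from 'node:stream';

// Hypothetical helper (not part of Crawlee): exposes a Writable on top of a
// store whose setValue(key, readable, options) consumes a Readable.
function toKvsWritable(kvs, key, options) {
    // Everything written to the adapter is forwarded into this PassThrough,
    // which setValue reads from.
    const body = new PassThrough();
    const done = Promise.resolve(kvs.setValue(key, body, options));

    const writable = new Writable({
        write(chunk, encoding, callback) {
            // Respect backpressure: wait for 'drain' if the PassThrough is full.
            if (body.write(chunk)) callback();
            else body.once('drain', () => callback());
        },
        final(callback) {
            // Signal end-of-data, then report completion only after setValue
            // settles, so awaiting pipeline() also awaits the store write.
            body.end();
            done.then(() => callback(), callback);
        },
        destroy(err, callback) {
            // Destroy without an error so an unconsumed PassThrough does not
            // emit an unhandled 'error' event; the error goes to the pipeline.
            body.destroy();
            callback(err);
        },
    });

    // If setValue fails early, surface its error through the pipeline.
    done.catch((err) => writable.destroy(err));
    return writable;
}
```

Inside `streamHandler`, the KVS variant then collapses back into one call: `await pipeline(stream, new TextDecoderStream(), UpperCaseStream(), toKvsWritable(kvs, 'test', { contentType: 'text/plain' }))`.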

KVS.getValue currently doesn't seem to support streams at all.

Motivation

Simplify work with large files in the Crawlee / Apify ecosystem.

Ideal solution or implementation, and any additional constraints

Not sure. Perhaps adding methods like

`KeyValueStore.getReadableStream(key: string): Readable`

and

`KeyValueStore.getWritableStream(key: string, options: { contentType: string }): Writable`

would be sufficient.
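To make the proposed shape concrete, here is a toy in-memory store that implements the two suggested methods. It is an illustration of the interface only, not Crawlee's `KeyValueStore`; the class name `InMemoryStreamingStore` and the "throw on missing key" behaviour are assumptions made for the sketch.

```javascript
import { Readable, Writable } from 'node:stream';

// Toy in-memory store (NOT Crawlee's KeyValueStore) illustrating the
// proposed getReadableStream / getWritableStream method shapes.
class InMemoryStreamingStore {
    #records = new Map();

    // Proposed: getWritableStream(key, { contentType }): Writable
    getWritableStream(key, { contentType }) {
        const chunks = [];
        return new Writable({
            write(chunk, encoding, callback) {
                chunks.push(chunk); // Buffer, since decodeStrings defaults to true
                callback();
            },
            final: (callback) => {
                // Commit the record only once the writer has ended cleanly.
                this.#records.set(key, { value: Buffer.concat(chunks), contentType });
                callback();
            },
        });
    }

    // Proposed: getReadableStream(key): Readable
    getReadableStream(key) {
        const record = this.#records.get(key);
        if (!record) throw new Error(`Record "${key}" not found`);
        return Readable.from([record.value]);
    }
}
```

With methods like these, the KVS version of the first example would only swap its last pipeline step, e.g. `kvs.getWritableStream('test', { contentType: 'text/plain' })` in place of `fs.createWriteStream('output.txt')`. Committing the record in `final` means a pipeline that errors midway leaves no partial record behind.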

This might require adding the same methods to the client (?) and/or the SDK.

Alternative solutions or implementations

For a pluggable, stream-enabled `setValue`, see the example above.

For a stream-enabled `getValue` (on Apify), users, as far as I know, have to call the API directly with an HTTP client.
I don't currently know of any hack for streaming records from the FS-backed memory-storage KVS.
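As a rough illustration of the Apify workaround, the sketch below streams a record from the Apify API's key-value store record endpoint (`GET https://api.apify.com/v2/key-value-stores/{storeId}/records/{recordKey}`) into a Node.js `Readable`. It assumes Node.js 18+ (global `fetch` and `Readable.fromWeb`) and token authentication via an `Authorization: Bearer` header. `streamApifyRecord` and its `fetchImpl` parameter are hypothetical; `fetchImpl` exists only so the helper can be exercised without network access.

```javascript
import { Readable } from 'node:stream';

// Hypothetical helper: streams a key-value store record straight from the
// Apify API as a Node.js Readable, without buffering it in memory.
async function streamApifyRecord(storeId, key, { token, fetchImpl = fetch } = {}) {
    const url = `https://api.apify.com/v2/key-value-stores/${encodeURIComponent(storeId)}`
        + `/records/${encodeURIComponent(key)}`;
    const response = await fetchImpl(url, {
        // Assumption: token passed as a Bearer header rather than a query param.
        headers: token ? { Authorization: `Bearer ${token}` } : {},
    });
    if (!response.ok || !response.body) {
        throw new Error(`Failed to fetch record "${key}": HTTP ${response.status}`);
    }
    // Convert the WHATWG ReadableStream body into a Node.js Readable.
    return Readable.fromWeb(response.body);
}
```

The returned stream can be passed directly as the first argument of `pipeline`, just like the `stream` in the `FileDownload` handler.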

Other context

No response

Metadata

Assignees: No one assigned

Labels: feature (new features or improvements to existing features), t-tooling (owned by the tooling team)

Type: No type

Projects: No projects

Relationships: None yet

Development: No branches or pull requests
