Implement COPY table TO/FROM STDOUT/STDIN #170

Closed
uasan opened this issue Mar 31, 2021 · 23 comments · Fixed by #211
Labels: enhancement (New feature or request)

Comments


uasan commented Mar 31, 2021

Hello.
We are using node-pg-copy-streams:
https://github.com/brianc/node-pg-copy-streams

COPY table FROM STDIN - to import a large CSV file into a table.
COPY table TO STDOUT - to export from a large table to a CSV file.
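
For context, a minimal sketch of the node-pg-copy-streams approach described above; the table name, file paths, and connection details are placeholders, and it assumes Node 15+ for stream/promises:

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import pg from 'pg';
import copyStreams from 'pg-copy-streams';

const { from: copyFrom, to: copyTo } = copyStreams;

const client = new pg.Client();
await client.connect();

// Import: stream a local CSV file into COPY ... FROM STDIN.
await pipeline(
  createReadStream('/tmp/import.csv'),
  client.query(copyFrom('COPY my_table FROM STDIN WITH (FORMAT csv, HEADER true)'))
);

// Export: stream COPY ... TO STDOUT into a local CSV file.
await pipeline(
  client.query(copyTo('COPY my_table TO STDOUT WITH (FORMAT csv, HEADER true)')),
  createWriteStream('/tmp/export.csv')
);

await client.end();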

We, and I think many others, cannot use COPY table FROM/TO '/local/file.csv' (the variant where the Postgres server reads or writes the file on its own local file system), because the web server (which handles the upload or download) does not have permission to read and write files on the database server.

This is the only reason we cannot use your library in our project.
Do you think implementing this functionality is possible in the foreseeable future?
Thanks.


porsager commented Apr 6, 2021

Hi @uasan.

That's a great idea!

I actually have a use case for this myself currently, so let's see if we can get that added.

What do you think about the following API?

const readableStream = sql`copy table_name to stdout`.readable()

const writableStream = sql`copy table_name from stdin`.writable()
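
To make the proposal concrete, a rough usage sketch, assuming readable() and writable() return plain Node.js streams; the table name and file paths are placeholders:

import { createReadStream, createWriteStream } from 'fs';
import postgres from 'postgres';

const sql = postgres();

// Export: pipe COPY ... TO STDOUT into a local file.
sql`copy table_name to stdout`.readable()
  .pipe(createWriteStream('/tmp/export.csv'));

// Import: pipe a local file into COPY ... FROM STDIN.
createReadStream('/tmp/import.csv')
  .pipe(sql`copy table_name from stdin`.writable());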

porsager added the enhancement label Apr 6, 2021

uasan commented Apr 6, 2021

What do you think about the following API?

If the readable and writable methods return native JS streams, then this interface suits us perfectly.


porsager commented Apr 6, 2021

That was the idea 👍


uasan commented Jun 5, 2021

I would like to help speed up the implementation of export/import streaming.
Can you tell me which internal API should be used? I would try to implement it in a PR; is this an option?


porsager commented Jun 25, 2021

Would you mind giving the https://github.com/porsager/postgres/tree/copy branch a test? It's based off the raw branch, so you should be able to test everything you need.

I've just used it on some of my own systems with success (copying the MaxMind GeoLite IP dataset). I got exactly the same performance as letting PostgreSQL copy the files from disk itself, so there is no noticeable overhead in running through Node (of course this is local, so network overhead will add a little).
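
For reference, the baseline being compared against here is a plain server-side COPY, where the PostgreSQL server process itself reads the file from disk; the table name and path below are placeholders:

import postgres from 'postgres';

const sql = postgres();

// Works only when the database server can access the path directly,
// which is exactly the limitation described at the top of this issue.
await sql`COPY geoip FROM '/data/GeoLite2-City-Blocks.csv' WITH (FORMAT csv, HEADER true)`;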


uasan commented Jun 25, 2021

I will definitely check, thanks!


uasan commented Jun 25, 2021

A very quick test; I will do a more detailed one later.
If I execute the query:

const sql = postgres(...);
const writable = sql`COPY table FROM STDIN WITH(FORMAT csv, HEADER true, DELIMITER ',')`.writable();
writable.write(....'\n');

Error:

Cannot set property 'blocked' of undefined

porsager commented

Ah, got it. I'm working on an improvement to query queueing for this, but I've pushed a commit that fixes this issue for now. Let me know how it works now ;)


uasan commented Jun 25, 2021

Everything works fine now!

There is an interesting difference: on the same file and table, the pg-copy-streams module emits many drain events and we have to wait out the backpressure, while your module does not emit any drain events and everything still completes successfully.
I wonder why that is?
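
For reference, this is the standard Node.js backpressure pattern that drain events belong to (a generic sketch, not specific to either library):

import { once } from 'events';

// When write() returns false the internal buffer is full,
// so wait for the 'drain' event before writing the next chunk.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }
  writable.end();
}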

porsager commented

Right, the exposed stream is not tied to the actual (tcp/socket) connection (yet), so if you check memory, there is probably more buildup than with pg-copy-streams.


uasan commented Jun 25, 2021

I understand, but it will be a problem if the file size exceeds the amount of available memory. What are your plans for the final implementation?

porsager commented

To hook it up to the (tcp/socket) stream, hence the "(yet)" above.

porsager commented

I'm currently toying around with getting realtime changes through the replication stream, which also uses the Copy parts of the protocol, so the internal design is still a moving target while I make sure I end up with a good final design.

porsager commented

Ok, backpressure is now handled for both readable and writable. Mind trying to see how it works for you now?


uasan commented Jun 27, 2021

This is how it is possible to write without explicit backpressure checks:

const writable = sql`COPY table FROM STDIN`.writable();

await { then(resolve) { writable.write(chunk, 'utf8', resolve) } };

It would be great if streams were always promise-based; please use the Streams Promises API if you can.
https://nodejs.org/api/stream.html#stream_streams_promises_api
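
As an illustration, a minimal sketch of what that could look like with pipeline from stream/promises, assuming .writable() returns a plain Node.js Writable as in the snippets above; the table name and file path are placeholders:

import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import postgres from 'postgres';

const sql = postgres();

// pipeline() handles backpressure and resolves when the copy completes.
await pipeline(
  createReadStream('/tmp/import.csv'),
  sql`COPY table_name FROM STDIN WITH (FORMAT csv, HEADER true)`.writable()
);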


uasan commented Jun 27, 2021

My simple async write-stream wrapper is cost-effective for the garbage collector:

const promisify = stream => {
  let resolve;
  let reject;

  const callback = error => {
    if (error) reject(error);
    else resolve();
  };

  // Reusable thenable: awaiting it re-registers resolve/reject each time,
  // so no new Promise object is allocated per write.
  const promise = {
    then(resolveNative, rejectNative) {
      resolve = resolveNative;
      reject = rejectNative;
    }
  };

  stream.writeAsync = (chunk, encoding) => {
    stream.write(chunk, encoding, callback);
    return promise;
  };

  return stream;
};

Usage:

const stream = promisify(sql`COPY table FROM STDIN`.writable());

let chunk;
while ((chunk = getNextChunk())) {
  await stream.writeAsync(chunk);
}

porsager commented

Ah, yeah, that seems way more convenient, but unless it can work seamlessly with regular streams, I don't want to require Node 15 for that functionality yet.

Just to be sure, it works as intended with the latest commit, right?


uasan commented Jun 28, 2021

Just to be sure, it works as intended with the latest commit, right?

Yes, thanks for the work.


uasan commented Jul 18, 2021

The connection hangs after aborting a stream; because of this, the Node.js process waits forever.

Node.js v16.5

import postgres from 'postgres';
import { createReadStream } from 'fs';
import { addAbortSignal } from 'stream';

const sql = postgres();

const testCopy = async () => {
  const controller = new AbortController();
  const readable = createReadStream(pathFileCSV); // pathFileCSV: path to a CSV file, defined elsewhere

  await sql`TRUNCATE TABLE table_name`;

  const writable = sql`COPY table_name FROM STDIN WITH(FORMAT csv, HEADER true, DELIMITER ',')`.writable();

  readable
    .pipe(addAbortSignal(controller.signal, writable))
    .on('error', () => {
      console.log('ABORTED-STREAM'); // this executed
    });

  setTimeout(async () => {
    controller.abort();
    await sql.end();
    console.log('DB END'); // this never executed
  }, 32)
}

testCopy();


porsager commented Aug 13, 2021

Hi @uasan.

Edit: OK, if I remove the setTimeout, I see the issue. I'll look closer.

porsager commented

Should be fixed with 4356ae9


uasan commented Aug 13, 2021

OK, when are you planning to merge it into master?

porsager commented

Now 🙂 Then I'll make a new beta release with it.

porsager added a commit that referenced this issue Aug 13, 2021
* Copy to and from stdout and stdin - fixes #170

* Should not discard last byte

* Fix copy when queued

* Handle backpressure for readable and writable

* Fix abort on .writable()