
Can finalize be called before entry streams are done? #24

Closed
fresheneesz opened this issue Sep 2, 2014 · 11 comments
@fresheneesz

Ideally, I would think I could add as many entries with streams as I want, then call finalize right afterward, and everything would work (i.e., it would automatically wait for all the input streams to complete before actually creating the package). The documentation seems to imply that this isn't the case, though. Can I do that or not? If not, why not? Can we make it so finalize can be called without explicitly waiting for the streams?

@fresheneesz
Author

I noticed I even get "already piping an entry" if I try to add an entry before a previous entry's stream is done. This is not how streams should work: if you have to wait for one stream to finish before moving on to the next, the usefulness of streams is destroyed.
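
For illustration, a minimal sketch of the pattern that produces that error (the file names and sizes here are made up):

const fs = require('fs');
const tar = require('tar-stream');

const pack = tar.pack();

// the first entry starts streaming...
fs.createReadStream('file-1.txt').pipe(pack.entry({ name: 'file-1.txt', size: 128 }));

// ...and opening a second entry before the first one has finished
// throws "already piping an entry"
fs.createReadStream('file-2.txt').pipe(pack.entry({ name: 'file-2.txt', size: 256 }));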

@mafintosh
Owner

You mean so you can do stuff like this?

fs.createReadStream('file-1').pipe(pack.entry({ name: 'file-1', size: ... }))
fs.createReadStream('file-2').pipe(pack.entry({ name: 'file-2', size: ... }))
pack.finalize()

I'm ok with a PR for that change :)

@fresheneesz
Author

@mafintosh Yeah exactly.

I added a failing unit test and attempted to make it work. I didn't succeed, but this is what I have:

fresheneesz@47116da

I thought it wasn't working because the Sink._write method was writing too early, but that doesn't seem to be the case anymore. It does look like file 2 is writing too early, but I have no idea why at this point.

@andrewdeandrade
Contributor

@fresheneesz try updating your fork to use this branch of my fork of tar-stream and try again:
https://github.com/malandrew/tar-stream/tree/update-deps

I suggest this because you're trying to do something similar to what I was doing.

@croqaz

croqaz commented Aug 10, 2016

Hey everyone! Any news about this issue? Adding multiple files to a tar archive would be really useful.

@abrinckm

abrinckm commented Nov 25, 2019

@croqaz @mafintosh @fresheneesz

Hi,
I was able to stream multiple entries using the following code, without any modifications to mafintosh's original source code. This will create a .tar.gz file using only streams:

const fs = require('fs');
const zlib = require('zlib');
const tar = require('tar-stream');

// Example input (sizes must match the actual files on disk):
//   [ { name: "./works.csv", size: 3654 }, { name: "./manuscripts.csv", size: 303156 } ]
function exampleCreateTarGzip (files) {
    const pack = tar.pack();
    const gzipStream = zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION });

    return new Promise((resolve, reject) => {
      pack.on('error', reject);
      
      packEntry();
      
      // packEntry is passed below as the entry callback, so it recurses
      // until the file list is empty
      function packEntry(err) {
        if (err) {
          reject(err);
        } else if (files.length) {
          const file_entry = files.pop();

          // note the recursive callback here: packEntry runs again when this entry finishes
          const entry = pack.entry({ name: file_entry.name, size: file_entry.size }, packEntry);

          const readStream = fs.createReadStream(file_entry.name);

          readStream.on('end', entry.end.bind(entry));     // This is key to getting this to work
          readStream.on('error', reject);
          readStream.pipe(entry);
        } else {
          pack.finalize();
        }
      }

      const writeStream = fs.createWriteStream('./archive.tar.gz');
      writeStream.on('error', reject);
      writeStream.on('end', () => { resolve('Done archiving files'); });

      pack.pipe(gzipStream).pipe(writeStream);
    });
}

The key is to call pack.entry() only after the previous entry has finished streaming, and to know when to explicitly call entry.end().

This is recursive, but it could perhaps be done differently. Some tweaks may be needed to get this to work properly for you (it was roughly copied and modified from my own code), but this solution works just fine for me.
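
A hypothetical call, assuming the sizes match the actual byte counts on disk (tar headers need the exact size up front):

exampleCreateTarGzip([
  { name: './works.csv', size: 3654 },
  { name: './manuscripts.csv', size: 303156 },
]).then(console.log, console.error);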

@dominicbartl

Hi,

I created a wrapper to add multiple streams sequentially. It's independent of the Node fs module, since I'm using read and write streams from Google Cloud Storage. I'm running in the Firebase Functions environment with 2 GB of memory and was hitting out-of-memory errors with other packages.

With this I was able to create gzipped archives larger than 2 GB, including single files that are themselves larger than 2 GB:

import {Readable, Writable} from "stream";
import * as tar from "tar-stream";
import {Pack} from "tar-stream";

type FileInfo = {name: string, size: number, stream: Readable};

export class TarArchive {

	private pack = tar.pack();
	private streamQueue: FileInfo[] = [];
	private size = 0;

	addBuffer(name: string, buffer: Buffer) {
		this.size += buffer.length;
		this.pack.entry({
			name: name
		}, buffer);
		console.log(`Added ${name}`, buffer.length, this.size);
		return this;
	}

	addStream(name: string, size: number, stream: Readable) {
		this.streamQueue.push({
			name, size, stream
		});
	}

	write(streamCallback: (pack: Pack) => Writable) {
		return new Promise<void>((resolve, reject) => {
			this.nextEntry((err) => {
				if (err) {
					reject(err);
				} else {
					resolve();
				}
			}, this.streamQueue.length);
			streamCallback(this.pack)
				.on('error', (err) => {
					this.pack.destroy(err);
					reject(err);
				})
		});
	}

	private nextEntry(callback: (err?: Error) => void, total: number) {
		const file = this.streamQueue.shift();
		if (file) {
			const writeEntryStream = this.pack.entry({
				name: file.name,
				size: file.size
			}, (err) => {
				if (err) {
					callback(err);
				} else {
					this.size += file.size;
					console.log(`Added ${file.name}`, file.size, this.size, `${total - this.streamQueue.length}/${total}`);
					this.nextEntry(callback, total);
				}
			});
			file.stream.pipe(writeEntryStream);
		} else {
			this.pack.finalize();
			callback();
		}
	}

}

Here's an example of how TarArchive is used:

const tar = new TarArchive();
tar.addBuffer('program.json', programData);

for (const file of files) {
	const meta = await this.fs.lstat(file);
	tar.addStream(file, meta.size, this.fs.createReadStream(file));
}

return tar.write((stream) => {
	return stream
		.pipe(zlib.createGzip({level: zlib.constants.Z_BEST_COMPRESSION}))
		.pipe(output)
});

The fs package I'm using for streams is node-fs-firebase, but this should work with any read and write streams.
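
For reference, a sketch of the same wrapper driven by Node's built-in fs module (the filePaths list and the output path here are hypothetical):

const fs = require('fs');
const zlib = require('zlib');

const archive = new TarArchive();
for (const filePath of filePaths) {
  const { size } = fs.statSync(filePath);
  archive.addStream(filePath, size, fs.createReadStream(filePath));
}

archive
  .write((pack) => pack
    .pipe(zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }))
    .pipe(fs.createWriteStream('./archive.tar.gz')))
  .then(() => console.log('done'))
  .catch(console.error);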

@cullylarson

@abrinckm Thanks so much for the example. That was a lifesaver. One issue to correct, on this line:

writeStream.on('end', () => { resolve('Done archiving files'); });

Write streams don't have an end event. You would need to listen for finish or close instead. Your example is probably working, but only because you don't have any code after exampleCreateTarGzip. If you do something like this:

await exampleCreateTarGzip(...)
console.log('done')

You would never see the "done" message. Node exits (without a message) because the event loop has drained and the promise will never resolve.
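
So the corrected line would look something like:

writeStream.on('finish', () => { resolve('Done archiving files'); });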

@gvspriyan

Yes @dominicbartl, you are a lifesaver. The buffer parameter for pack.entry({...}, buffer) is what I was not aware of. Thanks!
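
For anyone else landing here, that form looks roughly like this (the name and contents are illustrative):

// passing a buffer as the second argument writes the whole entry at once;
// no size field or explicit end() call is needed
pack.entry({ name: 'hello.txt' }, Buffer.from('hello world'));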

@vanshamagg

vanshamagg commented Aug 8, 2021

Code to pull files from a remote server, pack and gzip them, and then use the resulting stream, e.g. to stream the tarball to S3 or another remote store, or just to write it locally:

import { eachSeries } from 'async';
import * as tar from 'tar-stream';
import axios from 'axios';
import { IncomingMessage } from 'http';
import { createGzip, constants } from 'zlib';
const tarball = tar.pack();

const uris = [
    'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
    'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
    'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
    'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
    'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
];

(async () => {
    // with an async iteratee, the async library supplies no callback,
    // so we simply return once the entry has fully streamed
    await eachSeries(uris, async (file) => {
        console.log(file);

        const { readable, name, size } = await getStream(file);
        const writable = tarball.entry({ name, size });
        readable.pipe(writable);
        await new Promise((res, rej) => {
            writable.on('finish', () => res('ok'));
            writable.on('error', (err) => rej(err));
            readable.on('error', (err) => rej(err));
        });
    });
    tarball.finalize();
})();

// Gets the readable stream for each file + other props
async function getStream(file: string) {
    const readable: IncomingMessage = (await axios.get(file, { responseType: 'stream' })).data;
    const size: number = +readable.headers['content-length'];
    const name = new Date().toISOString() + '.png'; // give the file a timestamp-based name
    return { readable, size, name };
}

const gZipCompress = createGzip({ level: constants.Z_BEST_COMPRESSION });
const gzippedTarball = tarball.pipe(gZipCompress);

// use this stream anywhere now
export = gzippedTarball;

@psi-4ward

const writeStream = fs.createWriteStream('./archive.tar.gz');
const pack = tar.pack();
pack.pipe(createGzip()).pipe(writeStream);

for (const file of files) {
  await new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(file.path);
    const entry = pack.entry({ name: file.name, size: file.size });
    readStream.on('error', reject);
    entry.on('error', reject);
    entry.on('finish', resolve);
    readStream.pipe(entry);
  });
}
pack.finalize();
