Encoder NodeJS stream support
pierrec committed Sep 11, 2013
1 parent 1a6aae9 commit 9f3f103
Showing 3 changed files with 95 additions and 59 deletions.
TODO.md (3 changes: 2 additions & 1 deletion)
@@ -1,2 +1,3 @@
 * LZ4 stream dictionary
-* NodeJS streams updates
\ No newline at end of file
+* NodeJS streams updates
+* dependent blocks compression
lib/encoder.js (65 changes: 45 additions & 20 deletions)
@@ -36,7 +36,6 @@ function LZ4Stream (options) {
 	this.options = o

 	this.compress = o.highCompression ? lz4_binding.compressHCLimited : lz4_binding.compressLimited
-	this.chunkBound = lz4_binding.compressBound(o.blockMaxSize)

 	// Build the stream descriptor from the options
 	// flags
@@ -66,18 +65,23 @@ function LZ4Stream (options) {
 }

 // Add data to the stream, splitting blocks according to blockMaxSize
-LZ4Stream.prototype.push = function (data) {
+LZ4Stream.prototype.add = function (data) {
 	if (!data) return

 	for (var size, i = 0, n = data.length; i < n; i += size) {
 		size = Math.min(n - i, this.options.blockMaxSize)
-		this._push( data.slice(i, i + size) )
+		this._add( data.slice(i, i + size) )
 	}
 }

+// Shift a block
+LZ4Stream.prototype.shiftBlock = function () {
+	return this.blocks.shift()
+}
+
 // Compress and add a data block to the stream
 // The block is uncompressed if it is bigger than blockMaxSize
-LZ4Stream.prototype._push = function (data) {
+LZ4Stream.prototype._add = function (data) {
 	if (!data) return

 	var compressed = new Buffer( data.length )
@@ -114,10 +118,14 @@ LZ4Stream.prototype._push = function (data) {
 	this.size += data.length
 }

-LZ4Stream.prototype.flush = function () {
-	var res = [ lz4_static.MAGICNUMBER_BUFFER ]
-	// Allocate maximum descriptor size...
-	var descriptor = new Buffer(15)
+LZ4Stream.prototype.header = function () {
+	// Allocate magic number + maximum descriptor size
+	var magicSize = 4
+	var res = new Buffer(magicSize + 15)
+
+	res.writeUInt32LE(lz4_static.MAGICNUMBER, 0, false)
+
+	var descriptor = res.slice(magicSize)
 	var descriptorLength = 3

 	// Update the stream descriptor
@@ -143,22 +151,37 @@ LZ4Stream.prototype.flush = function () {
 		, descriptorLength - 1, false
 	)

-	// ...then slice it accordingly
+	// Adjust size according to descriptor length
 	if (descriptorLength < descriptor.length)
-		descriptor = descriptor.slice(0, descriptorLength)
-
-	res.push(descriptor)
+		res = res.slice(0, magicSize + descriptorLength)

-	// Add compressed blocks
-	res.push.apply(res, this.blocks)
+	return res
+}

-	res.push(lz4_static.EOS_BUFFER)
+LZ4Stream.prototype.tail = function () {
+	var eosSize = 4

 	if (this.options.streamChecksum) {
-		var checksum = new Buffer(4)
-		checksum.writeUInt32LE( utils.streamChecksum(null, this.checksum), 0, false )
-		res.push( checksum )
+		var res = new Buffer(eosSize + 4)
+		res.writeUInt32LE( utils.streamChecksum(null, this.checksum), eosSize, false )
+	} else {
+		var res = new Buffer(eosSize)
 	}

+	res.writeUInt32LE(lz4_static.EOS, 0, false)
+
 	return res
 }
+
+LZ4Stream.prototype.done = function () {
+	var res = []
+
+	res.push( this.header() )
+
+	// Add compressed blocks
+	res.push.apply(res, this.blocks)
+
+	res.push( this.tail() )
+
+	return Buffer.concat(res)
+}
@@ -172,6 +195,8 @@ function integerBytesLength (i) {

 exports.LZ4_compress = function (input, options) {
 	var LZ4S = new LZ4Stream(options)
-	LZ4S.push(input)
-	return LZ4S.flush()
+	LZ4S.add(input)
+	return LZ4S.done()
 }
+
+exports.LZ4Stream = LZ4Stream
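
The reworked encoder assembles an LZ4 frame from three pieces: header() writes the magic number and frame descriptor into one Buffer, the compressed blocks accumulate in this.blocks, and tail() writes the EOS marker followed by the optional stream checksum; done() simply concatenates header, blocks and tail. A minimal usage sketch of the exports above (the require path, the sample input and the assumption that options may be omitted are illustrative, not part of the commit):

    // Sketch: exercises the API introduced by this commit.
    var encoder = require('./lib/encoder')

    // One-shot compression through the convenience wrapper
    var frame = encoder.LZ4_compress(new Buffer('hello hello hello'))

    // Or drive the stream object directly: add() splits the input into
    // blocks of at most options.blockMaxSize, done() returns
    // header + compressed blocks + tail as a single Buffer.
    var s = new encoder.LZ4Stream()
    s.add(new Buffer('hello hello hello'))
    var frame2 = s.done()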
lib/encoder_stream.js (86 changes: 48 additions & 38 deletions)
@@ -1,76 +1,86 @@
 var Transform = require('stream').Transform
 var inherits = require('util').inherits

-var encoder = require('./encoder')
+var LZ4Stream = require('./encoder').LZ4Stream
 var lz4_static = require('./static')

-var LZ4_compressChunk = encoder.LZ4_compressChunk
-var LZ4_compressHCChunk = encoder.LZ4_compressHCChunk
-var LZ4_compressBound = encoder.LZ4_compressBound
-
 /**
-	Build up chunks and encode them one by one as they are assembled
+	Build up blocks and encode them one by one as they are assembled
 */
 function Encoder (options) {
 	if ( !(this instanceof Encoder) )
 		return new Encoder(options)

 	Transform.call(this, options)
-	// Options
-	options = options || {}
-	this.chunkSize = options.chunkSize || lz4_static.DEFAULT_CHUNKSIZE
-	this.compress = options.hc ? LZ4_compressHCChunk : LZ4_compressChunk
+	LZ4Stream.call(this, options)

 	// Data being processed
 	this.buffer = []
 	this.length = 0

 	this.first = true
-	this.chunkBound = LZ4_compressBound(this.chunkSize) + 4
+	this.fast = !this.options.streamSize
 }
 inherits(Encoder, Transform)
+Object.keys(LZ4Stream.prototype).forEach(function(method) {
+	if (!Encoder.prototype[method])
+		Encoder.prototype[method] = LZ4Stream.prototype[method];
+});

 Encoder.prototype._transform = function (data, encoding, done) {
 	// Buffer the incoming data
 	this.buffer.push(data)
 	this.length += data.length

-	if (this.first) {
-		var buf = new Buffer(4)
-		buf.writeUInt32LE(lz4_static.ARCHIVE_MAGICNUMBER, 0, false)
-		this.push(buf)
-		this.first = false
-	}
+	var blockMaxSize = this.options.blockMaxSize

-	if ( this.length < this.chunkSize ) return done()
-
-	this._compressChunk(this.chunkSize, done)
-}
+	// Not enough data for a block
+	if ( this.length < blockMaxSize ) return done()

-Encoder.prototype._flush = function (done) {
-	if (this.length === 0) return done()
+	// Build the data to be compressed
+	var buf = Buffer.concat(this.buffer, this.length)

-	while ( this.length > this.chunkSize ) {
-		this._compressChunk(this.chunkSize)
-	}
+	// Compress the block
+	this.add( buf.slice(0, blockMaxSize) )

-	this._compressChunk(this.length, done)
-}
+	// Set the remaining data
+	if (buf.length > blockMaxSize) {
+		this.buffer = [ buf.slice(blockMaxSize) ]
+		this.length = buf.length - blockMaxSize
+	} else {
+		this.buffer = []
+		this.length = 0
+	}

-Encoder.prototype._compressChunk = function (size, done) {
-	var buf = new Buffer(this.chunkBound)
-	var input = Buffer.concat(this.buffer, this.length)
-	var res = this.compress( input.slice(0, size), buf.slice(4) )
-	if (res === 0)
-		return done( null, new Error('Compression error') )
+	if (this.fast) {
+		if (this.first) {
+			this.push( this.header() )
+			this.first = false
+		}
+		this.push( this.shiftBlock() )
+	}

-	buf.writeUInt32LE(res, 0, false)
-	this.push( buf.slice(0, res + 4) )
+	done()
+}

-	this.length = input.length - size
-	this.buffer = this.length > 0 ? [ input.slice(size) ] : []
+Encoder.prototype._flush = function (done) {
+	if (this.length > 0)
+		this.add( Buffer.concat(this.buffer, this.length) )

+	if (this.fast) {
+		if (this.first) {
+			this.push( this.header() )
+			this.first = false
+		}
+		var block
+		while ( block = this.shiftBlock() ) this.push(block)
+		this.push( this.tail() )
+	} else {
+		this.push( this.done() )
+	}

-	if (done) done()
+	done()
 }

 module.exports = Encoder
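
With this change Encoder is a stream.Transform that borrows LZ4Stream's methods through the Object.keys mixin above; renaming push/_push to add/_add in lib/encoder.js keeps the mixin from shadowing Transform's own push(), which is what emits data downstream. A hedged usage sketch (the file paths and the reliance on default options are assumptions built on this diff):

    var fs = require('fs')
    var Encoder = require('./lib/encoder_stream')

    // Pipe a file through the encoder. When options.streamSize is unset,
    // this.fast is true: the header goes out with the first block and every
    // block is pushed as soon as it is compressed; otherwise _flush() emits
    // the whole frame at once via done().
    fs.createReadStream('input.txt')
        .pipe( new Encoder() )
        .pipe( fs.createWriteStream('input.txt.lz4') )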
