Skip to content

Commit

Permalink
Merge pull request ql-io#327 from prabhakhar/master
Browse files Browse the repository at this point in the history
Gzip and Deflate content encoding support for upstream responses
  • Loading branch information
shimonchayim committed Mar 7, 2012
2 parents 95d0ba8 + 6217260 commit 8644141
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -1,3 +1,7 @@
## Mar 06, 2012

* Gzip and Deflate content encoding support for upstream responses.

## Mar 05, 2012

* Recover shutdown/stop from extraneous pid files.
Expand Down
58 changes: 55 additions & 3 deletions modules/engine/lib/engine/http/request.js
Expand Up @@ -23,6 +23,7 @@ var _ = require('underscore'),
https = require('https'),
URI = require('uri'),
response = require('./response.js'),
zlib = require('zlib'),
uuid = require('node-uuid');

exports.send = function(args) {
Expand Down Expand Up @@ -113,9 +114,55 @@ function sendMessage(args, client, options, retry) {
clientRequest = client.request(options, function(res) {
var bufs = []; // array for bufs for each chunk
var responseLength = 0;
var contentEncoding = res.headers['content-encoding'];
var zipped = false, unzip;
if (contentEncoding) {
contentEncoding = contentEncoding.toLowerCase();
if (contentEncoding === 'gzip') {
unzip = zlib.createGunzip();
}
else if (contentEncoding === 'deflate') {
unzip = zlib.createInflate();
}
else {
var err = new Error('Content-Encoding \'' + contentEncoding + '\' is not supported');
err.uri = args.uri;
err.status = 502;
args.logEmitter.emitError(args.httpReqTx.event, 'Error with uri - ' + args.uri + ' - ' +
'Content encoding ' + contentEncoding + ' is not supported' +
' ' + (Date.now() - start) + 'msec');
res.socket.destroy();
return args.httpReqTx.cb(err);
}
zipped = true;

unzip.on('data', function (chunk) {
bufs.push(chunk);
});
unzip.on('end', function () {
response.exec(timings, reqStart, args, uniqueId, res, start, bufs, mediaType, options, status);
});
unzip.on('error', function (err) {
var err = new Error('Corrupted stream');
err.uri = args.uri;
err.status = 502;
args.logEmitter.emitError(args.httpReqTx.event, 'Error with uri - ' + args.uri + ' - ' +
'Stream is corrupted' +
' ' + (Date.now() - start) + 'msec');
res.socket.destroy();
return args.httpReqTx.cb(err);
});
}

res.on('data', function (chunk) {
// Chunk is a buf as we don't set any encoding on the response
bufs.push(chunk);
if(zipped) {
// TODO Check for corrupted stream. Empty 'bufs' may indicate invalid stream
unzip.write(chunk);
}
else {
// Chunk is a buf as we don't set any encoding on the response
bufs.push(chunk);
}
responseLength += chunk.length;

var maxResponseLength = getMaxResponseLength(args.config, args.logEmitter);
Expand All @@ -133,7 +180,12 @@ function sendMessage(args, client, options, retry) {
}
});
res.on('end', function() {
response.exec(timings, reqStart, args, uniqueId, res, start, bufs, mediaType, options, status);
if(zipped) {
unzip.end();
}
else {
response.exec(timings, reqStart, args, uniqueId, res, start, bufs, mediaType, options, status);
}
});
});

Expand Down
3 changes: 2 additions & 1 deletion modules/engine/lib/engine/source/verb.js
Expand Up @@ -531,7 +531,8 @@ function send(verb, args, uri, params, holder, callback) {
var headers = {
'connection' : args.settings['connection'] ? args.settings['connection'] : 'keep-alive',
'user-agent' : 'ql.io-engine' + require('../../../package.json').version + '/node.js-' + process.version,
'accept' : _.pluck(args.xformers, 'accept').join(',')
'accept' : _.pluck(args.xformers, 'accept').join(','),
'accept-encoding' : 'gzip, deflate'
};

// Copy headers from the table def
Expand Down
2 changes: 1 addition & 1 deletion modules/engine/package.json
@@ -1,7 +1,7 @@
{
"author": "ql.io",
"name": "ql.io-engine",
"version": "0.4.11",
"version": "0.4.12",
"repository": {
"type": "git",
"url": "https://github.com/ql-io/ql.io"
Expand Down
127 changes: 127 additions & 0 deletions modules/engine/test/gzip-test.js
@@ -0,0 +1,127 @@
/*
* Copyright 2011 eBay Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

"use strict";


var fs = require('fs'),
Engine = require('../lib/engine'),
zlib = require('zlib'),
http = require('http');

module.exports = {
'gzip-test':function (test) {
var server = http.createServer(function (req, res) {
var file = __dirname + '/mock' + req.url;
var readStream = fs.createReadStream(file);
res.writeHead(200, {
'Content-Type':file.indexOf('.xml') >= 0 ? 'application/xml' : 'application/json',
'Content-Encoding':'gzip'
});
readStream.pipe(zlib.createGzip()).pipe(res);
});

server.listen(3000, function () {
// Do the test here.
var engine = new Engine({
});
var script = fs.readFileSync(__dirname + '/mock/finditems.ql', 'UTF-8');
engine.execute(
script,
function (emitter) {
emitter.on('end', function (err, results) {
if (err) {
console.log(err.stack || err);
test.ok(false);
}
else {
test.ok(results.body.length > 0, "Response has no body");
}
test.done();
server.close();
});
});
});
},
'deflate-test':function (test) {
var server = http.createServer(function (req, res) {
var file = __dirname + '/mock' + req.url;
var readStream = fs.createReadStream(file);
res.writeHead(200, {
'Content-Type':file.indexOf('.xml') >= 0 ? 'application/xml' : 'application/json',
'Content-Encoding':'deflate'
});
readStream.pipe(zlib.createDeflate()).pipe(res);
});

server.listen(3000, function () {
// Do the test here.
var engine = new Engine({
});
var script = fs.readFileSync(__dirname + '/mock/finditems.ql', 'UTF-8');
engine.execute(
script,
function (emitter) {
emitter.on('end', function (err, results) {
if (err) {
console.log(err.stack || err);
test.ok(false);
}
else {
test.ok(results.body.length > 0, "Response has no body");
}
test.done();
server.close();
});
});
});
},
'gzip-unsupported-encoding-test':function (test) {
var snappy = 'snappy2';
var server = http.createServer(function (req, res) {
var file = __dirname + '/mock' + req.url;
res.writeHead(200, {
'Content-Type':file.indexOf('.xml') >= 0 ? 'application/xml' : 'application/json',
'Content-Encoding':snappy
});
var buffer = "Hello World";
res.write(buffer);
res.end();
});

server.listen(3000, function () {
var engine = new Engine({
});
var script = fs.readFileSync(__dirname + '/mock/finditems.ql', 'UTF-8');
engine.execute(
script,
function (emitter) {
emitter.on('end', function (err, results) {
if (err) {
test.ok(err.message && err.message.indexOf(snappy) > 0, "Expected message not thrown");
test.ok(err.status == 502);
}
else {
test.ok(false, "Unsupported content encoding error must be thrown");
}
test.done();
server.close();
});
});
});
}
// TODO add test case for corrupted stream
};

0 comments on commit 8644141

Please sign in to comment.