Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reply with an asynchroneous response. #283

Merged
merged 2 commits into from Mar 16, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/recorder.js
Expand Up @@ -329,9 +329,9 @@ function record(rec_options) {
};
res.on('data', onData);
}

debug('finished setting up intercepting');

if (proto === 'https') {
options._https_ = true;
}
Expand Down
303 changes: 155 additions & 148 deletions lib/request_overrider.js
Expand Up @@ -262,7 +262,10 @@ function RequestOverrider(req, options, interceptors, remove, cb) {
setRequestHeaders(req, options, interceptor);

if (typeof interceptor.body === 'function') {

// In case we are waiting for a callback
if (interceptor.body.length === 3) {
return interceptor.body(options.path, requestBody || '', continueWithResponseBody);
}
responseBody = interceptor.body(options.path, requestBody) || '';

} else {
Expand Down Expand Up @@ -313,184 +316,188 @@ function RequestOverrider(req, options, interceptors, remove, cb) {

}
}
return continueWithResponseBody(responseBody);

// Transform the response body if it exists (it may not exist if we have `responseBuffers` instead)
if(responseBody) {
debug('transform the response body');

if (!Buffer.isBuffer(responseBody) && !isStream(responseBody)) {
if (typeof responseBody === 'string') {
responseBody = new Buffer(responseBody);
} else {
responseBody = JSON.stringify(responseBody);
function continueWithResponseBody(responseBody) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this line I didn't touch the code just indent it because of the new function definition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the callback should be using the standard callback pattern: error-first, so that it can be easily composed with other I/O.

This allows stuff like:

.reply(function(cb) {
  fs.readFile(file, cb);
});

or even better:

.reply(fs.readFile.bind(fs, file));

Then the answer comes: what happens when there is an error happens? I propose that it's just thrown so that the test stops.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good point, I was thinking about it and then I realize that there should not be errors at that point but you've made your point, I will change that.

// Transform the response body if it exists (it may not exist if we have `responseBuffers` instead)
if (responseBody) {
debug('transform the response body');

if (!Buffer.isBuffer(responseBody) && !isStream(responseBody)) {
if (typeof responseBody === 'string') {
responseBody = new Buffer(responseBody);
} else {
responseBody = JSON.stringify(responseBody);
}
}
}

if (interceptor.delayInMs) {
debug('delaying the response for', interceptor.delayInMs, 'milliseconds');
responseBody = new DelayedBody(interceptor.delayInMs, responseBody);
}
if (interceptor.delayInMs) {
debug('delaying the response for', interceptor.delayInMs, 'milliseconds');
responseBody = new DelayedBody(interceptor.delayInMs, responseBody);
}

if (isStream(responseBody)) {
debug('response body is a stream');
responseBody.pause();
responseBody.on('data', function(d) {
response.emit('data', d);
});
responseBody.on('end', function() {
response.emit('end');
});
responseBody.on('error', function(err) {
response.emit('error', err);
});
} else if (responseBody && !Buffer.isBuffer(responseBody)) {
if (typeof responseBody === 'string') {
responseBody = new Buffer(responseBody);
} else {
responseBody = JSON.stringify(responseBody);
response.headers['content-type'] = 'application/json';
if (isStream(responseBody)) {
debug('response body is a stream');
responseBody.pause();
responseBody.on('data', function(d) {
response.emit('data', d);
});
responseBody.on('end', function() {
response.emit('end');
});
responseBody.on('error', function(err) {
response.emit('error', err);
});
} else if (responseBody && !Buffer.isBuffer(responseBody)) {
if (typeof responseBody === 'string') {
responseBody = new Buffer(responseBody);
} else {
responseBody = JSON.stringify(responseBody);
response.headers['content-type'] = 'application/json';
}
}
}
}

remove(interceptor);
interceptor.discard();
remove(interceptor);
interceptor.discard();

if (aborted) { return; }
if (aborted) { return; }

response.setEncoding = function(newEncoding) {
encoding = newEncoding;
};
response.setEncoding = function(newEncoding) {
encoding = newEncoding;
};

response.pause = function() {
debug('pausing mocking');
paused = true;
if (isStream(responseBody)) {
responseBody.pause();
}
};
response.pause = function() {
debug('pausing mocking');
paused = true;
if (isStream(responseBody)) {
responseBody.pause();
}
};

response.resume = function() {
debug('resuming mocking');
paused = false;
if (isStream(responseBody)) {
responseBody.resume();
}
mockNextEmit();
};

var read = false;
response.read = function() {
debug('reading response body');
if (isStream(responseBody) && responseBody.read) {
return responseBody.read();
} else {
if (! read) {
read = true;
return responseBody;
} else {
return null;
response.resume = function() {
debug('resuming mocking');
paused = false;
if (isStream(responseBody)) {
responseBody.resume();
}
}
};

// HACK: Flag our response object as readable so that it can be automatically
// resumed by Node when drain happens. This enables working with some agents
// that behave differently than built-in agent (e.g. needle, superagent).
// The correct way to implement this would be to use IncomingMessage instead
// of OutgoingMessage class to mock responses.
response.readable = true;

/// response.client.authorized = true
/// fixes https://github.com/pgte/nock/issues/158
response.client = _.extend(response.client || {}, {
authorized: true
});
mockNextEmit();
};

// `mockEmits` is an array of emits to be performed during mocking.
if (typeof responseBody !== "undefined") {
mockEmits.push(function() {
if (encoding) {
debug('transforming body per its encoding');
if (isStream(responseBody)) {
responseBody.setEncoding(encoding);
var read = false;
response.read = function() {
debug('reading response body');
if (isStream(responseBody) && responseBody.read) {
return responseBody.read();
} else {
if (! read) {
read = true;
return responseBody;
} else {
responseBody = responseBody.toString(encoding);
return null;
}
}
if (! isStream(responseBody)) {
debug('emitting response body');
response.emit('data', responseBody);
response.emit('readable');
}
};

// HACK: Flag our response object as readable so that it can be automatically
// resumed by Node when drain happens. This enables working with some agents
// that behave differently than built-in agent (e.g. needle, superagent).
// The correct way to implement this would be to use IncomingMessage instead
// of OutgoingMessage class to mock responses.
response.readable = true;

/// response.client.authorized = true
/// fixes https://github.com/pgte/nock/issues/158
response.client = _.extend(response.client || {}, {
authorized: true
});
} else {
// We will emit response buffers one by one.
_.each(responseBuffers, function(buffer) {

// `mockEmits` is an array of emits to be performed during mocking.
if (typeof responseBody !== "undefined") {
mockEmits.push(function() {
debug('emitting response buffer');
response.emit('data', buffer);
response.emit('readable');
if (encoding) {
debug('transforming body per its encoding');
if (isStream(responseBody)) {
responseBody.setEncoding(encoding);
} else {
responseBody = responseBody.toString(encoding);
}
}
if (! isStream(responseBody)) {
debug('emitting response body');
response.emit('data', responseBody);
response.emit('readable');
}
});
});
}
} else {
// We will emit response buffers one by one.
_.each(responseBuffers, function(buffer) {
mockEmits.push(function() {
debug('emitting response buffer');
response.emit('data', buffer);
response.emit('readable');
});
});
}

if (!isStream(responseBody)) {
mockEmits.push(function() {
debug('emitting end');
response.emit('end');
});
}
if (!isStream(responseBody)) {
mockEmits.push(function() {
debug('emitting end');
response.emit('end');
});
}

function mockNextEmit() {
debug('mocking next emit');

// We don't use `process.nextTick` because we *want* (very much so!)
// for I/O to happen before the next `emit` since we are precisely mocking I/O.
// Otherwise the writing to output pipe may stall invoking pause()
// without ever calling resume() (the observed was behavior on superagent
// with Node v0.10.26)
timers.setImmediate(function() {
if (paused || mockEmits.length === 0 || aborted) {
debug('mocking paused, aborted or finished');
return;
}
function mockNextEmit() {
debug('mocking next emit');

// We don't use `process.nextTick` because we *want* (very much so!)
// for I/O to happen before the next `emit` since we are precisely mocking I/O.
// Otherwise the writing to output pipe may stall invoking pause()
// without ever calling resume() (the observed was behavior on superagent
// with Node v0.10.26)
timers.setImmediate(function() {
if (paused || mockEmits.length === 0 || aborted) {
debug('mocking paused, aborted or finished');
return;
}

var nextMockEmit = mockEmits.shift();
nextMockEmit();
var nextMockEmit = mockEmits.shift();
nextMockEmit();

// Recursively invoke to mock the next emit after the last one was handled.
mockNextEmit();
});
}
// Recursively invoke to mock the next emit after the last one was handled.
mockNextEmit();
});
}

debug('mocking', mockEmits.length, 'emits');
debug('mocking', mockEmits.length, 'emits');

process.nextTick(function() {
var respond = function() {
debug('emitting response');
process.nextTick(function() {
var respond = function() {
debug('emitting response');

if (typeof cb === 'function') {
debug('callback with response');
cb(response);
}
if (typeof cb === 'function') {
debug('callback with response');
cb(response);
}

req.emit('response', response);
req.emit('response', response);

if (isStream(responseBody)) {
debug('resuming response stream');
responseBody.resume();
}
if (isStream(responseBody)) {
debug('resuming response stream');
responseBody.resume();
}

mockNextEmit();
};
mockNextEmit();
};

if (interceptor.delayConnectionInMs && interceptor.delayConnectionInMs > 0) {
setTimeout(respond, interceptor.delayConnectionInMs);
} else {
respond();
}
});
if (interceptor.delayConnectionInMs && interceptor.delayConnectionInMs > 0) {
setTimeout(respond, interceptor.delayConnectionInMs);
} else {
respond();
}
});
}
};

return req;
Expand Down