SPRXCLT-12: fix PUT behavior when reusing sockets
- Start streaming the PUT body only when:
    - we know we are reusing an open socket, or
    - a new socket has been opened.
  Note: we won't retry if the socket has been closed by the remote just
  after we have decided to reuse it. We must ensure the remote keepalive
  timeout is greater than ours. (See the illustrative sketch below.)
- Ensure that once we start streaming, we can never retry.
- Add the missing `log` argument to `_createRequestHeader`.
- Deduplicate the logging logic for readability.
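
For illustration only, here is a minimal standalone sketch of the pattern described above, written against plain Node.js http with a keep-alive agent. The putStream name, host, port and path are placeholders and are not part of this commit or of lib/sproxyd.js.

const http = require('http');
const { finished } = require('stream');

const agent = new http.Agent({ keepAlive: true });

// Stream `body` (a Readable of `size` bytes) as a PUT, but only start writing
// once the socket is known to be usable, so that errors raised before any
// byte is sent can still be retried safely.
function putStream(body, size, callback) {
    let streamingStarted = false;

    const req = http.request({
        agent,
        method: 'PUT',
        host: 'localhost',   // placeholder
        port: 8181,          // placeholder
        path: '/example',    // placeholder
        headers: { 'content-length': size },
    });

    req.on('response', res => {
        res.resume(); // drain the response body
        callback(null, res.statusCode);
    });

    req.on('error', err => {
        // Retrying is only safe if no part of the body was sent.
        err.retryable = !streamingStarted;
        callback(err);
    });

    const startPayloadStreaming = () => {
        streamingStarted = true;
        body.pipe(req);
        // Abort the request if the source stream fails mid-transfer.
        finished(body, err => {
            if (err) {
                req.destroy(err);
            }
        });
    };

    req.on('socket', socket => {
        if (req.reusedSocket) {
            // Keep-alive socket already connected: start streaming right away.
            startPayloadStreaming();
        } else {
            // Fresh socket: wait until the TCP connection is established.
            socket.on('connect', startPayloadStreaming);
        }
    });
}

With a keep-alive agent, a reused socket may still be closed by the server between reuse and the first write; since body bytes may already be in flight at that point, such a request is not retried here, which is why the commit note requires the remote keepalive timeout to be greater than ours.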
Florent Monjalet committed Aug 2, 2023
1 parent 5896c05 commit f67db5c
Showing 1 changed file with 47 additions and 50 deletions.
lib/sproxyd.js: 47 additions & 50 deletions (97 changed lines)
@@ -287,79 +287,76 @@ class SproxydClient {
         const host = this.getCurrentBootstrap();
         const isBatchDelete = key === '.batch_delete';
         const newKey = key || keygen(this.cos, params);
-        const req = this._createRequestHeader(method, headers, newKey, params,
-            log);
+        const req = this._createRequestHeader(method, headers, newKey, params, log);
+        const logInfo = {
+            component: 'sproxydclient',
+            method: '_handleRequest',
+            host,
+            key: newKey,
+            contentLength: size,
+        };

         if (stream) {
-            let isProcessingPUT = false;
+            let streamingStarted = false;
             let voluntaryAbort = false;
             headers['content-length'] = size;

             const request = _createRequest(req, log, (err, response) => {
                 if (err) {
-                    if (isProcessingPUT || voluntaryAbort) {
+                    if (streamingStarted || voluntaryAbort) {
                         err.retryable = false;
                     } else {
                         err.retryable = true;
                     }
                     if (!voluntaryAbort) {
-                        log.error('putting chunk to sproxyd', { host, key: newKey, error: err });
+                        logInfo.error = err;
+                        log.error('putting chunk to sproxyd', logInfo);
                     }
                     return callback(err);
                 }
                 // We return the key
-                log.debug('stored to sproxyd', {
-                    host,
-                    key: newKey,
-                    statusCode: response.statusCode,
-                });
+                logInfo.statusCode = response.statusCode;
+                log.debug('stored to sproxyd', logInfo);
                 return callback(null, newKey);
             });

-            request.on('finish', () => {
-                isProcessingPUT = false;
-                log.debug('finished sending PUT chunks to sproxyd', {
-                    component: 'sproxydclient',
-                    method: '_handleRequest',
-                    contentLength: size,
-                });
-            });

-            stream.on('error', err => {
-                isProcessingPUT = false;
-                log.error('error from readable stream', {
-                    error: err.message,
-                    method: '_handleRequest',
-                    component: 'sproxydclient',
-                });
-            });

+            const startPayloadStreaming = () => {
+                streamingStarted = true;
+                stream.pipe(request);
+                finished(stream, err => {
+                    if (err) {
+                        log.trace('readable stream aborted', logInfo);
+                        request.abort();
+                        voluntaryAbort = true;
+                    } else {
+                        log.trace('readable stream finished normally', logInfo);
+                    }
+                });
+            };

+            // Only start streaming when connection is guaranteed: either on a
+            // new connection or a reused one.
+            // There is a possible race that makes the remote close the socket
+            // just after we decide to reuse it; in that case we do not retry.
+            request.on('socket', (socket) => {
+                // We can start streaming when reusing a socket
+                if (request.reusedSocket) {
+                    log.trace('reusing existing socket', logInfo);
+                    startPayloadStreaming();
+                } else {
+                    // Otherwise, wait for a successful connection
+                    socket.on('connect', () => {
+                        log.trace('using a new socket', logInfo);
+                        startPayloadStreaming();
+                    });
+                }
+            });

-            request.on('socket', socket => {
-                socket.on('connect', () => {
-                    log.debug('Connection established', {
-                        method: '_handleRequest',
-                        component: 'sproxydclient',
-                    });
-                    isProcessingPUT = true;
-                    stream.pipe(request);
-                });
-            });
+            request.on('finish', () => {
+                log.debug('finished sending PUT chunks to sproxyd', logInfo);
+            });

-            finished(stream, err => {
-                if (err) {
-                    log.trace('readable stream aborted', {
-                        method: '_handleRequest',
-                        component: 'sproxydclient',
-                    });
-                    request.abort();
-                    isProcessingPUT = false;
-                    voluntaryAbort = true;
-                } else {
-                    log.trace('readable stream finished normally', {
-                        method: '_handleRequest',
-                        component: 'sproxydclient',
-                    });
-                }
-            });
+            stream.on('error', err => {
+                log.error('error from readable stream', logInfo);
+            });
         } else {
             headers['content-length'] = isBatchDelete ? size : 0;
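
For context, a hypothetical caller-side sketch (not part of sproxydclient) of how the `err.retryable` flag set in the handler above could be honored; `withRetry`, `operation` and `maxAttempts` are illustrative names only.

// Retry an async operation only while its errors are flagged as retryable,
// i.e. only while no request body bytes have been streamed yet.
function withRetry(operation, maxAttempts, callback) {
    const attempt = n => {
        operation((err, result) => {
            if (err && err.retryable && n < maxAttempts) {
                return attempt(n + 1);
            }
            return callback(err, result);
        });
    };
    attempt(1);
}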