Skip to content

Commit

Permalink
Merged pull request #19~
Browse files Browse the repository at this point in the history
  • Loading branch information
kurokikaze committed Mar 2, 2012
2 parents d29ef4e + fa0acab commit a29d22b
Showing 1 changed file with 81 additions and 71 deletions.
152 changes: 81 additions & 71 deletions limestone.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ exports.SphinxClient = function() {
"FLOAT": 5,
"BIGINT": 6,
"STRING": 7,
"MULTI": 0x40000000
"MULTI": 0x40000000
};

self.Sphinx = Sphinx;
Expand Down Expand Up @@ -128,22 +128,26 @@ exports.SphinxClient = function() {


server_conn = tcp.createConnection(port, host);
server_conn.on('error', function(x){
console.log('Error: '+x);
server_conn.end();
callback(x);
});
server_conn.on('error', function(x){
console.log('Error: '+x);
server_conn.end();
callback(x);
});

server_conn.on("close", function(x){
if (x) {
console.log('closed');
callback(new Error("The socket has closed due to an unknown error"));
}
});

// disable Nagle algorithm
server_conn.setNoDelay(true);
//server_conn.setEncoding('binary');

response_output = null;

//var promise = new process.Promise();

server_conn.addListener('error', function(e) {
callback(e);
});

server_conn.addListener('connect', function () {

Expand All @@ -152,8 +156,8 @@ exports.SphinxClient = function() {
// console.log('Sending version number...');
// Here we must send 4 bytes, '0x00000001'
if (server_conn.readyState == 'open') {
var version_number = Buffer.makeWriter();
version_number.push.int32(1);
var version_number = Buffer.makeWriter();
version_number.push.int32(1);
// Waiting for answer
server_conn.once('data', function(data) {
/*if (response_output) {
Expand Down Expand Up @@ -181,27 +185,32 @@ exports.SphinxClient = function() {

if (data_unpacked[""] >= 1) {

if (persistent){
server_conn.once('drain', function(){
var pers_req = Buffer.makeWriter();
pers_req.push.int16(Sphinx.command.PERSIST);
pers_req.push.int16(0);
pers_req.push.int32(4);
pers_req.push.int32(1);
server_conn.write(pers_req.toBuffer());
server_conn.once('drain', function(){
server_conn.on('data', readResponseData);
_connected = true;
server_conn.emit('sphinx.connected');
callback(null);
});
});
} else {
server_conn.once('drain', function(){
server_conn.on('data', readResponseData);
_connected = true;
server_conn.emit('sphinx.connected');
callback(null);
});
}

//all ok, send my version
server_conn.write(version_number.toBuffer());

if(persistent){
var pers_req = Buffer.makeWriter();
pers_req.push.int16(Sphinx.command.PERSIST);
pers_req.push.int16(0);
pers_req.push.int32(4);
pers_req.push.int32(1);
server_conn.write(pers_req.toBuffer());
}

server_conn.on('data', readResponseData);
_connected = true;
server_conn.emit('sphinx.connected');

// Use callback
// promise.emitSuccess();
callback(null);

server_conn.write(version_number.toBuffer());
} else {
callback(new Error('Wrong protocol version: ' + protocol_version));
server_conn.end();
Expand Down Expand Up @@ -265,7 +274,7 @@ exports.SphinxClient = function() {
if (query_raw.query) {
for (x in query_parameters) {
if (query_raw.hasOwnProperty(x)) {
query[x] = query_raw[x];
query[x] = query_raw[x];
} else {
query[x] = query_parameters[x];
}
Expand All @@ -274,7 +283,6 @@ exports.SphinxClient = function() {
query = query_raw.toString();
}


var request = Buffer.makeWriter();
request.push.int16(Sphinx.command.SEARCH);
request.push.int16(Sphinx.clientCommand.SEARCH);
Expand All @@ -292,9 +300,9 @@ exports.SphinxClient = function() {

request.push.int32(query.sort);

request.push.lstring(query.sortby);
request.push.lstring(query.sortby);
request.push.lstring(query.query); // Query text
request.push.int32(query.weights.length);
request.push.int32(query.weights.length);
for (var weight in query.weights) {
request.push.int32(parseInt(weight));
}
Expand All @@ -306,10 +314,10 @@ exports.SphinxClient = function() {
//request.push.int32(0);
request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers
//request.push.int32(0); // However, there is a caveat about using 64-bit ids
request.push.int64(0, query.max_id);
request.push.int64(0, query.max_id);

//console.log('Found ' + query.filters.length + ' filters');
request.push.int32(query.filters.length);
request.push.int32(query.filters.length);
for (var filter_id in query.filters) {
var filter = query.filters[filter_id];
//console.log('Found filter of type ' + filter.type)
Expand Down Expand Up @@ -343,7 +351,7 @@ exports.SphinxClient = function() {
}
request.push.int32(filter.exclude);
}

request.push.int32(query_parameters.groupfunc);
request.push.lstring(query_parameters.groupby); // Groupby length

Expand All @@ -355,7 +363,7 @@ exports.SphinxClient = function() {
request.push.int32(query_parameters.retrycount); // Retrycount
request.push.int32(query_parameters.retrydelay); // Retrydelay

request.push.lstring(query_parameters.groupdistinct); // Group distinct
request.push.lstring(query_parameters.groupdistinct); // Group distinct

if (query_parameters.anchor.length == 0) {
request.push.int32(0); // no anchor given
Expand Down Expand Up @@ -411,7 +419,7 @@ exports.SphinxClient = function() {
req_length.push.int32(request_buf.length - 8);
req_length.toBuffer().copy(request_buf, 4, 0);

//console.log('Sending search request of ' + request_buf.length + ' bytes');
//console.log('Sending search request of ' + request_buf.length + ' bytes ');
_enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH);

};
Expand Down Expand Up @@ -501,7 +509,6 @@ exports.SphinxClient = function() {
req_length.toBuffer().copy(request_buf,4,0);

//console.log('Sending build excerpt request of ' + request_buf.length + 'bytes');

_enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT);
}; // build_excerpts

Expand All @@ -510,45 +517,48 @@ exports.SphinxClient = function() {
};

function _enqueue(req_buf , cb, sc) {
if(!server_conn || !server_conn.writable){
cb(new Error("Trying to enqueue. Not connected"));
}
_queue.push({request_buffer: req_buf, callback: cb, search_command: sc});
if(_queue.length === 1)
{
if(_connected) {
initResponseOutput(cb);
server_conn.write(req_buf);
} else {
server_conn.once('sphinx.connected', function(){
initResponseOutput(cb);
server_conn.write(req_buf);
});
if(!server_conn || !server_conn.writable){
cb(new Error("Trying to enqueue. Not connected"));
return;
}
_queue.push({request_buffer: req_buf, callback: cb, search_command: sc});
if (_queue.length === 1) {
if (_connected) {
initResponseOutput(cb);
server_conn.write(req_buf);
} else {
server_conn.once('sphinx.connected', function(){
initResponseOutput(cb);
server_conn.write(req_buf);
});
}
}
}
}

function _dequeue() {
_queue.shift();
if(!_queue.length){
return;
}
if(!_persistent){
server_conn = null;
return;
}
if(!server_conn){
throw new Error("Trying to dequeue. Not connected");
}
// we run the next server request in line
initResponseOutput(_queue[0]['callback']);
server_conn.write(_queue[0]['request_buffer']);
_queue.shift();
if (!_queue.length) {
return;
}

if(!_persistent){
server_conn = null;
return;
}

if(!server_conn){
throw new Error("Trying to dequeue. Not connected");
}

// we run the next server request in line
initResponseOutput(_queue[0]['callback']);
server_conn.write(_queue[0]['request_buffer']);
}

function readResponseData(data) {
// Got response!
response_output.append(data);
response_output.runCallbackIfDone(_queue[0]['search_command']);
response_output.runCallbackIfDone(_queue[0]['search_command']);
}

function initResponseOutput(query_callback) {
Expand Down

0 comments on commit a29d22b

Please sign in to comment.