Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Make the initial connection process "drain aware" #19

Merged
merged 5 commits into from

2 participants

@JoeZ99

In the process of

  • send version number
  • send persistent flag (if applicable)
  • emit 'sphinx.connected' event so querys can be sent

I've added "drain" event support, so limestone won't write to the sphinx server before the previous written thing is complete.

send version number ---> wait for drain --> send persistent flat ---> wait for drain --> _connected=true

but, This only works on node 0.6. that's why I've put it apart from the master. I personally thing is not big deal, limestone is still in its early stage, but I leave it to you.

As usual, any comments are welcome.

@kurokikaze kurokikaze referenced this pull request from a commit
@kurokikaze Merged pull request #19~ a29d22b
@kurokikaze kurokikaze merged commit fa0acab into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 2, 2011
  1. @JoeZ99

    when sending protocol version and "persistent" flag, wait for the dr…

    JoeZ99 authored
    …ain events to avoid collision. Only works in node 0.6
Commits on Jan 11, 2012
  1. @JoeZ99

    Merge branch 'remote_host_support' into drain_event_support_node6

    JoeZ99 authored
    Conflicts:
    	limestone.js
Commits on Jan 12, 2012
  1. @JoeZ99
Commits on Jan 16, 2012
  1. @JoeZ99

    code formatting

    JoeZ99 authored
  2. @JoeZ99

    removing spureous logs

    JoeZ99 authored
This page is out of date. Refresh to see the latest.
Showing with 252 additions and 199 deletions.
  1. +252 −199 limestone.js
View
451 limestone.js
@@ -6,98 +6,100 @@ exports.SphinxClient = function() {
var buffer_extras = require('./buffer_extras');
var Sphinx = {
- port : 9312
+ port : 9312
};
// All search modes
Sphinx.searchMode = {
- "ALL":0,
- "ANY":1,
- "PHRASE":2,
- "BOOLEAN":3,
- "EXTENDED":4,
- "FULLSCAN":5,
- "EXTENDED2":6 // extended engine V2 (TEMPORARY, WILL BE REMOVED)
+ "ALL":0,
+ "ANY":1,
+ "PHRASE":2,
+ "BOOLEAN":3,
+ "EXTENDED":4,
+ "FULLSCAN":5,
+ "EXTENDED2":6 // extended engine V2 (TEMPORARY, WILL BE REMOVED)
};
// All ranking modes
Sphinx.rankingMode = {
- "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one
- "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality)
- "NONE" : 2, ///< no ranking, all matches get a weight of 1
- "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
- "PROXIMITY" : 4,
- "MATCHANY" : 5,
- "FIELDMASK" : 6,
- "SPH04" : 7,
- "TOTAL" : 8
+ "PROXIMITY_BM25" : 0, ///< default mode, phrase proximity major factor and BM25 minor one
+ "BM25" : 1, ///< statistical mode, BM25 ranking only (faster but worse quality)
+ "NONE" : 2, ///< no ranking, all matches get a weight of 1
+ "WORDCOUNT" : 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
+ "PROXIMITY" : 4,
+ "MATCHANY" : 5,
+ "FIELDMASK" : 6,
+ "SPH04" : 7,
+ "TOTAL" : 8
};
Sphinx.sortMode = {
- "RELEVANCE" : 0,
- "ATTR_DESC" : 1,
- "ATTR_ASC" : 2,
- "TIME_SEGMENTS" : 3,
- "EXTENDED" : 4,
- "EXPR" : 5
+ "RELEVANCE" : 0,
+ "ATTR_DESC" : 1,
+ "ATTR_ASC" : 2,
+ "TIME_SEGMENTS" : 3,
+ "EXTENDED" : 4,
+ "EXPR" : 5
};
Sphinx.groupFunc = {
- "DAY" : 0,
- "WEEK" : 1,
- "MONTH" : 2,
- "YEAR" : 3,
- "ATTR" : 4,
- "ATTRPAIR" : 5
+ "DAY" : 0,
+ "WEEK" : 1,
+ "MONTH" : 2,
+ "YEAR" : 3,
+ "ATTR" : 4,
+ "ATTRPAIR" : 5
};
// Commands
Sphinx.command = {
- "SEARCH" : 0,
- "EXCERPT" : 1,
- "UPDATE" : 2,
- "KEYWORDS" : 3,
- "PERSIST" : 4,
- "STATUS" : 5,
- "QUERY" : 6,
- "FLUSHATTRS" : 7
+ "SEARCH" : 0,
+ "EXCERPT" : 1,
+ "UPDATE" : 2,
+ "KEYWORDS" : 3,
+ "PERSIST" : 4,
+ "STATUS" : 5,
+ "QUERY" : 6,
+ "FLUSHATTRS" : 7
};
// Current version client commands
Sphinx.clientCommand = {
- "SEARCH" : 0x118,
- "EXCERPT" : 0x103,
- "UPDATE" : 0x102,
- "KEYWORDS" : 0x100,
- "STATUS" : 0x100,
- "QUERY" : 0x100,
- "FLUSHATTRS": 0x100
+ "SEARCH" : 0x118,
+ "EXCERPT" : 0x103,
+ "UPDATE" : 0x102,
+ "KEYWORDS" : 0x100,
+ "STATUS" : 0x100,
+ "QUERY" : 0x100,
+ "FLUSHATTRS": 0x100
};
Sphinx.statusCode = {
- "OK": 0,
- "ERROR": 1,
- "RETRY": 2,
- "WARNING": 3
+ "OK": 0,
+ "ERROR": 1,
+ "RETRY": 2,
+ "WARNING": 3
};
Sphinx.filterTypes = {
- "VALUES" : 0,
- "RANGE" : 1,
- "FLOATRANGE" : 2
+ "VALUES" : 0,
+ "RANGE" : 1,
+ "FLOATRANGE" : 2
};
Sphinx.attribute = {
- "INTEGER": 1,
- "TIMESTAMP": 2,
- "ORDINAL": 3,
- "BOOL": 4,
- "FLOAT": 5,
- "BIGINT": 6,
- "STRING": 7,
- "MULTI": 0x40000000
+ "INTEGER": 1,
+ "TIMESTAMP": 2,
+ "ORDINAL": 3,
+ "BOOL": 4,
+ "FLOAT": 5,
+ "BIGINT": 6,
+ "STRING": 7,
+ "MULTI": 0x40000000
};
+ self.Sphinx = Sphinx;
+
var server_conn = null;
var response_output;
var _connected = false;
@@ -105,14 +107,17 @@ exports.SphinxClient = function() {
var _persistent = false;
-
// Connect to Sphinx server
self.connect = function() {
// arguments: ([host:port], [persistent], callback).
var args = Array.prototype.slice.call(arguments);
+
var callback = args.pop();
var hostport = args.length ? args.shift() + '' : ':'+Sphinx.port;
+ var persistent = _persistent = args.length ? args.shift() : false;
+
+
if(hostport.indexOf(':')==-1){
hostport = isNaN(hostport) ? hostport + ':' + Sphinx.port : ':' + hostport;
}
@@ -121,82 +126,113 @@ exports.SphinxClient = function() {
var host = hostport[0].trim().length ? hostport[0].trim(): 'localhost' ;
var port = hostport[1].trim().length ? hostport[1].trim() : Sphinx.port;
- var persistent = _persistent = args.length ? args.shift() : false;
- server_conn = tcp.createConnection(port, host);
+ server_conn = tcp.createConnection(port, host);
server_conn.on('error', function(x){
console.log('Error: '+x);
server_conn.end();
callback(x);
});
- // disable Nagle algorithm
- server_conn.setNoDelay(true);
-
- server_conn.addListener('connect',
- function () {
- // Sending protocol version
- // Here we must send 4 bytes, '0x00000001'
- if (server_conn.readyState == 'open') {
- var version_number = Buffer.makeWriter();
- version_number.push.int32(1);
- // Waiting for answer
- server_conn.once('data', function(data) {
- var protocol_version_raw = data.toReader();
- var protocol_version = protocol_version_raw.int32();
- // if there still data? process and callback
- if(!protocol_version_raw.empty()) {
- status_code = protocol_version_raw.int16();
- version = protocol_version_raw.int16();
- server_message = protocol_version_raw.lstring();
- if(status_code == Sphinx.statusCode.ERROR){
- errmsg = 'Server issued ERROR: '+server_message;
- }
- if(status_code == Sphinx.statusCode.RETRY){
- errmsg = 'Server issued RETRY: '+server_message;
- }
- if(errmsg){
- callback(new Error(errmsg));
- }
- }
-
- var data_unpacked = {'': protocol_version};
- if (data_unpacked[""] >= 1) {
- //all ok, send my version
- server_conn.write(version_number.toBuffer());
-
- if(persistent){
- var pers_req = Buffer.makeWriter();
- pers_req.push.int16(Sphinx.command.PERSIST);
- pers_req.push.int16(0);
- pers_req.push.int32(4);
- pers_req.push.int32(1);
- server_conn.write(pers_req.toBuffer());
- }
- server_conn.on('data', readResponseData);
- _connected = true;
- server_conn.emit('sphinx.connected');
-
- // Use callback
- callback(null);
-
- } else {
- callback(new Error('Wrong protocol version: ' + protocol_version));
- server_conn.end();
- }
-
- });
- } else {
- callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect'));
- server_conn.end();
- _connected = false;
- }
- });
+
+ server_conn.on("close", function(x){
+ if(x){
+ console.log('closed');
+ callback(new Error("The socket has closed due to an unknown error"));
+ }
+ });
+ // disable Nagle algorithm
+ server_conn.setNoDelay(true);
+ //server_conn.setEncoding('binary');
+
+ response_output = null;
+
+ //var promise = new process.Promise();
+
+ server_conn.addListener('connect', function () {
+
+ // console.log('Connected, sending protocol version... State is ' + server_conn.readyState);
+ // Sending protocol version
+ // console.log('Sending version number...');
+ // Here we must send 4 bytes, '0x00000001'
+ if (server_conn.readyState == 'open') {
+ var version_number = Buffer.makeWriter();
+ version_number.push.int32(1);
+ // Waiting for answer
+ server_conn.once('data', function(data) {
+ /*if (response_output) {
+ console.log('connect: Data received from server');
+ }*/
+
+ var protocol_version_raw = data.toReader();
+ var protocol_version = protocol_version_raw.int32();
+ // if there still data? process and callback
+ if(!protocol_version_raw.empty()) {
+ status_code = protocol_version_raw.int16();
+ version = protocol_version_raw.int16();
+ server_message = protocol_version_raw.lstring();
+ if(status_code == Sphinx.statusCode.ERROR){
+ errmsg = 'Server issued ERROR: '+server_message;
+ }
+ if(status_code == Sphinx.statusCode.RETRY){
+ errmsg = 'Server issued RETRY: '+server_message;
+ }
+ if(errmsg){
+ callback(new Error(errmsg));
+ }
+ }// if !protocol_version_raw.empty()
+ var data_unpacked = {'': protocol_version};
+
+ if (data_unpacked[""] >= 1) {
+
+ if(persistent){
+ server_conn.once('drain', function(){
+ var pers_req = Buffer.makeWriter();
+ pers_req.push.int16(Sphinx.command.PERSIST);
+ pers_req.push.int16(0);
+ pers_req.push.int32(4);
+ pers_req.push.int32(1);
+ server_conn.write(pers_req.toBuffer());
+ server_conn.once('drain', function(){
+ server_conn.on('data', readResponseData);
+ _connected = true;
+ server_conn.emit('sphinx.connected');
+ callback(null);
+ });
+ });
+ } else {
+ server_conn.once('drain', function(){
+ server_conn.on('data', readResponseData);
+ _connected = true;
+ server_conn.emit('sphinx.connected');
+ callback(null);
+ });
+ }
+
+ //all ok, send my version
+ server_conn.write(version_number.toBuffer());
+ } else {
+ callback(new Error('Wrong protocol version: ' + protocol_version));
+ server_conn.end();
+ }
+
+ });
+ } else {
+ callback(new Error('Connection is ' + server_conn.readyState + ' in OnConnect'));
+ server_conn.end();
+ _connected = false;
+ }
+ });
+
};
+ // console.log('Connecting to searchd...');
+
self.query = function(query_raw, callback) {
- var query = new Object();
+ var query = new Object();
- var query_parameters = {
+
+ // Default query parameters
+ var query_parameters = {
offset : 0,
limit : 20,
mode : Sphinx.searchMode.ALL,
@@ -221,9 +257,9 @@ exports.SphinxClient = function() {
fieldweights : {},
overrides : [],
selectlist : "*",
- indexes : '*',
- comment : '',
- query : "",
+ indexes : '*',
+ comment : '',
+ query : "",
error : "", // per-reply fields (for single-query case)
warning : "",
connerror : false,
@@ -232,27 +268,28 @@ exports.SphinxClient = function() {
mbenc : "",
arrayresult : true,
timeout : 0
- };
+ };
+
+ if (query_raw.query) {
+ for (x in query_parameters) {
+ if (query_raw.hasOwnProperty(x)) {
+ query[x] = query_raw[x];
+ } else {
+ query[x] = query_parameters[x];
+ }
+ }
+ } else {
+ query = query_raw.toString();
+ }
- if (query_raw.query) {
- for (x in query_parameters) {
- if (query_raw.hasOwnProperty(x)) {
- query[x] = query_raw[x];
- } else {
- query[x] = query_parameters[x];
- }
- }
- } else {
- query = query_raw.toString();
- }
var request = Buffer.makeWriter();
- request.push.int16(Sphinx.command.SEARCH);
+ request.push.int16(Sphinx.command.SEARCH);
request.push.int16(Sphinx.clientCommand.SEARCH);
- request.push.int32(0); // This will be request length
- request.push.int32(0);
- request.push.int32(1);
+ request.push.int32(0); // This will be request length
+ request.push.int32(0);
+ request.push.int32(1);
request.push.int32(query.offset);
@@ -263,65 +300,80 @@ exports.SphinxClient = function() {
request.push.int32(query.sort);
- request.push.lstring(query.sortby);
- request.push.lstring(query.query); // Query text
- request.push.int32(query.weights.length);
- for (var weight in query.weights) {
- request.push.int32(parseInt(weight));
- }
-
- request.push.lstring(query.indexes); // Indexes
-
- request.push.int32(1); // id64 range marker
-
- request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers
- request.push.int64(0, query.max_id);
+ request.push.lstring(query.sortby);
+ request.push.lstring(query.query); // Query text
+ request.push.int32(query.weights.length);
+ for (var weight in query.weights) {
+ request.push.int32(parseInt(weight));
+ }
- request.push.int32(query.filters.length);
- for (var filter in query.filters) {
- request.push.int32(filter.attr.length);
- request.push_lstring(filter.attr);
- request.push.int32(filter.type);
- switch (filter.type) {
- case Sphinx.filterTypes.VALUES:
- request.push.int32(filter.values.length);
- for (var value in filter.values) {
- request.push.int64(0, value);
- }
- break;
- case Sphinx.filterTypes.RANGE:
- request.push.int64(0, filter.min);
- request.push.int64(0, filter.max);
- break;
- case Sphinx.filterTypes.FLOATRANGE:
- request.push.float(filter.min);
- request.push.float(filter.max);
- break;
- }
- }
+ request.push.lstring(query.indexes); // Indexes used JEZ
+
+ request.push.int32(1); // id64 range marker
+
+ //request.push.int32(0);
+ request.push.int64(0, query.min_id); // This is actually supposed to be two 64-bit numbers
+ //request.push.int32(0); // However, there is a caveat about using 64-bit ids
+ request.push.int64(0, query.max_id);
+
+ //console.log('Found ' + query.filters.length + ' filters');
+ request.push.int32(query.filters.length);
+ for (var filter_id in query.filters) {
+ var filter = query.filters[filter_id];
+ //console.log('Found filter of type ' + filter.type)
+ if (!filter.attr) {
+ filter.attr = "";
+ }
+ if (!filter.exclude) {
+ filter.exclude = 0;
+ }
+ //request.push.int32(filter.attr.length);//WTF? length is included in lstring
+ request.push.lstring(filter.attr);
+ request.push.int32(filter.type);
+ switch (filter.type) {
+ case Sphinx.filterTypes.VALUES:
+ request.push.int32(filter.values.length); // Count of values
+ for (var value_id in filter.values) {
+ //request.push.int32(0); // should be a 64-bit number
+ request.push.int64(0, filter.values[value_id]);
+ }
+ break;
+ case Sphinx.filterTypes.RANGE:
+ //request.push.int32(0); // should be a 64-bit number
+ request.push.int64(0, filter.min);
+ //request.push.int32(0); // should be a 64-bit number
+ request.push.int64(0, filter.max);
+ break;
+ case Sphinx.filterTypes.FLOATRANGE:
+ request.push.float(filter.min);
+ request.push.float(filter.max);
+ break;
+ }
+ request.push.int32(filter.exclude);
+ }
- request.push.int32(query_parameters.groupfunc);
- request.push.lstring(query_parameters.groupby); // Groupby length
+ request.push.int32(query_parameters.groupfunc);
+ request.push.lstring(query_parameters.groupby); // Groupby length
- request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000
+ request.push.int32(query_parameters.maxmatches); // Maxmatches, default to 1000
- request.push.lstring(query_parameters.groupsort); // Groupsort
+ request.push.lstring(query_parameters.groupsort); // Groupsort
- request.push.int32(query_parameters.cutoff); // Cutoff
- request.push.int32(query_parameters.retrycount); // Retrycount
- request.push.int32(query_parameters.retrydelay); // Retrydelay
+ request.push.int32(query_parameters.cutoff); // Cutoff
+ request.push.int32(query_parameters.retrycount); // Retrycount
+ request.push.int32(query_parameters.retrydelay); // Retrydelay
- request.push.lstring(query_parameters.groupdistinct); // Group distinct
+ request.push.lstring(query_parameters.groupdistinct); // Group distinct
- if (query_parameters.anchor.length == 0) {
- request.push.int32(0); // no anchor given
- } else {
- request.push.int32(1); // anchor point in radians
- request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct
- request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct
- request.push.float(query_parameters.anchor["lat"]);
- request.push.float(query_parameters.anchor["long"]);
- }
+ if (query_parameters.anchor.length == 0) {
+ request.push.int32(0); // no anchor given
+ } else {
+ request.push.int32(1); // anchor point in radians
+ request.push.lstring(query_parameters.anchor["attrlat"]); // Group distinct
+ request.push.lstring(query_parameters.anchor["attrlong"]); // Group distinct
+ request.push.float(query_parameters.anchor["lat"]);
+ request.push.float(query_parameters.anchor["long"]);
+ }
request.push.int32(query_parameters.indexweights.length);
for (var i in query_parameters.indexweights) {
@@ -367,7 +419,7 @@ exports.SphinxClient = function() {
req_length.push.int32(request_buf.length - 8);
req_length.toBuffer().copy(request_buf, 4, 0);
- console.log('Sending search request of ' + request_buf.length + ' bytes ');
+ //console.log('Sending search request of ' + request_buf.length + ' bytes ');
_enqueue(request_buf, callback, Sphinx.clientCommand.SEARCH);
};
@@ -455,7 +507,7 @@ exports.SphinxClient = function() {
req_length.push.int32(request_buf.length - 8);
req_length.toBuffer().copy(request_buf,4,0);
- console.log('Sending build excerpt request of ' + request_buf.length + 'bytes');
+ //console.log('Sending build excerpt request of ' + request_buf.length + 'bytes');
_enqueue(request_buf, callback, Sphinx.clientCommand.EXCERPT);
}; // build_excerpts
@@ -466,6 +518,7 @@ exports.SphinxClient = function() {
function _enqueue(req_buf , cb, sc) {
if(!server_conn || !server_conn.writable){
cb(new Error("Trying to enqueue. Not connected"));
+ return;
}
_queue.push({request_buffer: req_buf, callback: cb, search_command: sc});
if(_queue.length === 1)
Something went wrong with that request. Please try again.