Permalink
Browse files

Experiment interface for sending row data

  • Loading branch information...
1 parent bc8d7a5 commit 9fbf2d142cf8cdae64b57be4f06269ab95fb210c @felixge felixge committed Jul 13, 2012
@@ -3,6 +3,7 @@ var PacketWriter = require('./protocol/PacketWriter');
var Parser = require('./protocol/Parser');
var Auth = require('./protocol/Auth');
var EventEmitter = require('events').EventEmitter;
+var types = require('./protocol/constants/types');
var commands = require('./protocol/constants/commands');
var errors = require('./protocol/constants/errors');
var util = require('util');
@@ -13,7 +14,7 @@ function ServerConnection(options) {
EventEmitter.call(this);
this._socket = null;
- this._scramleBuff = null;
+ this._scramleBuff = null;
this._clientAuthenticationPacket = null;
this._parser = new Parser({onPacket: this._handlePacket.bind(this)});
@@ -124,6 +125,82 @@ ServerConnection.prototype.error = function(options) {
};
+ServerConnection.prototype.results = function(results) {
+ var fields = this.buildFields(results);
+
+ this.resultHeader({fieldCount: fields.length});
+ this.fields(fields);
+ this.eof();
+ results.forEach(this.result.bind(this));
+ this.eof({last: true});
+};
+
+ServerConnection.prototype.resultHeader = function(options) {
+ options = options || {};
+
+ var packet = new Packets.ResultSetHeaderPacket({
+ fieldCount : options.fieldCount || 0,
+ extra : options.extra,
+ });
+
+ this._send(packet);
+};
+
+ServerConnection.prototype.buildFields = function(results) {
+ results = [].concat(results);
+
+ var firstResult = results[0];
+ var fields = [];
+
+ for (var field in firstResult) {
+ var value = firstResult[field];
+
+ fields.push({
+ name : field,
+ type : 'STRING',
+ });
+ }
+
+ return fields;
+};
+
+ServerConnection.prototype.fields = function(fields) {
+ fields.forEach(this.field.bind(this));
+};
+
+ServerConnection.prototype.field = function(options) {
+ options = options || {};
+
+ options.catalog = options.catalog || 'def';
+
+ var packet = new Packets.FieldPacket(options);
+
+ this._send(packet);
+};
+
+ServerConnection.prototype.result = function(result) {
+ var writer = new PacketWriter();
+
+ for (var field in result) {
+ var value = result[field];
+ writer.writeLengthCodedString(value);
+ }
+
+ this._socket.write(writer.toBuffer(this._parser));
+};
+
+ServerConnection.prototype.eof = function(options, last) {
+ options = options || {};
+
+ var packet = new Packets.EofPacket(options);
+
+ this._send(packet);
+
+ if (options.last) {
+ this._parser.resetPacketNumber();
+ }
+};
+
ServerConnection.prototype._handlePacket = function(header) {
var Packet = this._determinePacket(header);
var packet = new Packet();
@@ -138,6 +215,10 @@ ServerConnection.prototype['ClientAuthenticationPacket'] = function(packet) {
this.emit('auth', packet);
};
+ServerConnection.prototype['ComQueryPacket'] = function(packet) {
+ this.emit('query', packet);
+};
+
ServerConnection.prototype['ComQuitPacket'] = function(packet) {
this.end();
};
@@ -155,9 +236,11 @@ ServerConnection.prototype._determinePacket = function(header) {
switch (byte) {
case commands.COM_QUIT:
return Packets.ComQuitPacket;
+ case commands.COM_QUERY:
+ return Packets.ComQueryPacket;
default:
// @TODO, emit this
- throw new Error('Unknown packet: ' + byte);
+ throw new Error('Unsupported packet: ' + byte);
break;
}
};
@@ -75,9 +75,9 @@ Protocol.prototype._enqueue = function(sequence) {
if (!this._validateEnqueue(sequence)) {
return sequence;
}
-
+
this._queue.push(sequence);
-
+
var self = this;
sequence
.on('error', function(err) {
@@ -1,6 +1,8 @@
+var commands = require('../constants/commands');
+
module.exports = ComQueryPacket;
function ComQueryPacket(sql) {
- this.command = 0x03;
+ this.command = commands.COM_QUERY;
this.sql = sql;
}
@@ -2,7 +2,7 @@ module.exports = EofPacket;
function EofPacket(options) {
options = options || {};
- this.fieldCount = undefined;
+ this.fieldCount = 0xfe;
this.warningCount = options.warningCount;
this.serverStatus = options.serverStatus;
}
@@ -14,7 +14,7 @@ EofPacket.prototype.parse = function(parser) {
};
EofPacket.prototype.write = function(writer) {
- writer.writeUnsignedNumber(1, 0xfe);
+ writer.writeUnsignedNumber(1, this.fieldCount);
writer.writeUnsignedNumber(2, this.warningCount);
writer.writeUnsignedNumber(2, this.serverStatus);
};
@@ -0,0 +1,41 @@
+var common = require('../../common');
+var assert = require('assert');
+var mysql = common.mysql;
+
+var server = mysql.createServer(function(connection) {
+ connection.greet();
+
+ connection
+ .on('auth', function(auth) {
+ connection.accept();
+ })
+ .on('query', function(query) {
+ assert.equal(query.sql, 'SELECT 1');
+
+ // Experimental interface
+ connection.results([{
+ '1': 1,
+ }]);
+ });
+});
+
+
+var results;
+server.listen(common.serverPort, function() {
+ var connection = common.createConnection({port: common.serverPort});
+
+ connection.query('SELECT 1', function(err, _results) {
+ if (err) throw err;
+
+ results = _results;
+ });
+
+ connection.end(function() {
+ server.close();
+ });
+});
+
+process.on('exit', function() {
+ assert.equal(results.length, 1);
+ assert.equal(results[0]['1'], 1);
+});

0 comments on commit 9fbf2d1

Please sign in to comment.