Skip to content

Commit

Permalink
[CONJS-21] adding geometry implementation for bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Nov 13, 2018
1 parent da4763e commit 5d2b62f
Show file tree
Hide file tree
Showing 9 changed files with 886 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<p align="center">
<a href="http://mariadb.com/">
<img src="https://mariadb.com/themes/custom/mariadb/logo.svg">
<img src="https://mariadb.com/kb/static/images/logo-2018-black.png">
</a>
</p>

Expand Down
7 changes: 4 additions & 3 deletions lib/cmd/batch-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class BatchBulk extends CommonBinary {
constructor(resolve, reject, options, connOpts, sql, values) {
super(resolve, reject, options, connOpts, sql, values);
this.sendEnded = false;
this.onPacketReceive = this.readPrepareResultPacket;
}

/**
Expand Down Expand Up @@ -59,14 +60,14 @@ class BatchBulk extends CommonBinary {

//send COM_STMT_PREPARE command
this.out = out;
this.packet = new BulkPacket(this.opts, out, this.values[0]);

out.startPacket(this);
out.writeInt8(0x16);
out.writeString(questionMarkSql);
out.flushBuffer(true);
this.onPacketReceive = this.readPrepareResultPacket;

out.startPacket(this);
this.packet = new BulkPacket(this.opts, out, this.values[0]);

this.valueIdx = 0;
while (this.valueIdx < this.values.length) {
Expand Down Expand Up @@ -189,7 +190,6 @@ class BatchBulk extends CommonBinary {

if (this.sendEnded && this.packet.waitingResponseNo === 0) {
this.packet = null;
this.onPacketReceive = null;
this.resolve = null;

//send COM_STMT_CLOSE packet
Expand All @@ -204,6 +204,7 @@ class BatchBulk extends CommonBinary {
this.emit("send_end");
process.nextTick(this.reject, this.firstError);
this.reject = null;
this.onPacketReceive = null;
this.emit("end", this.firstError);
return;
} else {
Expand Down
248 changes: 246 additions & 2 deletions lib/cmd/common-binary-cmd.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ class CommonBinary extends ResultSet {
* @param info connection information
*/
writeParam(out, value, opts, info) {
out.writeInt8(0x00);
switch (typeof value) {
case "boolean":
out.writeInt8(0x00);
out.writeInt8(value ? 0x01 : 0x00);
break;
case "number":
out.writeInt8(0x00);
out.writeLengthStringAscii("" + value);
break;
case "object":
if (Object.prototype.toString.call(value) === "[object Date]") {
out.writeInt8(0x00);
out.writeBinaryDate(value, opts);
} else if (Buffer.isBuffer(value)) {
out.writeInt8(0x00);
out.writeLengthEncodedBuffer(value);
} else if (typeof value.toSqlString === "function") {
out.writeInt8(0x00);
out.writeStringEscapeQuote(String(value.toSqlString()));
} else {
if (
Expand All @@ -47,18 +51,258 @@ class CommonBinary extends ResultSet {
"GeometryCollection"
].includes(value.type)
) {
//TODO implement geometry binary format.
const geoBuff = this.getBufferFromGeometryValue(value);
if (geoBuff) {
out.writeInt8(0x00);
out.writeLengthEncodedBuffer(Buffer.concat([Buffer.from([0, 0, 0, 0]), geoBuff]));
} else {
out.writeInt8(0x01); //NULL
}
} else {
//TODO check if permitSetMultiParamEntries is needed !?
out.writeInt8(0x00);
out.writeLengthEncodedString(JSON.stringify(value));
}
}
break;
default:
out.writeInt8(0x00);
out.writeLengthEncodedString(value);
}
}

getBufferFromGeometryValue(value, headerType) {
let geoBuff;
let pos;
let type;
if (!headerType) {
switch (value.type) {
case "Point":
geoBuff = Buffer.allocUnsafe(21);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(1, 1); //wkbPoint
if (
value.coordinates &&
Array.isArray(value.coordinates) &&
value.coordinates.length >= 2 &&
!isNaN(value.coordinates[0]) &&
!isNaN(value.coordinates[1])
) {
geoBuff.writeDoubleLE(value.coordinates[0], 5); //X
geoBuff.writeDoubleLE(value.coordinates[1], 13); //Y
return geoBuff;
} else {
return null;
}

case "LineString":
if (value.coordinates && Array.isArray(value.coordinates)) {
const pointNumber = value.coordinates.length;
geoBuff = Buffer.allocUnsafe(9 + 16 * pointNumber);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(2, 1); //wkbLineString
geoBuff.writeInt32LE(pointNumber, 5);
for (let i = 0; i < pointNumber; i++) {
if (
value.coordinates[i] &&
Array.isArray(value.coordinates[i]) &&
value.coordinates[i].length >= 2 &&
!isNaN(value.coordinates[i][0]) &&
!isNaN(value.coordinates[i][1])
) {
geoBuff.writeDoubleLE(value.coordinates[i][0], 9 + 16 * i); //X
geoBuff.writeDoubleLE(value.coordinates[i][1], 17 + 16 * i); //Y
} else {
return null;
}
}
return geoBuff;
} else {
return null;
}

case "Polygon":
if (value.coordinates && Array.isArray(value.coordinates)) {
const numRings = value.coordinates.length;
let size = 0;
for (let i = 0; i < numRings; i++) {
size += 4 + 16 * value.coordinates[i].length;
}
geoBuff = Buffer.allocUnsafe(9 + size);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(3, 1); //wkbPolygon
geoBuff.writeInt32LE(numRings, 5);
pos = 9;
for (let i = 0; i < numRings; i++) {
const lineString = value.coordinates[i];
if (lineString && Array.isArray(lineString)) {
geoBuff.writeInt32LE(lineString.length, pos);
pos += 4;
for (let j = 0; j < lineString.length; j++) {
if (
lineString[j] &&
Array.isArray(lineString[j]) &&
lineString[j].length >= 2 &&
!isNaN(lineString[j][0]) &&
!isNaN(lineString[j][1])
) {
geoBuff.writeDoubleLE(lineString[j][0], pos); //X
geoBuff.writeDoubleLE(lineString[j][1], pos + 8); //Y
pos += 16;
} else {
return null;
}
}
}
}
return geoBuff;
} else {
return null;
}

case "MultiPoint":
type = "MultiPoint";
geoBuff = Buffer.allocUnsafe(9);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(4, 1); //wkbMultiPoint
break;

case "MultiLineString":
type = "MultiLineString";
geoBuff = Buffer.allocUnsafe(9);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(5, 1); //wkbMultiLineString
break;

case "MultiPolygon":
type = "MultiPolygon";
geoBuff = Buffer.allocUnsafe(9);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(6, 1); //wkbMultiPolygon
break;

case "GeometryCollection":
geoBuff = Buffer.allocUnsafe(9);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(7, 1); //wkbGeometryCollection

if (value.geometries && Array.isArray(value.geometries)) {
const coordinateLength = value.geometries.length;
const subArrays = [geoBuff];
for (let i = 0; i < coordinateLength; i++) {
const tmpBuf = this.getBufferFromGeometryValue(value.geometries[i]);
if (tmpBuf == null) break;
subArrays.push(tmpBuf);
}
geoBuff.writeInt32LE(subArrays.length - 1, 5);
return Buffer.concat(subArrays);
} else {
geoBuff.writeInt32LE(0, 5);
return geoBuff;
}
default:
return null;
}
if (value.coordinates && Array.isArray(value.coordinates)) {
const coordinateLength = value.coordinates.length;
const subArrays = [geoBuff];
for (let i = 0; i < coordinateLength; i++) {
const tmpBuf = this.getBufferFromGeometryValue(value.coordinates[i], type);
if (tmpBuf == null) break;
subArrays.push(tmpBuf);
}
geoBuff.writeInt32LE(subArrays.length - 1, 5);
return Buffer.concat(subArrays);
} else {
geoBuff.writeInt32LE(0, 5);
return geoBuff;
}
} else {
switch (headerType) {
case "MultiPoint":
if (
value &&
Array.isArray(value) &&
value.length >= 2 &&
!isNaN(value[0]) &&
!isNaN(value[1])
) {
geoBuff = Buffer.allocUnsafe(21);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(1, 1); //wkbPoint
geoBuff.writeDoubleLE(value[0], 5); //X
geoBuff.writeDoubleLE(value[1], 13); //Y
return geoBuff;
}
return null;

case "MultiLineString":
if (value && Array.isArray(value)) {
const pointNumber = value.length;
geoBuff = Buffer.allocUnsafe(9 + 16 * pointNumber);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(2, 1); //wkbLineString
geoBuff.writeInt32LE(pointNumber, 5);
for (let i = 0; i < pointNumber; i++) {
if (
value[i] &&
Array.isArray(value[i]) &&
value[i].length >= 2 &&
!isNaN(value[i][0]) &&
!isNaN(value[i][1])
) {
geoBuff.writeDoubleLE(value[i][0], 9 + 16 * i); //X
geoBuff.writeDoubleLE(value[i][1], 17 + 16 * i); //Y
} else {
return null;
}
}
return geoBuff;
}
return null;

case "MultiPolygon":
if (value && Array.isArray(value)) {
const numRings = value.length;
let size = 0;
for (let i = 0; i < numRings; i++) {
size += 4 + 16 * value[i].length;
}
geoBuff = Buffer.allocUnsafe(9 + size);
geoBuff.writeInt8(0x01); //LITTLE ENDIAN
geoBuff.writeInt32LE(3, 1); //wkbPolygon
geoBuff.writeInt32LE(numRings, 5);
pos = 9;
for (let i = 0; i < numRings; i++) {
const lineString = value[i];
if (lineString && Array.isArray(lineString)) {
geoBuff.writeInt32LE(lineString.length, pos);
pos += 4;
for (let j = 0; j < lineString.length; j++) {
if (
lineString[j] &&
Array.isArray(lineString[j]) &&
lineString[j].length >= 2 &&
!isNaN(lineString[j][0]) &&
!isNaN(lineString[j][1])
) {
geoBuff.writeDoubleLE(lineString[j][0], pos); //X
geoBuff.writeDoubleLE(lineString[j][1], pos + 8); //Y
pos += 16;
} else {
return null;
}
}
}
}
return geoBuff;
}
return null;
}
return null;
}
}

/**
* Read text result-set row
*
Expand Down
9 changes: 7 additions & 2 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -984,8 +984,13 @@ function Connection(options) {
setImmediate(_nextSendCmd);
});

_receiveQueue.push(cmd);
cmd.start(_out, opts, info);
if (_sendQueue.isEmpty()) {
_receiveQueue.push(cmd);
cmd.start(_out, opts, info);
} else {
_receiveQueue.push(cmd);
_sendQueue.push(cmd);
}
return cmd;
};

Expand Down
26 changes: 26 additions & 0 deletions lib/io/bulk-packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ class BulkPacket {
if (this.datatypes[r] !== 0x0c) return true;
} else if (Buffer.isBuffer(row[r])) {
if (this.datatypes[r] !== 0xfb) return true;
} else if (
row[r].type != null &&
[
"Point",
"LineString",
"Polygon",
"MultiPoint",
"MultiLineString",
"MultiPolygon",
"GeometryCollection"
].includes(row[r].type)
) {
if (this.datatypes[r] !== 0xff) return true;
} else {
if (this.datatypes[r] !== 0x0f) return true;
}
Expand Down Expand Up @@ -97,6 +110,19 @@ class BulkPacket {
this.buf[this.pos++] = 0x0c;
} else if (Buffer.isBuffer(row[r])) {
this.buf[this.pos++] = 0xfb;
} else if (
row[r].type != null &&
[
"Point",
"LineString",
"Polygon",
"MultiPoint",
"MultiLineString",
"MultiPolygon",
"GeometryCollection"
].includes(row[r].type)
) {
this.buf[this.pos++] = 0xff;
} else {
this.buf[this.pos++] = 0x0f;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/io/packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Packet {
}
readGeometry() {
const geoBuf = this.readBufferLengthEncoded();
if (geoBuf === null) {
if (geoBuf === null || geoBuf.length === 0) {
return null;
}
let geoPos = 4;
Expand Down
Loading

0 comments on commit 5d2b62f

Please sign in to comment.