Skip to content

Commit

Permalink
ZitiSocket enhancements: employ async iterator on Ziti stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Curt Tudor committed Feb 23, 2020
1 parent 8014a63 commit e005bb4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 87 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ziti-electron-fetch",
"version": "2.6.5",
"version": "2.6.6",
"description": "A module that intercepts all window.fetch calls in an Electron renderer process and routes them over a Ziti network",
"main": "lib/index.js",
"files": [
Expand Down
147 changes: 61 additions & 86 deletions src/ziti-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,46 @@ import {Duplex, Readable} from 'stream';
class ZitiSocket extends Duplex {

constructor() {
super({objectMode: true});
super();

/**
This is where we'll put any data returned from a Ziti connection
*/
this.readableStream = new Readable();

this.readableStream._read = function () {
};
* This stream is where we'll put any data returned from a Ziti connection (see NF_dial.data.call_back)
*/
this.readableZitiStream = new Readable();
this.readableZitiStream._read = function () {}

/**
The underlying Ziti Connection
@private
@type {string}
*/
this.zitiConnection; // eslint-disable-line no-unused-expressions
* The underlying Ziti Connection
* @private
* @type {string}
*/
this.zitiConnection;

/**
True when read buffer is full and calls to `push` return false.
Additionally data will not be read off the socket until the user
calls `read`.
@private
@type {boolean}
*/
this._readingPaused = false;
* Start the async iterator on the Ziti stream.
*/
setImmediate(this._pumpZitiStream.bind(this));
}


/**
* Pump all data arriving from Ziti connection out into the Duplex stream represented by this ZitiSocket object
*/
async _pumpZitiStream() {
// Block here waiting for a chunk of data
for await (const chunk of this.readableZitiStream) {
// Push the chunk into teh Duplex. If we experience back-pressure, wait for things to drain.
if (!this.push(chunk)) await new Promise((res, rej) => {
this.once("drain", res);
});
}
}


/**
* Make a connection to the specified Ziti 'service'. We do this by invoking the NF_dial() function in the Ziti NodeJS-SDK.
* @param {*} service
*/
NF_dial(service) {
const self = this;
return new Promise((resolve) => {
Expand All @@ -55,21 +68,29 @@ class ZitiSocket extends Duplex {
else {
window.ziti.NF_dial(
service,

/**
* on_connect callback.
*/
(conn) => {
resolve(conn);
},

/**
* on_data callback
*/
(data) => {
this.readableStream.push(data);
this.readableZitiStream.push(data);
},
);
}
});
}

/**
Pushes the data onto the underlying Ziti connection by invoking the NF_write() function in the native addon. The
native addon expects incoming data to be of type Buffer.
*/
/**
* Write data onto the underlying Ziti connection by invoking the NF_write() function in the Ziti NodeJS-SDK. The
* NodeJS-SDK expects incoming data to be of type Buffer.
*/
NF_write(conn, buffer) {
return new Promise((resolve) => {
window.ziti.NF_write(
Expand All @@ -82,71 +103,26 @@ class ZitiSocket extends Duplex {
}

/**
Connect to a Ziti service.
@param {object} param
@param {string} [param.host] the host to connect to. Default is localhost
@param {number} param.port the port to connect to. Required.
@return {ZitiSocket}
* Connect to a Ziti service.
* @param {object} param
* @param {string} [param.host] the host to connect to. Default is localhost
* @param {number} param.port the port to connect to. Required.
* @return {ZitiSocket}
*/
async connect(opts) {
this.zitiConnection = await this.NF_dial(opts.host).catch((e) => console.log('connect Error: ', e.message)); // eslint-disable-line new-cap
}

_hasData() {
const self = this;
return new Promise((resolve) => {
(function waitForData() {
if (self.readableStream.readableLength > 0) return resolve();
setTimeout(waitForData, 100);
})();
});
}


/**
Performs data read events which are triggered under two conditions:
1. underlying `readable` events emitted when there is new data
available on the socket
2. the consumer requested additional data
@private
*/
async _onReadable() {

await this._hasData().catch((e) => console.log('inside ziti-socket.js _onReadable(), Error: ', e.message));

// Read all the data until one of two conditions is met
// 1. there is nothing left to read on the socket
// 2. reading is paused because the consumer is slow
while (!this._readingPaused) {

const data = this.readableStream.read();
if (data === null) {
break;
}
else {
// Push the data into the read buffer and capture whether
// we are hitting the back pressure limits
let pushOk = this.push(data);
// When the push fails, we need to pause the ability to read
// messages because the consumer is getting backed up.
if (!pushOk) this._readingPaused = true;
}
}
}
*
*/
_read() { /* NOP */ }

/**
Implements the readable stream method `_read`. This method will
flagged that reading is no longer paused since this method should
only be called by a consumer reading data.
@private
*/
_read() {
this._readingPaused = false;
setImmediate(this._onReadable.bind(this));
}

/**
Returna a Promise that will resolve _only_ after a Ziti connection has been established for this instance of ZitiSocket.
*/
* Returna a Promise that will resolve _only_ after a Ziti connection has been established for this instance of ZitiSocket.
*/
getZitiConnection() {
const self = this;
return new Promise((resolve) => {
Expand All @@ -158,9 +134,9 @@ class ZitiSocket extends Duplex {
}

/**
Implements the writeable stream method `_write` by pushing the data onto the underlying Ziti connection.
It is possible that this function is called before the Ziti connect has completed, so this function will (currently)
await Ziti connection establishment (as opposed to buffering the data).
* Implements the writeable stream method `_write` by pushing the data onto the underlying Ziti connection.
* It is possible that this function is called before the Ziti connect has completed, so this function will (currently)
* await Ziti connection establishment (as opposed to buffering the data).
*/
async _write(chunk, encoding, cb) {

Expand All @@ -181,9 +157,8 @@ class ZitiSocket extends Duplex {
}

/**
Implements the writeable stream method `_final` used when
.end() is called to write the final data to the stream.
*/
* Implements the writeable stream method `_final` used when .end() is called to write the final data to the stream.
*/
_final(cb) {
cb();
}
Expand Down

0 comments on commit e005bb4

Please sign in to comment.