Skip to content

Commit

Permalink
Merge pull request #177 from matrix-org/hs/event-queue-finally
Browse files Browse the repository at this point in the history
Typescriptify event queue
  • Loading branch information
Half-Shot committed Jul 24, 2020
2 parents 2dbeacf + 1dc0f12 commit b334c87
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .eslintrcts.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"no-unreachable": 2,
"no-unexpected-multiline": 2,
"no-unused-expressions": 2,
"no-use-before-define": [1, "nofunc"],
"no-use-before-define": 0,
"use-isnan": 2,
"valid-typeof": 2,
"array-bracket-spacing": [1, "never"],
Expand Down
1 change: 1 addition & 0 deletions changelog.d/177.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port `EventQueue` to Typescript.
85 changes: 40 additions & 45 deletions src/components/event-queue.js → src/components/event-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import Bluebird from "bluebird";

const Bluebird = require("bluebird");
type DataReady = Promise<object>;

// It's an event, which has no type yet.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ConsumeCallback = (error: Error|null, event: any) => void;

/**
* Handles the processing order of incoming Matrix events.
Expand All @@ -24,7 +29,7 @@ const Bluebird = require("bluebird");
*
* Abstract Base Class. Use the factory method `create` to create new instances.
*/
class EventQueue {
export class EventQueue {
/**
* Private constructor.
*
Expand All @@ -33,15 +38,12 @@ class EventQueue {
* @param {consumeCallback} consumeFn Function which is called when an event
* is consumed.
*/
constructor(type, consumeFn) {
this.type = type;
this._queues = {
// $identifier: {
// events: [ {dataReady: } ],
// consuming: true|false
// }
};
this.consumeFn = consumeFn;
private queues: { [identifer: string]: {
events: Array<{ dataReady: DataReady }>;
consuming: boolean;
}; } = {};
constructor(private type: "none"|"single"|"per_room", protected consumeFn: ConsumeCallback) {

}

/**
Expand All @@ -50,48 +52,48 @@ class EventQueue {
* @param {IMatrixEvent} event The event to enqueue.
* @param {Promise<object>} dataReady Promise containing data related to the event.
*/
push(event, dataReady) {
const queue = this._getQueue(event);
public push(event: {room_id: string}, dataReady: DataReady) {
const queue = this.getQueue(event);
queue.events.push({
dataReady: dataReady
});
}

_getQueue(event) {
private getQueue(event: {room_id: string}) {
const identifier = this.type === "per_room" ? event.room_id : "none";
if (!this._queues[identifier]) {
this._queues[identifier] = {
if (!this.queues[identifier]) {
this.queues[identifier] = {
events: [],
consuming: false
};
}
return this._queues[identifier];
return this.queues[identifier];
}

/**
* Starts consuming the queue.
*
* As long as events are enqueued they will continue to be consumed.
*/
consume() {
Object.keys(this._queues).forEach((identifier) => {
if (!this._queues[identifier].consuming) {
this._queues[identifier].consuming = true;
this._takeNext(identifier);
public consume() {
Object.keys(this.queues).forEach((identifier) => {
if (!this.queues[identifier].consuming) {
this.queues[identifier].consuming = true;
this.takeNext(identifier);
}
});
}

_takeNext(identifier) {
const events = this._queues[identifier].events;
if (events.length === 0) {
this._queues[identifier].consuming = false;
private takeNext(identifier: string) {
const events = this.queues[identifier].events;
const entry = events.shift();
if (!entry) {
this.queues[identifier].consuming = false;
return;
}
const entry = events.shift();

Bluebird.resolve(entry.dataReady).asCallback(this.consumeFn);
entry.dataReady.finally(() => this._takeNext(identifier));
entry.dataReady.finally(() => this.takeNext(identifier));
}

/**
Expand All @@ -102,9 +104,9 @@ class EventQueue {
* is consumed.
* @return {EventQueue} The newly created EventQueue.
*/
static create(opts, consumeFn) {
static create(opts: { type: "none"|"single"|"per_room"}, consumeFn: ConsumeCallback) {
const type = opts.type;
/* eslint-disable no-use-before-define */
/* eslint-disable @typescript-eslint/no-use-before-define */
if (type == "single") {
return new EventQueueSingle(consumeFn);
}
Expand All @@ -114,7 +116,7 @@ class EventQueue {
if (type == "none") {
return new EventQueueNone(consumeFn);
}
/* eslint-enable no-use-before-define */
/* eslint-enable @typescript-eslint/no-use-before-define */
throw Error(`Invalid EventQueue type '${type}'.`);
}
}
Expand All @@ -124,8 +126,8 @@ class EventQueue {
*
* The foremost event is processed as soon as its data is available.
*/
class EventQueueSingle extends EventQueue {
constructor(consumeFn) {
export class EventQueueSingle extends EventQueue {
constructor(consumeFn: ConsumeCallback) {
super("single", consumeFn);
}
}
Expand All @@ -135,8 +137,8 @@ class EventQueueSingle extends EventQueue {
*
* Events at the head of line are processed as soon as their data is available.
*/
class EventQueuePerRoom extends EventQueue {
constructor(consumeFn) {
export class EventQueuePerRoom extends EventQueue {
constructor(consumeFn: ConsumeCallback) {
super("per_room", consumeFn);
}
}
Expand All @@ -146,12 +148,12 @@ class EventQueuePerRoom extends EventQueue {
*
* Every event is handled as soon as its data is available.
*/
class EventQueueNone extends EventQueue {
constructor(consumeFn) {
export class EventQueueNone extends EventQueue {
constructor(consumeFn: ConsumeCallback) {
super("none", consumeFn);
}

push(event, dataReady) {
push(event: unknown, dataReady: DataReady) {
// consume the event instantly
Bluebird.resolve(dataReady).asCallback(this.consumeFn);
}
Expand All @@ -166,10 +168,3 @@ class EventQueueNone extends EventQueue {
* @param {Error} [err] The error in case the data could not be retrieved.
* @param {object} data The data associated with the consumed event.
*/

module.exports = {
EventQueue,
EventQueueSingle,
EventQueuePerRoom,
EventQueueNone,
};

0 comments on commit b334c87

Please sign in to comment.