From d2dcd3c9173efe997a53b462571c7d7404e4518f Mon Sep 17 00:00:00 2001 From: Leon Ahuja Date: Wed, 20 Sep 2023 07:23:04 +1000 Subject: [PATCH 1/5] untested addition of BufferedNotifyConnection --- .../BufferedConnection/binaryPackChunker.ts | 10 +- .../BufferedNotifyConnection.ts | 182 ++++++++++++++++++ lib/dataconnection/DataConnection.ts | 14 +- lib/exports.ts | 1 + lib/peer.ts | 2 + package.json | 2 +- 6 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 lib/dataconnection/BufferedNotifyConnection.ts diff --git a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts index 168529fa6..8961a125c 100644 --- a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts +++ b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts @@ -1,3 +1,11 @@ + +export interface BinaryPackChunk { + __peerData: number + n: number + total: number + data: Uint8Array +}; + export class BinaryPackChunker { readonly chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually. @@ -7,7 +15,7 @@ export class BinaryPackChunker { chunk = ( blob: ArrayBuffer, - ): { __peerData: number; n: number; total: number; data: Uint8Array }[] => { + ): BinaryPackChunk[] => { const chunks = []; const size = blob.byteLength; const total = Math.ceil(size / this.chunkedMTU); diff --git a/lib/dataconnection/BufferedNotifyConnection.ts b/lib/dataconnection/BufferedNotifyConnection.ts new file mode 100644 index 000000000..84be7d79d --- /dev/null +++ b/lib/dataconnection/BufferedNotifyConnection.ts @@ -0,0 +1,182 @@ +import { pack, unpack } from "peerjs-js-binarypack"; +import logger from "../logger"; +import { DataConnection } from "./DataConnection"; +import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers } from "./BufferedConnection/binaryPackChunker"; + + +export class BufferedNotifyConnection extends DataConnection { + serialization: 'notify'; + private readonly chunker = new BinaryPackChunker(); + + private _chunkedData: { + [id: number]: { + data: Uint8Array[]; + count: number; + total: number; + }; + } = {}; + + + protected _send(data: any, chunked: boolean): { __peerData: number, total: number } { + + const blob = pack(data); + + if (!chunked && blob.byteLength > this.chunker.chunkedMTU) { + + const blobs = this.chunker.chunk(blob); + logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`); + + for (const blob of blobs) { + this._bufferedSend(blob); + } + + return { __peerData: blobs[0].__peerData, total: blobs.length }; + } + //We send everything in one chunk + + return { __peerData: 0, total: 1 }; + } + + private _buffer: BinaryPackChunk[] = []; + private _bufferSize = 0; + private _buffering = false; + + public get bufferSize(): number { + return this._bufferSize; + } + + public override _initializeDataChannel(dc: RTCDataChannel) { + super._initializeDataChannel(dc); + this.dataChannel.binaryType = "arraybuffer"; + this.dataChannel.addEventListener("message", (e) => + this._handleDataMessage(e), + ); + } + + + protected _handleDataMessage({ data }: { data: Uint8Array }): void { + // Assume we only get BinaryPackChunks + const deserializedData = unpack(data); + + // PeerJS specific message + const peerData = deserializedData["__peerData"]; + if (peerData) { + if (peerData.type === "close") { + this.close(); + return; + } + + if (typeof peerData === "number") { + // @ts-ignore + this._handleChunk(deserializedData); + return; + } + + } + + this.emit("data", deserializedData); + } + + + private _handleChunk(data: { + __peerData: number; + n: number; + total: number; + data: ArrayBuffer; + }): void { + const id = data.__peerData; + const chunkInfo = this._chunkedData[id] || { + data: [], + count: 0, + total: data.total, + }; + + chunkInfo.data[data.n] = new Uint8Array(data.data); + chunkInfo.count++; + this._chunkedData[id] = chunkInfo; + + if (chunkInfo.total === chunkInfo.count) { + // Clean up before making the recursive call to `_handleDataMessage`. + delete this._chunkedData[id]; + + // We've received all the chunks--time to construct the complete data. + // const data = new Blob(chunkInfo.data); + const data = concatArrayBuffers(chunkInfo.data); + this._handleDataMessage({ data }); + } + } + + + protected _bufferedSend(msg: BinaryPackChunk): void { + if (this._buffering || !this._trySend(msg)) { + this._buffer.push(msg); + this._bufferSize = this._buffer.length; + } + } + + // Returns true if the send succeeds. + private _trySend(msg: BinaryPackChunk): boolean { + if (!this.open) { + return false; + } + + if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) { + this._buffering = true; + setTimeout(() => { + this._buffering = false; + this._tryBuffer(); + }, 50); + + return false; + } + + try { + // Send notification + this.emit("sentChunk", { __peerData: msg.__peerData, n: msg.n }); + const msgPacked = pack(msg as any); + this.dataChannel.send(msgPacked); + } catch (e) { + logger.error(`DC#:${this.connectionId} Error when sending:`, e); + this._buffering = true; + + this.close(); + + return false; + } + + return true; + } + + // Try to send the first message in the buffer. + private _tryBuffer(): void { + if (!this.open) { + return; + } + + if (this._buffer.length === 0) { + return; + } + + const msg = this._buffer[0]; + + if (this._trySend(msg)) { + this._buffer.shift(); + this._bufferSize = this._buffer.length; + this._tryBuffer(); + } + } + + public override close(options?: { flush?: boolean }) { + if (options?.flush) { + this.send({ + __peerData: { + type: "close", + }, + }); + return; + } + this._buffer = []; + this._bufferSize = 0; + super.close(); + } +} diff --git a/lib/dataconnection/DataConnection.ts b/lib/dataconnection/DataConnection.ts index 863a37598..dde8f4dcc 100644 --- a/lib/dataconnection/DataConnection.ts +++ b/lib/dataconnection/DataConnection.ts @@ -12,9 +12,14 @@ import type { ServerMessage } from "../servermessage"; import type { EventsWithError } from "../peerError"; import { randomToken } from "../utils/randomToken"; +export interface ChunkSentNotification { + __peerData: number, + n: number +} + export interface DataConnectionEvents extends EventsWithError, - BaseConnectionEvents { + BaseConnectionEvents { /** * Emitted when data is received from the remote peer. */ @@ -23,6 +28,13 @@ export interface DataConnectionEvents * Emitted when the connection is established and ready-to-use. */ open: () => void; + + /** + * Emitted when the connection sends out a chunk of data to the remote peer. + * Currently Only implemented by BufferedNotifyConnection. + */ + sentChunk: (chunk: ChunkSentNotification) => void; + } /** diff --git a/lib/exports.ts b/lib/exports.ts index 3dd8d5175..40f84bb2e 100644 --- a/lib/exports.ts +++ b/lib/exports.ts @@ -20,6 +20,7 @@ export * from "./enums"; export { BufferedConnection } from "./dataconnection/BufferedConnection/BufferedConnection"; export { StreamConnection } from "./dataconnection/StreamConnection/StreamConnection"; export { Cbor } from "./dataconnection/StreamConnection/Cbor"; +export { BufferedNotifyConnection } from "./dataconnection/BufferedNotifyConnection"; export { MsgPack } from "./dataconnection/StreamConnection/MsgPack"; export type { SerializerMapping } from "./peer"; diff --git a/lib/peer.ts b/lib/peer.ts index 6fa306cbc..3d9dfe022 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -21,6 +21,7 @@ import { Raw } from "./dataconnection/BufferedConnection/Raw"; import { Json } from "./dataconnection/BufferedConnection/Json"; import { EventEmitterWithError, PeerError } from "./peerError"; +import { BufferedNotifyConnection } from "./exports"; class PeerOptions implements PeerJSOption { /** @@ -118,6 +119,7 @@ export class Peer extends EventEmitterWithError { json: Json, binary: BinaryPack, "binary-utf8": BinaryPack, + notify: BufferedNotifyConnection, default: BinaryPack, }; diff --git a/package.json b/package.json index d8117e01b..faf791e4c 100644 --- a/package.json +++ b/package.json @@ -175,7 +175,7 @@ "contributors": "git-authors-cli --print=false && prettier --write package.json && git add package.json package-lock.json && git commit -m \"chore(contributors): update and sort contributors list\"", "check": "tsc --noEmit && tsc -p e2e/tsconfig.json --noEmit", "watch": "parcel watch", - "build": "rm -rf dist && parcel build", + "build": "npx rimraf dist && parcel build", "prepublishOnly": "npm run build", "test": "jest", "test:watch": "jest --watch", From 4408c5a2eeb79d2527fc740b2f37bee92f7a3329 Mon Sep 17 00:00:00 2001 From: Leon Ahuja Date: Wed, 20 Sep 2023 09:04:25 +1000 Subject: [PATCH 2/5] working chunking and notification --- .../BufferedConnection/binaryPackChunker.ts | 12 ++++++++++++ lib/dataconnection/BufferedNotifyConnection.ts | 10 ++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts index 8961a125c..91b46d7b9 100644 --- a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts +++ b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts @@ -44,6 +44,18 @@ export class BinaryPackChunker { return chunks; }; + + singleChunk = (blob: ArrayBuffer): BinaryPackChunk => { + const __peerData = this._dataCount; + this._dataCount++; + + return { + __peerData, + n: 0, + total: 1, + data: new Uint8Array(blob), + }; + } } export function concatArrayBuffers(bufs: Uint8Array[]) { diff --git a/lib/dataconnection/BufferedNotifyConnection.ts b/lib/dataconnection/BufferedNotifyConnection.ts index 84be7d79d..9516c6800 100644 --- a/lib/dataconnection/BufferedNotifyConnection.ts +++ b/lib/dataconnection/BufferedNotifyConnection.ts @@ -5,7 +5,7 @@ import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers } from "./Buffer export class BufferedNotifyConnection extends DataConnection { - serialization: 'notify'; + readonly serialization = 'notify'; private readonly chunker = new BinaryPackChunker(); private _chunkedData: { @@ -17,11 +17,11 @@ export class BufferedNotifyConnection extends DataConnection { } = {}; - protected _send(data: any, chunked: boolean): { __peerData: number, total: number } { + public _send(data: any): { __peerData: number, total: number } { const blob = pack(data); - if (!chunked && blob.byteLength > this.chunker.chunkedMTU) { + if (blob.byteLength > this.chunker.chunkedMTU) { const blobs = this.chunker.chunk(blob); logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`); @@ -33,8 +33,10 @@ export class BufferedNotifyConnection extends DataConnection { return { __peerData: blobs[0].__peerData, total: blobs.length }; } //We send everything in one chunk + const msg = this.chunker.singleChunk(blob); + this._bufferedSend(msg); - return { __peerData: 0, total: 1 }; + return { __peerData: msg.__peerData, total: 1 }; } private _buffer: BinaryPackChunk[] = []; From 1858ebdf5b87a65a1234b2c9f57b182d15e14755 Mon Sep 17 00:00:00 2001 From: Leon Ahuja Date: Fri, 22 Sep 2023 15:04:27 +1000 Subject: [PATCH 3/5] Working implementation of BufferedNotify, final API --- .../BufferedConnection/BinaryPack.ts | 48 ++++++------- .../BufferedConnection/binaryPackChunker.ts | 19 ++--- .../BufferedNotifyConnection.ts | 72 ++++++++----------- lib/dataconnection/DataConnection.ts | 10 ++- 4 files changed, 70 insertions(+), 79 deletions(-) diff --git a/lib/dataconnection/BufferedConnection/BinaryPack.ts b/lib/dataconnection/BufferedConnection/BinaryPack.ts index 44b7862be..c3bd2fb2c 100644 --- a/lib/dataconnection/BufferedConnection/BinaryPack.ts +++ b/lib/dataconnection/BufferedConnection/BinaryPack.ts @@ -1,4 +1,4 @@ -import { BinaryPackChunker, concatArrayBuffers } from "./binaryPackChunker"; +import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./binaryPackChunker"; import logger from "../../logger"; import type { Peer } from "../../peer"; import { BufferedConnection } from "./BufferedConnection"; @@ -27,33 +27,29 @@ export class BinaryPack extends BufferedConnection { } // Handles a DataChannel message. - protected override _handleDataMessage({ data }: { data: Uint8Array }): void { - const deserializedData = unpack(data); - - // PeerJS specific message - const peerData = deserializedData["__peerData"]; - if (peerData) { - if (peerData.type === "close") { - this.close(); - return; - } - - // Chunked data -- piece things back together. - // @ts-ignore - this._handleChunk(deserializedData); - return; - } + protected _handleDataMessage({ data }: { data: Uint8Array }): void { + const deserializedData = unpack(data); + + // PeerJS specific message + const peerData = deserializedData["__peerData"]; + if (peerData) { + if (peerData.type === "close") { + this.close(); + return; + } + } + + if (isBinaryPackChunk(deserializedData)) { + this._handleChunk(deserializedData); + return; + } + + this.emit("data", deserializedData); + } - this.emit("data", deserializedData); - } - private _handleChunk(data: { - __peerData: number; - n: number; - total: number; - data: ArrayBuffer; - }): void { - const id = data.__peerData; + private _handleChunk(data: BinaryPackChunk): void { + const id = data.id; const chunkInfo = this._chunkedData[id] || { data: [], count: 0, diff --git a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts index 91b46d7b9..ddd8a39a3 100644 --- a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts +++ b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts @@ -1,11 +1,14 @@ - export interface BinaryPackChunk { - __peerData: number + id: number n: number total: number - data: Uint8Array + data: ArrayBuffer }; +export function isBinaryPackChunk(obj: any): obj is BinaryPackChunk { + return typeof obj === 'object' && 'id' in obj; +} + export class BinaryPackChunker { readonly chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually. @@ -16,7 +19,7 @@ export class BinaryPackChunker { chunk = ( blob: ArrayBuffer, ): BinaryPackChunk[] => { - const chunks = []; + const chunks: BinaryPackChunk[] = []; const size = blob.byteLength; const total = Math.ceil(size / this.chunkedMTU); @@ -27,8 +30,8 @@ export class BinaryPackChunker { const end = Math.min(size, start + this.chunkedMTU); const b = blob.slice(start, end); - const chunk = { - __peerData: this._dataCount, + const chunk: BinaryPackChunk = { + id: this._dataCount, n: index, data: b, total, @@ -46,11 +49,11 @@ export class BinaryPackChunker { }; singleChunk = (blob: ArrayBuffer): BinaryPackChunk => { - const __peerData = this._dataCount; + const id = this._dataCount; this._dataCount++; return { - __peerData, + id, n: 0, total: 1, data: new Uint8Array(blob), diff --git a/lib/dataconnection/BufferedNotifyConnection.ts b/lib/dataconnection/BufferedNotifyConnection.ts index 9516c6800..d67c3448e 100644 --- a/lib/dataconnection/BufferedNotifyConnection.ts +++ b/lib/dataconnection/BufferedNotifyConnection.ts @@ -1,7 +1,7 @@ import { pack, unpack } from "peerjs-js-binarypack"; import logger from "../logger"; -import { DataConnection } from "./DataConnection"; -import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers } from "./BufferedConnection/binaryPackChunker"; +import { DataConnection, SendData } from "./DataConnection"; +import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./BufferedConnection/binaryPackChunker"; export class BufferedNotifyConnection extends DataConnection { @@ -16,29 +16,6 @@ export class BufferedNotifyConnection extends DataConnection { }; } = {}; - - public _send(data: any): { __peerData: number, total: number } { - - const blob = pack(data); - - if (blob.byteLength > this.chunker.chunkedMTU) { - - const blobs = this.chunker.chunk(blob); - logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`); - - for (const blob of blobs) { - this._bufferedSend(blob); - } - - return { __peerData: blobs[0].__peerData, total: blobs.length }; - } - //We send everything in one chunk - const msg = this.chunker.singleChunk(blob); - this._bufferedSend(msg); - - return { __peerData: msg.__peerData, total: 1 }; - } - private _buffer: BinaryPackChunk[] = []; private _bufferSize = 0; private _buffering = false; @@ -55,9 +32,8 @@ export class BufferedNotifyConnection extends DataConnection { ); } - + // Handles a DataChannel message. protected _handleDataMessage({ data }: { data: Uint8Array }): void { - // Assume we only get BinaryPackChunks const deserializedData = unpack(data); // PeerJS specific message @@ -67,26 +43,18 @@ export class BufferedNotifyConnection extends DataConnection { this.close(); return; } + } - if (typeof peerData === "number") { - // @ts-ignore - this._handleChunk(deserializedData); - return; - } - + if (isBinaryPackChunk(deserializedData)) { + this._handleChunk(deserializedData); + return; } this.emit("data", deserializedData); } - - private _handleChunk(data: { - __peerData: number; - n: number; - total: number; - data: ArrayBuffer; - }): void { - const id = data.__peerData; + private _handleChunk(data: BinaryPackChunk): void { + const id = data.id; const chunkInfo = this._chunkedData[id] || { data: [], count: 0, @@ -108,6 +76,26 @@ export class BufferedNotifyConnection extends DataConnection { } } + public _send(data: any): SendData { + const blob = pack(data); + + if (blob.byteLength > this.chunker.chunkedMTU) { + + const blobs = this.chunker.chunk(blob); + logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`); + + for (const blob of blobs) { + this._bufferedSend(blob); + } + + return { id: blobs[0].id, total: blobs.length }; + } + //We send everything in one chunk + const msg = this.chunker.singleChunk(blob); + this._bufferedSend(msg); + + return { id: msg.id, total: 1 }; + } protected _bufferedSend(msg: BinaryPackChunk): void { if (this._buffering || !this._trySend(msg)) { @@ -134,7 +122,7 @@ export class BufferedNotifyConnection extends DataConnection { try { // Send notification - this.emit("sentChunk", { __peerData: msg.__peerData, n: msg.n }); + this.emit("sentChunk", { id: msg.id, n: msg.n, total: msg.total }); const msgPacked = pack(msg as any); this.dataChannel.send(msgPacked); } catch (e) { diff --git a/lib/dataconnection/DataConnection.ts b/lib/dataconnection/DataConnection.ts index dde8f4dcc..dd7b02e9f 100644 --- a/lib/dataconnection/DataConnection.ts +++ b/lib/dataconnection/DataConnection.ts @@ -12,8 +12,12 @@ import type { ServerMessage } from "../servermessage"; import type { EventsWithError } from "../peerError"; import { randomToken } from "../utils/randomToken"; -export interface ChunkSentNotification { - __peerData: number, +export interface SendData { + id: number, + total: number +} + +export interface ChunkSentNotification extends SendData{ n: number } @@ -136,7 +140,7 @@ export abstract class DataConnection extends BaseConnection< super.emit("close"); } - protected abstract _send(data: any, chunked: boolean): void; + protected abstract _send(data: any, chunked: boolean): void | Promise | SendData; /** Allows user to send data. */ public send(data: any, chunked = false) { From 777e423886472ba3ebfc0739b2944eb0bb022d9e Mon Sep 17 00:00:00 2001 From: Leon Ahuja Date: Fri, 22 Sep 2023 15:26:05 +1000 Subject: [PATCH 4/5] export type and provide example --- lib/dataconnection/DataConnection.ts | 19 ++++++++++++++++++- lib/exports.ts | 2 +- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/dataconnection/DataConnection.ts b/lib/dataconnection/DataConnection.ts index dd7b02e9f..8694d10b6 100644 --- a/lib/dataconnection/DataConnection.ts +++ b/lib/dataconnection/DataConnection.ts @@ -17,7 +17,7 @@ export interface SendData { total: number } -export interface ChunkSentNotification extends SendData{ +export interface ChunkSentNotification extends SendData { n: number } @@ -140,6 +140,23 @@ export abstract class DataConnection extends BaseConnection< super.emit("close"); } + /** + * @param data + * @param chunked + * @returns Returns SendData if datachannel is notification based + * @example + * const res = conn.send(currentMessage); + * if (typeof res === 'object' && 'id' in res) { + * conn.on('sentChunk', (chunk) => { + * if (chunk.id === res.id) { + * console.log('Sent chunk', chunk); + * if (chunk.n == res.total -1) { + * console.log('Sent last chunk'); + * } + * } + * }); + * } + */ protected abstract _send(data: any, chunked: boolean): void | Promise | SendData; /** Allows user to send data. */ diff --git a/lib/exports.ts b/lib/exports.ts index 40f84bb2e..a2b577993 100644 --- a/lib/exports.ts +++ b/lib/exports.ts @@ -12,7 +12,7 @@ export type { CallOption, } from "./optionInterfaces"; export type { UtilSupportsObj } from "./util"; -export type { DataConnection } from "./dataconnection/DataConnection"; +export type { DataConnection, ChunkSentNotification, SendData } from "./dataconnection/DataConnection"; export type { MediaConnection } from "./mediaconnection"; export type { LogLevel } from "./logger"; export * from "./enums"; From 01177c97ba4998f3145de13a165ccfa89df1ea38 Mon Sep 17 00:00:00 2001 From: Leon Ahuja Date: Fri, 22 Sep 2023 16:28:45 +1000 Subject: [PATCH 5/5] Finalise APi and comments --- .../BufferedConnection/binaryPackChunker.ts | 4 +++ .../BufferedNotifyConnection.ts | 16 ++++++---- lib/dataconnection/DataConnection.ts | 30 +++++++++---------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts index ddd8a39a3..90d5323b5 100644 --- a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts +++ b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts @@ -15,6 +15,10 @@ export class BinaryPackChunker { // Binary stuff private _dataCount: number = 1; + + public get nextID(): number { + return this._dataCount; + } chunk = ( blob: ArrayBuffer, diff --git a/lib/dataconnection/BufferedNotifyConnection.ts b/lib/dataconnection/BufferedNotifyConnection.ts index d67c3448e..4d4f83f48 100644 --- a/lib/dataconnection/BufferedNotifyConnection.ts +++ b/lib/dataconnection/BufferedNotifyConnection.ts @@ -24,6 +24,10 @@ export class BufferedNotifyConnection extends DataConnection { return this._bufferSize; } + public get nextID(): number { + return this.chunker.nextID; + } + public override _initializeDataChannel(dc: RTCDataChannel) { super._initializeDataChannel(dc); this.dataChannel.binaryType = "arraybuffer"; @@ -32,7 +36,7 @@ export class BufferedNotifyConnection extends DataConnection { ); } - // Handles a DataChannel message. + // Handles a DataChannel message. protected _handleDataMessage({ data }: { data: Uint8Array }): void { const deserializedData = unpack(data); @@ -76,7 +80,11 @@ export class BufferedNotifyConnection extends DataConnection { } } - public _send(data: any): SendData { + public SendWithCallback(data: any, callback: (chunk: BinaryPackChunk) => void): SendData { + throw new Error("Method not implemented."); + } + + protected _send(data: any): void { const blob = pack(data); if (blob.byteLength > this.chunker.chunkedMTU) { @@ -87,14 +95,12 @@ export class BufferedNotifyConnection extends DataConnection { for (const blob of blobs) { this._bufferedSend(blob); } + return; - return { id: blobs[0].id, total: blobs.length }; } //We send everything in one chunk const msg = this.chunker.singleChunk(blob); this._bufferedSend(msg); - - return { id: msg.id, total: 1 }; } protected _bufferedSend(msg: BinaryPackChunk): void { diff --git a/lib/dataconnection/DataConnection.ts b/lib/dataconnection/DataConnection.ts index 8694d10b6..ab879ec40 100644 --- a/lib/dataconnection/DataConnection.ts +++ b/lib/dataconnection/DataConnection.ts @@ -140,26 +140,26 @@ export abstract class DataConnection extends BaseConnection< super.emit("close"); } + protected abstract _send(data: any, chunked: boolean): void; + + /** + * Allows user to send data. * @param data * @param chunked - * @returns Returns SendData if datachannel is notification based * @example - * const res = conn.send(currentMessage); - * if (typeof res === 'object' && 'id' in res) { - * conn.on('sentChunk', (chunk) => { - * if (chunk.id === res.id) { - * console.log('Sent chunk', chunk); - * if (chunk.n == res.total -1) { - * console.log('Sent last chunk'); - * } - * } - * }); - * } + * + * const nextId = conn.nextID; + * conn.on('sentChunk', (chunk) => { + * if (chunk.id === nextId) { + * console.log('Sent chunk', chunk); + * if (chunk.n == chunk.total - 1) { + * console.log('Sent last chunk'); + * } + * } + * }); + * conn.send(arr); */ - protected abstract _send(data: any, chunked: boolean): void | Promise | SendData; - - /** Allows user to send data. */ public send(data: any, chunked = false) { if (!this.open) { this.emitError(