diff --git a/lib/dataconnection/BufferedConnection/BinaryPack.ts b/lib/dataconnection/BufferedConnection/BinaryPack.ts index 44b7862be..c3bd2fb2c 100644 --- a/lib/dataconnection/BufferedConnection/BinaryPack.ts +++ b/lib/dataconnection/BufferedConnection/BinaryPack.ts @@ -1,4 +1,4 @@ -import { BinaryPackChunker, concatArrayBuffers } from "./binaryPackChunker"; +import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./binaryPackChunker"; import logger from "../../logger"; import type { Peer } from "../../peer"; import { BufferedConnection } from "./BufferedConnection"; @@ -27,33 +27,29 @@ export class BinaryPack extends BufferedConnection { } // Handles a DataChannel message. - protected override _handleDataMessage({ data }: { data: Uint8Array }): void { - const deserializedData = unpack(data); - - // PeerJS specific message - const peerData = deserializedData["__peerData"]; - if (peerData) { - if (peerData.type === "close") { - this.close(); - return; - } - - // Chunked data -- piece things back together. - // @ts-ignore - this._handleChunk(deserializedData); - return; - } + protected _handleDataMessage({ data }: { data: Uint8Array }): void { + const deserializedData = unpack(data); + + // PeerJS specific message + const peerData = deserializedData["__peerData"]; + if (peerData) { + if (peerData.type === "close") { + this.close(); + return; + } + } + + if (isBinaryPackChunk(deserializedData)) { + this._handleChunk(deserializedData); + return; + } + + this.emit("data", deserializedData); + } - this.emit("data", deserializedData); - } - private _handleChunk(data: { - __peerData: number; - n: number; - total: number; - data: ArrayBuffer; - }): void { - const id = data.__peerData; + private _handleChunk(data: BinaryPackChunk): void { + const id = data.id; const chunkInfo = this._chunkedData[id] || { data: [], count: 0, diff --git a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts index 168529fa6..90d5323b5 100644 --- a/lib/dataconnection/BufferedConnection/binaryPackChunker.ts +++ b/lib/dataconnection/BufferedConnection/binaryPackChunker.ts @@ -1,14 +1,29 @@ +export interface BinaryPackChunk { + id: number + n: number + total: number + data: ArrayBuffer +}; + +export function isBinaryPackChunk(obj: any): obj is BinaryPackChunk { + return typeof obj === 'object' && 'id' in obj; +} + export class BinaryPackChunker { readonly chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually. // Binary stuff private _dataCount: number = 1; + + public get nextID(): number { + return this._dataCount; + } chunk = ( blob: ArrayBuffer, - ): { __peerData: number; n: number; total: number; data: Uint8Array }[] => { - const chunks = []; + ): BinaryPackChunk[] => { + const chunks: BinaryPackChunk[] = []; const size = blob.byteLength; const total = Math.ceil(size / this.chunkedMTU); @@ -19,8 +34,8 @@ export class BinaryPackChunker { const end = Math.min(size, start + this.chunkedMTU); const b = blob.slice(start, end); - const chunk = { - __peerData: this._dataCount, + const chunk: BinaryPackChunk = { + id: this._dataCount, n: index, data: b, total, @@ -36,6 +51,18 @@ export class BinaryPackChunker { return chunks; }; + + singleChunk = (blob: ArrayBuffer): BinaryPackChunk => { + const id = this._dataCount; + this._dataCount++; + + return { + id, + n: 0, + total: 1, + data: new Uint8Array(blob), + }; + } } export function concatArrayBuffers(bufs: Uint8Array[]) { diff --git a/lib/dataconnection/BufferedNotifyConnection.ts b/lib/dataconnection/BufferedNotifyConnection.ts new file mode 100644 index 000000000..4d4f83f48 --- /dev/null +++ b/lib/dataconnection/BufferedNotifyConnection.ts @@ -0,0 +1,178 @@ +import { pack, unpack } from "peerjs-js-binarypack"; +import logger from "../logger"; +import { DataConnection, SendData } from "./DataConnection"; +import { BinaryPackChunk, BinaryPackChunker, concatArrayBuffers, isBinaryPackChunk } from "./BufferedConnection/binaryPackChunker"; + + +export class BufferedNotifyConnection extends DataConnection { + readonly serialization = 'notify'; + private readonly chunker = new BinaryPackChunker(); + + private _chunkedData: { + [id: number]: { + data: Uint8Array[]; + count: number; + total: number; + }; + } = {}; + + private _buffer: BinaryPackChunk[] = []; + private _bufferSize = 0; + private _buffering = false; + + public get bufferSize(): number { + return this._bufferSize; + } + + public get nextID(): number { + return this.chunker.nextID; + } + + public override _initializeDataChannel(dc: RTCDataChannel) { + super._initializeDataChannel(dc); + this.dataChannel.binaryType = "arraybuffer"; + this.dataChannel.addEventListener("message", (e) => + this._handleDataMessage(e), + ); + } + + // Handles a DataChannel message. + protected _handleDataMessage({ data }: { data: Uint8Array }): void { + const deserializedData = unpack(data); + + // PeerJS specific message + const peerData = deserializedData["__peerData"]; + if (peerData) { + if (peerData.type === "close") { + this.close(); + return; + } + } + + if (isBinaryPackChunk(deserializedData)) { + this._handleChunk(deserializedData); + return; + } + + this.emit("data", deserializedData); + } + + private _handleChunk(data: BinaryPackChunk): void { + const id = data.id; + const chunkInfo = this._chunkedData[id] || { + data: [], + count: 0, + total: data.total, + }; + + chunkInfo.data[data.n] = new Uint8Array(data.data); + chunkInfo.count++; + this._chunkedData[id] = chunkInfo; + + if (chunkInfo.total === chunkInfo.count) { + // Clean up before making the recursive call to `_handleDataMessage`. + delete this._chunkedData[id]; + + // We've received all the chunks--time to construct the complete data. + // const data = new Blob(chunkInfo.data); + const data = concatArrayBuffers(chunkInfo.data); + this._handleDataMessage({ data }); + } + } + + public SendWithCallback(data: any, callback: (chunk: BinaryPackChunk) => void): SendData { + throw new Error("Method not implemented."); + } + + protected _send(data: any): void { + const blob = pack(data); + + if (blob.byteLength > this.chunker.chunkedMTU) { + + const blobs = this.chunker.chunk(blob); + logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`); + + for (const blob of blobs) { + this._bufferedSend(blob); + } + return; + + } + //We send everything in one chunk + const msg = this.chunker.singleChunk(blob); + this._bufferedSend(msg); + } + + protected _bufferedSend(msg: BinaryPackChunk): void { + if (this._buffering || !this._trySend(msg)) { + this._buffer.push(msg); + this._bufferSize = this._buffer.length; + } + } + + // Returns true if the send succeeds. + private _trySend(msg: BinaryPackChunk): boolean { + if (!this.open) { + return false; + } + + if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) { + this._buffering = true; + setTimeout(() => { + this._buffering = false; + this._tryBuffer(); + }, 50); + + return false; + } + + try { + // Send notification + this.emit("sentChunk", { id: msg.id, n: msg.n, total: msg.total }); + const msgPacked = pack(msg as any); + this.dataChannel.send(msgPacked); + } catch (e) { + logger.error(`DC#:${this.connectionId} Error when sending:`, e); + this._buffering = true; + + this.close(); + + return false; + } + + return true; + } + + // Try to send the first message in the buffer. + private _tryBuffer(): void { + if (!this.open) { + return; + } + + if (this._buffer.length === 0) { + return; + } + + const msg = this._buffer[0]; + + if (this._trySend(msg)) { + this._buffer.shift(); + this._bufferSize = this._buffer.length; + this._tryBuffer(); + } + } + + public override close(options?: { flush?: boolean }) { + if (options?.flush) { + this.send({ + __peerData: { + type: "close", + }, + }); + return; + } + this._buffer = []; + this._bufferSize = 0; + super.close(); + } +} diff --git a/lib/dataconnection/DataConnection.ts b/lib/dataconnection/DataConnection.ts index 863a37598..ab879ec40 100644 --- a/lib/dataconnection/DataConnection.ts +++ b/lib/dataconnection/DataConnection.ts @@ -12,9 +12,18 @@ import type { ServerMessage } from "../servermessage"; import type { EventsWithError } from "../peerError"; import { randomToken } from "../utils/randomToken"; +export interface SendData { + id: number, + total: number +} + +export interface ChunkSentNotification extends SendData { + n: number +} + export interface DataConnectionEvents extends EventsWithError, - BaseConnectionEvents { + BaseConnectionEvents { /** * Emitted when data is received from the remote peer. */ @@ -23,6 +32,13 @@ export interface DataConnectionEvents * Emitted when the connection is established and ready-to-use. */ open: () => void; + + /** + * Emitted when the connection sends out a chunk of data to the remote peer. + * Currently Only implemented by BufferedNotifyConnection. + */ + sentChunk: (chunk: ChunkSentNotification) => void; + } /** @@ -126,7 +142,24 @@ export abstract class DataConnection extends BaseConnection< protected abstract _send(data: any, chunked: boolean): void; - /** Allows user to send data. */ + + /** + * Allows user to send data. + * @param data + * @param chunked + * @example + * + * const nextId = conn.nextID; + * conn.on('sentChunk', (chunk) => { + * if (chunk.id === nextId) { + * console.log('Sent chunk', chunk); + * if (chunk.n == chunk.total - 1) { + * console.log('Sent last chunk'); + * } + * } + * }); + * conn.send(arr); + */ public send(data: any, chunked = false) { if (!this.open) { this.emitError( diff --git a/lib/exports.ts b/lib/exports.ts index 3dd8d5175..a2b577993 100644 --- a/lib/exports.ts +++ b/lib/exports.ts @@ -12,7 +12,7 @@ export type { CallOption, } from "./optionInterfaces"; export type { UtilSupportsObj } from "./util"; -export type { DataConnection } from "./dataconnection/DataConnection"; +export type { DataConnection, ChunkSentNotification, SendData } from "./dataconnection/DataConnection"; export type { MediaConnection } from "./mediaconnection"; export type { LogLevel } from "./logger"; export * from "./enums"; @@ -20,6 +20,7 @@ export * from "./enums"; export { BufferedConnection } from "./dataconnection/BufferedConnection/BufferedConnection"; export { StreamConnection } from "./dataconnection/StreamConnection/StreamConnection"; export { Cbor } from "./dataconnection/StreamConnection/Cbor"; +export { BufferedNotifyConnection } from "./dataconnection/BufferedNotifyConnection"; export { MsgPack } from "./dataconnection/StreamConnection/MsgPack"; export type { SerializerMapping } from "./peer"; diff --git a/lib/peer.ts b/lib/peer.ts index 6fa306cbc..3d9dfe022 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -21,6 +21,7 @@ import { Raw } from "./dataconnection/BufferedConnection/Raw"; import { Json } from "./dataconnection/BufferedConnection/Json"; import { EventEmitterWithError, PeerError } from "./peerError"; +import { BufferedNotifyConnection } from "./exports"; class PeerOptions implements PeerJSOption { /** @@ -118,6 +119,7 @@ export class Peer extends EventEmitterWithError { json: Json, binary: BinaryPack, "binary-utf8": BinaryPack, + notify: BufferedNotifyConnection, default: BinaryPack, }; diff --git a/package.json b/package.json index d8117e01b..faf791e4c 100644 --- a/package.json +++ b/package.json @@ -175,7 +175,7 @@ "contributors": "git-authors-cli --print=false && prettier --write package.json && git add package.json package-lock.json && git commit -m \"chore(contributors): update and sort contributors list\"", "check": "tsc --noEmit && tsc -p e2e/tsconfig.json --noEmit", "watch": "parcel watch", - "build": "rm -rf dist && parcel build", + "build": "npx rimraf dist && parcel build", "prepublishOnly": "npm run build", "test": "jest", "test:watch": "jest --watch",