Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #23 from little-bear-labs/ckousik/interop-test
Browse files Browse the repository at this point in the history
Interop test between Go and Js implementations
  • Loading branch information
ckousik committed Sep 30, 2022
2 parents 5584eba + efd09ca commit 6824cfd
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 132 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ bower_components
Thumbs.db

# Ignore built ts files
dist/**/*
dist/

# ignore yarn.lock
yarn.lock
Expand Down
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
"othergen": "./node_modules/.bin/proto-loader-gen-types --longs=String --enums=String --defaults --oneofs --grpcLib=@grpc/grpc-js --outDir=proto_ts/ src/*.proto",
"build": "aegir build",
"test": "aegir test --target browser",
"test:interop": "run-p --race start-ext-server wait-then-test",
"start-ext-server": "rm -vf dist/test/server-multiaddr.js ; cd ../go-libp2p/ && go run examples/webrtc/main.go ../js-libp2p-webrtc/dist/test/ ",
"wait-for-server": "wait-on --delay 1000 --timeout 10000 dist/test/server-multiaddr.js",
"wait-then-test": "run-s wait-for-server test",
"lint": "aegir lint",
"lint:fix": "aegir lint --fix",
"clean": "aegir clean",
Expand All @@ -35,14 +39,18 @@
},
"devDependencies": {
"@libp2p/interface-mocks": "^4.0.1",
"@libp2p/peer-id-factory": "^1.0.18",
"@multiformats/multiaddr": "^10.4.1",
"@types/uuid": "^8.3.4",
"@typescript-eslint/parser": "^5.32.0",
"aegir": "^37.4.6",
"it-all": "^1.0.6",
"it-first": "^1.0.7",
"npm-run-all": "^4.1.5",
"prettier": "^2.7.1",
"typescript": "^4.7.4",
"uint8arrays": "^3.1.0"
"uint8arrays": "^3.1.0",
"wait-on": "^6.0.1"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^8.0.0",
Expand Down
2 changes: 1 addition & 1 deletion proto_ts/message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.8.0
// @generated by protobuf-ts 2.8.1
// @generated from protobuf file "message.proto" (package "webrtc.pb", syntax proto2)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
Expand Down
82 changes: 49 additions & 33 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { WebRTCStream } from './stream';
import { select as msselect, handle as mshandle } from '@libp2p/multistream-select';
import { Duplex } from 'it-stream-types';
import { Uint8ArrayList } from 'uint8arraylist';
import { dataChannelError, operationAborted, overStreamLimit } from './error';
import { connectionClosedError, dataChannelError, operationAborted, overStreamLimit } from './error';

const log = logger('libp2p:webrtc:connection');

Expand Down Expand Up @@ -40,6 +40,7 @@ export class WebRTCConnection implements ic.Connection {
remotePeer: PeerId;
tags: string[] = [];
components: Components;
closed: boolean = false;

private _streams: Map<string, ic.Stream> = new Map();
private peerConnection: RTCPeerConnection;
Expand All @@ -60,14 +61,22 @@ export class WebRTCConnection implements ic.Connection {
},
};
this.handleIncomingStreams();
this.peerConnection.onconnectionstatechange = (_) => {
switch(this.peerConnection.connectionState) {
case 'closed': // fallthrough
case 'failed': // fallthrough
case 'disconnected': // fallthrough
log.trace(`peerconnection moved to state: ${this.peerConnection.connectionState}`)
closed = true;
this.streams.forEach((stream) => stream.abort(connectionClosedError(this.peerConnection.connectionState, 'closing stream')))
}
}
}

private handleIncomingStreams() {
let metrics = this.components.getMetrics();
this.peerConnection.ondatachannel = async ({ channel }) => {
const logPrefix = `[stream:${channel.label}][inbound]`;
log.trace(`incoming stream - ${channel.label}`);
let [openPromise, abortPromise] = [defer(), defer()];
const [openPromise, abortPromise] = [defer(), defer()];
let controller = new TimeoutController(OPEN_STREAM_TIMEOUT);
controller.signal.onabort = () => abortPromise.resolve();
channel.onopen = () => openPromise.resolve();
Expand All @@ -77,7 +86,7 @@ export class WebRTCConnection implements ic.Connection {
throw operationAborted('prior to a new stream incoming.', controller.signal.reason);
}

let rawStream = new WebRTCStream({
const rawStream = new WebRTCStream({
channel,
stat: {
direction: 'inbound',
Expand All @@ -86,26 +95,30 @@ export class WebRTCConnection implements ic.Connection {
},
},
});
let registrar = this.components.getRegistrar();
let protocols = registrar.getProtocols();
const registrar = this.components.getRegistrar();
const protocols = registrar.getProtocols();

log.trace(`${logPrefix} supported protocols - ${protocols}`);
log.trace(`supported protocols - ${protocols}`);

let { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal });
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}
try {
const { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal });
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}

log.trace(`${logPrefix} handled protocol - ${protocol}`);
log.trace(`handled protocol - ${protocol}`);

rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);
rawStream.stat.protocol = protocol;
const result = this.wrapMsStream(rawStream, stream);

this.addStream(result);
this.addStream(result);

// handle stream
let { handler } = registrar.getHandler(protocol);
handler({ connection: this, stream: result });
// handle stream
const { handler } = registrar.getHandler(protocol);
handler({ connection: this, stream: result });
} catch (err) {
log.error('stream error: ', rawStream.id, rawStream.stat.direction);
}
};
}

Expand All @@ -132,9 +145,9 @@ export class WebRTCConnection implements ic.Connection {
}

private findStreamLimit(protocol: string, direction: ic.Direction): number {
let registrar = this.components.getRegistrar();
const registrar = this.components.getRegistrar();
try {
let handler = registrar.getHandler(protocol);
const handler = registrar.getHandler(protocol);
return direction === 'inbound' ? handler.options.maxInboundStreams || DEFAULT_MAX_INBOUND_STREAMS : handler.options.maxOutboundStreams || DEFAULT_MAX_OUTBOUND_STREAMS;
} catch (err) {}
return direction === 'inbound' ? DEFAULT_MAX_INBOUND_STREAMS : DEFAULT_MAX_OUTBOUND_STREAMS;
Expand All @@ -145,12 +158,15 @@ export class WebRTCConnection implements ic.Connection {
}

async newStream(protocols: string | string[], options: AbortOptions = {}): Promise<ic.Stream> {
let label = genUuid().slice(0, 8);
let openPromise = defer();
let abortedPromise = defer();
let controller: TimeoutController | undefined;
let metrics = this.components.getMetrics();
if (this.closed) {
throw connectionClosedError(this.peerConnection.connectionState, 'cannot open new stream')
}
const label = genUuid().slice(0, 8);
const [openPromise, abortedPromise] = [defer(), defer()];
const metrics = this.components.getMetrics();

let openError: Error | undefined;
let controller: TimeoutController | undefined;

log.trace(`opening new stream with protocols: ${protocols}`);

Expand All @@ -168,7 +184,7 @@ export class WebRTCConnection implements ic.Connection {
};

log.trace(`[stream: ${label}] peerconnection state: ${this.peerConnection.connectionState}`);
let channel = this.peerConnection.createDataChannel(label);
const channel = this.peerConnection.createDataChannel(label);
channel.onopen = (_evt) => {
log.trace(`[stream: ${label}] data channel opened`);
openPromise.resolve();
Expand All @@ -188,7 +204,7 @@ export class WebRTCConnection implements ic.Connection {
throw openError;
}

let rawStream = new WebRTCStream({
const rawStream = new WebRTCStream({
channel,
stat: {
direction: 'outbound',
Expand All @@ -198,11 +214,11 @@ export class WebRTCConnection implements ic.Connection {
},
});

let { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal });
const { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal });
log.trace(`[stream ${label}] select protocol - ${protocol}`);
// check if stream is within limit after protocol has been negotiated
rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);
const result = this.wrapMsStream(rawStream, stream);
// check if stream can be accomodated
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
Expand All @@ -213,10 +229,10 @@ export class WebRTCConnection implements ic.Connection {
}

addStream(stream: ic.Stream): void {
let protocol = stream.stat.protocol!;
let direction = stream.stat.direction;
const protocol = stream.stat.protocol!;
const direction = stream.stat.direction;
if (this.countStream(protocol, direction) === this.findStreamLimit(protocol, direction)) {
let err = overStreamLimit(direction, protocol);
const err = overStreamLimit(direction, protocol);
log(err.message);
stream.abort(err);
throw err;
Expand Down
14 changes: 13 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ export enum codes {
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS',
ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS',
ERR_CONNECTION_CLOSED = 'ERR_CONNECTION_CLOSED',
}

export class ConnectionClosedError extends WebRTCTransportError {
constructor(state: RTCPeerConnectionState, msg: string) {
super(`peerconnection moved to state: ${state}:` + msg);
this.name = 'WebRTC/ConnectionClosed';
}
}

export function connectionClosedError(state: RTCPeerConnectionState, msg: string) {
return createError(new ConnectionClosedError(state, msg), codes.ERR_CONNECTION_CLOSED)
}

export class InvalidArgumentError extends WebRTCTransportError {
Expand Down Expand Up @@ -83,7 +95,7 @@ export class DataChannelError extends WebRTCTransportError {
}

export function dataChannelError(streamLabel: string, msg: string) {
return createError(new OperationAbortedError(streamLabel, msg), codes.ERR_DATA_CHANNEL);
return createError(new DataChannelError(streamLabel, msg), codes.ERR_DATA_CHANNEL);
}

export class StreamingLimitationError extends WebRTCTransportError {
Expand Down
2 changes: 1 addition & 1 deletion src/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ message Message {
optional Flag flag = 1;

optional bytes message = 2;
}
}
3 changes: 1 addition & 2 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ export interface WebRTCListenerOptions extends CreateListenerOptions {
// channelOptions?: WebRTCReceiverInit
}

export interface WebRTCDialOptions extends DialOptions {
}
export interface WebRTCDialOptions extends DialOptions {}
29 changes: 14 additions & 15 deletions src/sdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { bases } from 'multiformats/basics';

const log = logger('libp2p:webrtc:sdp');

const mbdecoder = (function () {
export const mbdecoder = (function () {
const decoders = Object.values(bases).map((b) => b.decoder);
let acc = decoders[0].or(decoders[1]);
decoders.slice(2).forEach((d) => (acc = acc.or(d)));
Expand All @@ -16,7 +16,7 @@ const mbdecoder = (function () {
const CERTHASH_CODE: number = 466;

function ipv(ma: Multiaddr): string {
for (let proto of ma.protoNames()) {
for (const proto of ma.protoNames()) {
if (proto.startsWith('ip')) {
return proto.toUpperCase();
}
Expand All @@ -32,20 +32,20 @@ function port(ma: Multiaddr): number {
}

export function certhash(ma: Multiaddr): string {
let tups = ma.stringTuples();
let certhash_value = tups.filter((tup) => tup[0] == CERTHASH_CODE).map((tup) => tup[1])[0];
const tups = ma.stringTuples();
const certhash_value = tups.filter((tup) => tup[0] == CERTHASH_CODE).map((tup) => tup[1])[0];
if (certhash_value) {
return certhash_value;
} else {
throw inappropriateMultiaddr("Couldn't find a certhash component of multiaddr:" + ma.toString());
}
}

function certhashToFingerprint(ma: Multiaddr): string {
let certhash_value = certhash(ma);
export function certhashToFingerprint(ma: Multiaddr): string[] {
const certhash_value = certhash(ma);
// certhash_value is a multibase encoded multihash encoded string
let mbdecoded = mbdecoder.decode(certhash_value);
let mhdecoded = multihashes.decode(mbdecoded);
const mbdecoded = mbdecoder.decode(certhash_value);
const mhdecoded = multihashes.decode(mbdecoded);
let prefix = '';
switch (mhdecoded.name) {
case 'md5':
Expand All @@ -61,17 +61,17 @@ function certhashToFingerprint(ma: Multiaddr): string {
throw unsupportedHashAlgorithm(mhdecoded.name);
}

let fp = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
fp = fp.match(/.{1,2}/g)!.join(':');
const fp = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
const fpSdp = fp.match(/.{1,2}/g)!.join(':');

return `${prefix} ${fp}`;
return [`${prefix.toUpperCase()} ${fpSdp.toUpperCase()}`, fp];
}

function ma2sdp(ma: Multiaddr, ufrag: string): string {
const IP = ip(ma);
const IPVERSION = ipv(ma);
const PORT = port(ma);
const CERTFP = certhashToFingerprint(ma);
const [CERTFP, _] = certhashToFingerprint(ma);
return `v=0
o=- 0 0 IN ${IPVERSION} ${IP}
s=-
Expand All @@ -80,14 +80,13 @@ t=0 0
a=ice-lite
m=application ${PORT} UDP/DTLS/SCTP webrtc-datachannel
a=mid:0
a=setup:active
a=ice-options:ice2
a=setup:passive
a=ice-ufrag:${ufrag}
a=ice-pwd:${ufrag}
a=fingerprint:${CERTFP}
a=sctp-port:5000
a=max-message-size:100000
a=candidate:1 1 UDP 1 ${IP} ${PORT} typ host`;
a=candidate:1467250027 1 UDP 1467250027 ${IP} ${PORT} typ host\r\n`;
}

export function fromMultiAddr(ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit {
Expand Down
Loading

0 comments on commit 6824cfd

Please sign in to comment.