diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000..4e96ec1 --- /dev/null +++ b/.aegir.js @@ -0,0 +1,7 @@ +module.exports = { + build: { + config: { + platform: 'node' + } + } +} diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d437a13..7cefb83 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,8 +13,9 @@ jobs: steps: - uses: actions/checkout@v2 - run: npm install - - run: npx aegir lint + - run: npm run lint - run: npx aegir dep-check -- -i wrtc -i electron-webrtc + - run: npm run build test-node: needs: check runs-on: ${{ matrix.os }} diff --git a/.gitignore b/.gitignore index 9a5d409..2df8b47 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ package-lock.json coverage .nyc_output docs + +dist diff --git a/package.json b/package.json index c95a527..e2de256 100644 --- a/package.json +++ b/package.json @@ -6,11 +6,12 @@ "main": "src/index.js", "scripts": { "lint": "aegir lint", + "build": "aegir build", "test": "aegir test -t node", "test:node": "aegir test -t node", - "release": "aegir release -t node --no-build", - "release-minor": "aegir release -t node --type minor --no-build", - "release-major": "aegir-release -t node --type major --no-build", + "release": "aegir release -t node", + "release-minor": "aegir release -t node --type minor", + "release-major": "aegir-release -t node --type major", "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "pre-push": [ @@ -37,10 +38,12 @@ "engines": { "node": ">=14.0.0" }, + "types": "dist/src/index.d.ts", "devDependencies": { - "aegir": "^33.0.0", + "@types/debug": "^4.1.5", + "aegir": "^33.2.0", "it-pipe": "^1.1.0", - "libp2p-interfaces": "^0.9.0", + "libp2p-interfaces": "^0.11.0", "sinon": "^10.0.1", "streaming-iterables": "^5.0.2" }, diff --git a/src/index.js b/src/index.js index 5a1f069..e840f2c 100644 --- a/src/index.js +++ b/src/index.js @@ -2,6 +2,8 @@ const net = require('net') const mafmt = require('mafmt') +// Missing Type +// @ts-ignore const withIs = require('class-is') const errCode = require('err-code') const log = require('debug')('libp2p:tcp') @@ -14,6 +16,9 @@ const { CODE_CIRCUIT, CODE_P2P } = require('./constants') /** * @typedef {import('multiaddr').Multiaddr} Multiaddr * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader + * @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener + * @typedef {import('net').Socket} Socket */ class TCP { @@ -33,8 +38,8 @@ class TCP { * @async * @param {Multiaddr} ma * @param {object} options - * @param {AbortSignal} options.signal - Used to abort dial requests - * @returns {Connection} An upgraded Connection + * @param {AbortSignal} [options.signal] - Used to abort dial requests + * @returns {Promise} An upgraded Connection */ async dial (ma, options) { options = options || {} @@ -50,7 +55,7 @@ class TCP { * @private * @param {Multiaddr} ma * @param {object} options - * @param {AbortSignal} options.signal - Used to abort dial requests + * @param {AbortSignal} [options.signal] - Used to abort dial requests * @returns {Promise} Resolves a TCP Socket */ _connect (ma, options = {}) { @@ -65,13 +70,13 @@ class TCP { log('dialing %j', cOpts) const rawSocket = net.connect(cOpts) - const onError = err => { + const onError = /** @param {Error} err */ err => { err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` done(err) } const onTimeout = () => { - log('connnection timeout %s:%s', cOpts.host, cOpts.port) + log('connection timeout %s:%s', cOpts.host, cOpts.port) const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') // Note: this will result in onError() being called rawSocket.emit('error', err) @@ -88,7 +93,7 @@ class TCP { done(new AbortError()) } - const done = err => { + const done = /** @param {Error} [err] */ err => { rawSocket.removeListener('error', onError) rawSocket.removeListener('timeout', onTimeout) rawSocket.removeListener('connect', onConnect) @@ -110,17 +115,21 @@ class TCP { * anytime a new incoming Connection has been successfully upgraded via * `upgrader.upgradeInbound`. * - * @param {*} [options] - * @param {function(Connection)} handler + * @param {* | function(Connection):void} options + * @param {function(Connection):void} [handler] * @returns {Listener} A TCP listener */ createListener (options, handler) { + let listenerHandler + if (typeof options === 'function') { - handler = options + listenerHandler = options options = {} + } else { + listenerHandler = handler } options = options || {} - return createListener({ handler, upgrader: this._upgrader }, options) + return createListener({ handler: listenerHandler, upgrader: this._upgrader }, options) } /** diff --git a/src/listener.js b/src/listener.js index a7297ba..30a3c28 100644 --- a/src/listener.js +++ b/src/listener.js @@ -3,9 +3,9 @@ const net = require('net') const EventEmitter = require('events') const debug = require('debug') -const log = debug('libp2p:tcp:listener') -log.error = debug('libp2p:tcp:listener:error') - +const log = Object.assign( + debug('libp2p:tcp:listener'), + { error: debug('libp2p:tcp:listener:error') }) const toConnection = require('./socket-to-conn') const { CODE_P2P } = require('./constants') const { @@ -13,11 +13,20 @@ const { multiaddrToNetConfig } = require('./utils') +/** + * @typedef {import('multiaddr').Multiaddr} Multiaddr + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + * @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection + * @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener + * @typedef {import('net').Server & {__connections: MultiaddrConnection[]}} Server + */ + /** * Attempts to close the given maConn. If a failure occurs, it will be logged. * * @private - * @param {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} maConn + * @param {MultiaddrConnection} maConn */ async function attemptClose (maConn) { try { @@ -27,13 +36,80 @@ async function attemptClose (maConn) { } } +/** + * Create listener + * + * @param {object} context + * @param {function(Connection):void} context.handler + * @param {Upgrader} context.upgrader + * @param {*} options + * @returns {Listener} + */ module.exports = ({ handler, upgrader }, options) => { - const listener = new EventEmitter() + /** @type {Server} */ + // eslint-disable-next-line prefer-const + let server + + /** @type {string | null} */ + let peerId + + /** @type {Multiaddr} */ + let listeningAddr + + const listener = Object.assign(new EventEmitter(), { + getAddrs: () => { + /** @type {Multiaddr[]} */ + let addrs = [] + /** @type {import('net').AddressInfo} */ + // @ts-ignore + const address = server.address() + + if (!address) { + throw new Error('Listener is not ready yet') + } + + // Because TCP will only return the IPv6 version + // we need to capture from the passed multiaddr + if (listeningAddr.toString().startsWith('/ip4')) { + addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) + } else if (address.family === 'IPv6') { + addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) + } + + return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma) + }, + listen: async (/** @type {Multiaddr} */ ma) => { + listeningAddr = ma + peerId = ma.getPeerId() + + if (peerId) { + listeningAddr = ma.decapsulateCode(CODE_P2P) + } + + return new Promise((resolve, reject) => { + const options = multiaddrToNetConfig(listeningAddr) + server.listen(options, (/** @type {any} */ err) => { + if (err) return reject(err) + log('Listening on %s', server.address()) + resolve(undefined) + }) + }) + }, + close: async () => { + if (!server.listening) return + + return new Promise((resolve, reject) => { + server.__connections.forEach(maConn => attemptClose(maConn)) + server.close(err => err ? reject(err) : resolve(undefined)) + }) + } + }) - const server = net.createServer(async socket => { + server = Object.assign(net.createServer(async socket => { // Avoid uncaught errors caused by unstable connections socket.on('error', err => log('socket error', err)) + /** @type {MultiaddrConnection} */ let maConn let conn try { @@ -42,6 +118,7 @@ module.exports = ({ handler, upgrader }, options) => { conn = await upgrader.upgradeInbound(maConn) } catch (err) { log.error('inbound connection failed', err) + // @ts-ignore return attemptClose(maConn) } @@ -51,67 +128,22 @@ module.exports = ({ handler, upgrader }, options) => { if (handler) handler(conn) listener.emit('connection', conn) - }) + }), + // Keep track of open connections to destroy in case of timeout + { __connections: [] }) server .on('listening', () => listener.emit('listening')) .on('error', err => listener.emit('error', err)) .on('close', () => listener.emit('close')) - // Keep track of open connections to destroy in case of timeout - server.__connections = [] - - listener.close = () => { - if (!server.listening) return - - return new Promise((resolve, reject) => { - server.__connections.forEach(maConn => attemptClose(maConn)) - server.close(err => err ? reject(err) : resolve()) - }) - } - - let peerId, listeningAddr - - listener.listen = ma => { - listeningAddr = ma - peerId = ma.getPeerId() - - if (peerId) { - listeningAddr = ma.decapsulateCode(CODE_P2P) - } - - return new Promise((resolve, reject) => { - const options = multiaddrToNetConfig(listeningAddr) - server.listen(options, err => { - if (err) return reject(err) - log('Listening on %s', server.address()) - resolve() - }) - }) - } - - listener.getAddrs = () => { - let addrs = [] - const address = server.address() - - if (!address) { - throw new Error('Listener is not ready yet') - } - - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (listeningAddr.toString().startsWith('/ip4')) { - addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) - } else if (address.family === 'IPv6') { - addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) - } - - return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma) - } - return listener } +/** + * @param {Server} server + * @param {MultiaddrConnection} maConn + */ function trackConn (server, maConn) { server.__connections.push(maConn) @@ -119,5 +151,6 @@ function trackConn (server, maConn) { server.__connections = server.__connections.filter(c => c !== maConn) } + // @ts-ignore maConn.conn.once('close', untrackConn) } diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js index 2986749..9fd4a68 100644 --- a/src/socket-to-conn.js +++ b/src/socket-to-conn.js @@ -2,13 +2,32 @@ const abortable = require('abortable-iterator') const log = require('debug')('libp2p:tcp:socket') +// Missing Type +// @ts-ignore const toIterable = require('stream-to-it') const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') const { CLOSE_TIMEOUT } = require('./constants') -// Convert a socket into a MultiaddrConnection -// https://github.com/libp2p/interface-transport#multiaddrconnection -module.exports = (socket, options) => { +/** + * @typedef {import('multiaddr').Multiaddr} Multiaddr + * @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection + * @typedef {import('net').Socket} Socket + */ + +/** + * Convert a socket into a MultiaddrConnection + * https://github.com/libp2p/interface-transport#multiaddrconnection + * + * @private + * @param {Socket} socket + * @param {object} options + * @param {Multiaddr} [options.listeningAddr] + * @param {Multiaddr} [options.remoteAddr] + * @param {Multiaddr} [options.localAddr] + * @param {AbortSignal} [options.signal] + * @returns {MultiaddrConnection} + */ +const toConnection = (socket, options) => { options = options || {} // Check if we are connected on a unix path @@ -21,9 +40,13 @@ module.exports = (socket, options) => { } const { sink, source } = toIterable.duplex(socket) + + /** @type {MultiaddrConnection} */ const maConn = { async sink (source) { if (options.signal) { + // Missing Type for "abortable" + // @ts-ignore source = abortable(source, options.signal) } @@ -31,6 +54,8 @@ module.exports = (socket, options) => { await sink((async function * () { for await (const chunk of source) { // Convert BufferList to Buffer + // Sink in StreamMuxer define argument as Uint8Array so chunk type infers as number which can't be sliced + // @ts-ignore yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() } })()) @@ -45,6 +70,8 @@ module.exports = (socket, options) => { } }, + // Missing Type for "abortable" + // @ts-ignore source: options.signal ? abortable(source, options.signal) : source, conn: socket, @@ -52,11 +79,11 @@ module.exports = (socket, options) => { localAddr: options.localAddr || toMultiaddr(socket.localAddress, socket.localPort), // If the remote address was passed, use it - it may have the peer ID encapsulated - remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort), + remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress || '', socket.remotePort || ''), timeline: { open: Date.now() }, - close () { + async close () { if (socket.destroyed) return return new Promise((resolve, reject) => { @@ -66,8 +93,12 @@ module.exports = (socket, options) => { // timeout, destroy it manually. const timeout = setTimeout(() => { const { host, port } = maConn.remoteAddr.toOptions() - log('timeout closing socket to %s:%s after %dms, destroying it manually', - host, port, Date.now() - start) + log( + 'timeout closing socket to %s:%s after %dms, destroying it manually', + host, + port, + Date.now() - start + ) if (socket.destroyed) { log('%s:%s is already destroyed', host, port) @@ -82,7 +113,7 @@ module.exports = (socket, options) => { clearTimeout(timeout) resolve() }) - socket.end(err => { + socket.end(/** @param {Error} [err] */(err) => { maConn.timeline.close = Date.now() if (err) return reject(err) resolve() @@ -102,3 +133,5 @@ module.exports = (socket, options) => { return maConn } + +module.exports = toConnection diff --git a/src/utils.js b/src/utils.js index abb4394..1ba1ef9 100644 --- a/src/utils.js +++ b/src/utils.js @@ -5,21 +5,41 @@ const os = require('os') const { resolve } = require('path') const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } +/** + * @typedef {import('multiaddr').MultiaddrObject} MultiaddrObject + */ + +/** + * @param {Multiaddr} addr + * @returns {MultiaddrObject} + */ function multiaddrToNetConfig (addr) { const listenPath = addr.getPath() // unix socket listening if (listenPath) { + // TCP should not return unix socket else need to refactor listener which accepts connection options object + // @ts-ignore return resolve(listenPath) } // tcp listening return addr.toOptions() } +/** + * @param {'ip4' | 'ip6'} proto + * @param {string} ip + * @param {number} port + * @returns {Multiaddr[]} + */ function getMultiaddrs (proto, ip, port) { - const toMa = ip => new Multiaddr(`/${proto}/${ip}/tcp/${port}`) + const toMa = /** @param {string} ip */ ip => new Multiaddr(`/${proto}/${ip}/tcp/${port}`) return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa) } +/** + * @param {string} ip + * @returns {boolean} + */ function isAnyAddr (ip) { return ['0.0.0.0', '::'].includes(ip) } @@ -30,14 +50,25 @@ function isAnyAddr (ip) { * @returns {string[]} an array of ip address strings */ const networks = os.networkInterfaces() + +/** + * @param {string} family + * @returns {string[]} + */ function getNetworkAddrs (family) { - return Object.values(networks).reduce((addresses, netAddrs) => { - netAddrs.forEach(netAddr => { - // Add the ip of each matching network interface - if (netAddr.family === family) addresses.push(netAddr.address) - }) - return addresses - }, []) + const addresses = [] + + for (const [, netAddrs] of Object.entries(networks)) { + if (netAddrs) { + for (const netAddr of netAddrs) { + if (netAddr.family === family) { + addresses.push(netAddr.address) + } + } + } + } + + return addresses } module.exports = { diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..51187f1 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist", + "baseUrl": "./", + "paths": { + "*": ["./types/*"] + } + }, + "include": [ + "types", + "src" + ] +}