|
| 1 | +import {Reader} from '@jsonjoy.com/buffers/lib/Reader'; |
| 2 | +import {Nfsv4Decoder} from './Nfsv4Decoder'; |
| 3 | +import {FullNfsv4Encoder} from './FullNfsv4Encoder'; |
| 4 | +import {RmRecordDecoder, RmRecordEncoder} from '../../rm'; |
| 5 | +import {RpcAcceptStat, RpcAuthFlavor, RpcCallMessage, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcOpaqueAuth} from '../../rpc'; |
| 6 | +import {EMPTY_READER, Nfsv4Proc} from './constants'; |
| 7 | +import {Nfsv4CompoundRequest} from './messages'; |
| 8 | +import {getOpNameFromRequest} from './util'; |
| 9 | +import type {Duplex} from 'node:stream'; |
| 10 | +import type {IWriter, IWriterGrowable} from '@jsonjoy.com/buffers/lib/types'; |
| 11 | + |
| 12 | +export interface Nfsv4ConnectionOpts { |
| 13 | + /** |
| 14 | + * Normally this is a TCP socket, but any Duplex stream will do. |
| 15 | + */ |
| 16 | + duplex: Duplex; |
| 17 | + encoder?: FullNfsv4Encoder; |
| 18 | + decoder?: Nfsv4Decoder; |
| 19 | + debug?: boolean; |
| 20 | + logger?: Pick<typeof console, 'log' | 'error'>; |
| 21 | +} |
| 22 | + |
| 23 | +export class Nfsv4Connection { |
| 24 | + public closed = false; |
| 25 | + public maxIncomingMessage: number = 2 * 1024 * 1024; |
| 26 | + public maxBackpressure: number = 2 * 1024 * 1024; |
| 27 | + |
| 28 | + /** Last known RPC transaction ID. Used to emit fatal connection errors. */ |
| 29 | + protected lastXid = 0; |
| 30 | + |
| 31 | + public readonly duplex: Duplex; |
| 32 | + |
| 33 | + protected readonly rmDecoder: RmRecordDecoder; |
| 34 | + protected readonly rpcDecoder: RpcMessageDecoder; |
| 35 | + protected readonly nfsDecoder: Nfsv4Decoder; |
| 36 | + protected readonly writer: IWriter & IWriterGrowable; |
| 37 | + protected readonly rmEncoder: RmRecordEncoder; |
| 38 | + protected readonly rpcEncoder: RpcMessageEncoder; |
| 39 | + protected readonly nfsEncoder: FullNfsv4Encoder; |
| 40 | + |
| 41 | + public debug: boolean; |
| 42 | + public logger: Pick<typeof console, 'log' | 'error'>; |
| 43 | + |
| 44 | + constructor(opts: Nfsv4ConnectionOpts) { |
| 45 | + this.debug = !!opts.debug; |
| 46 | + this.logger = opts.logger || console; |
| 47 | + const duplex = this.duplex = opts.duplex; |
| 48 | + this.rmDecoder = new RmRecordDecoder(); |
| 49 | + this.rpcDecoder = new RpcMessageDecoder(); |
| 50 | + this.nfsDecoder = new Nfsv4Decoder(); |
| 51 | + const nfsEncoder = this.nfsEncoder = new FullNfsv4Encoder(); |
| 52 | + this.writer = nfsEncoder.writer; |
| 53 | + this.rmEncoder = nfsEncoder.rmEncoder; |
| 54 | + this.rpcEncoder = nfsEncoder.rpcEncoder; |
| 55 | + duplex.on('data', this.onData.bind(this)); |
| 56 | + duplex.on('timeout', () => this.close()); |
| 57 | + duplex.on('close', (hadError: boolean): void => { |
| 58 | + this.close(); |
| 59 | + }); |
| 60 | + duplex.on('error', (err: Error) => { |
| 61 | + this.logger.error('SOCKET ERROR:', err); |
| 62 | + this.close(); |
| 63 | + }); |
| 64 | + } |
| 65 | + |
| 66 | + protected onData(data: Uint8Array): void { |
| 67 | + const {rmDecoder, rpcDecoder} = this; |
| 68 | + rmDecoder.push(data); |
| 69 | + let record = rmDecoder.readRecord(); |
| 70 | + while (record) { |
| 71 | + if (record.size()) { |
| 72 | + const rpcMessage = rpcDecoder.decodeMessage(record); |
| 73 | + if (rpcMessage) this.onRpcMessage(rpcMessage); |
| 74 | + else { |
| 75 | + this.close(); |
| 76 | + return; |
| 77 | + } |
| 78 | + } |
| 79 | + record = rmDecoder.readRecord(); |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + protected onRpcMessage(msg: RpcMessage): void { |
| 84 | + const debug = this.debug; |
| 85 | + if (msg instanceof RpcCallMessage) { |
| 86 | + const proc = msg.proc; |
| 87 | + switch (proc) { |
| 88 | + case Nfsv4Proc.NULL: { |
| 89 | + if (debug) this.logger.log('NULL procedure'); |
| 90 | + const rmEncoder = this.rmEncoder; |
| 91 | + const state = rmEncoder.startRmRecord(); |
| 92 | + this.rpcEncoder.writeAcceptedReply( |
| 93 | + msg.xid, |
| 94 | + new RpcOpaqueAuth(RpcAuthFlavor.AUTH_NONE, EMPTY_READER), |
| 95 | + RpcAcceptStat.SUCCESS, |
| 96 | + ); |
| 97 | + rmEncoder.endRmRecord(state); |
| 98 | + this.write(this.writer.flush()); |
| 99 | + return; |
| 100 | + } |
| 101 | + case Nfsv4Proc.COMPOUND: { |
| 102 | + if (!(msg.params instanceof Reader)) return; |
| 103 | + const compound = this.nfsDecoder.decodeCompoundRequest(msg.params); |
| 104 | + if (compound instanceof Nfsv4CompoundRequest) { |
| 105 | + console.log('\nNFS COMPOUND Request:'); |
| 106 | + console.log(` Tag: "${compound.tag}"`); |
| 107 | + console.log(` Minor Version: ${compound.minorversion}`); |
| 108 | + console.log(` Operations (${compound.argarray.length}):`); |
| 109 | + compound.argarray.forEach((op: any, idx: number) => { |
| 110 | + console.log(` [${idx}] ${getOpNameFromRequest(op)}`); |
| 111 | + console.log(` ${JSON.stringify(op, null, 2).split('\n').slice(1).join('\n ')}`); |
| 112 | + }); |
| 113 | + } else { |
| 114 | + console.log('Could not decode COMPOUND request'); |
| 115 | + } |
| 116 | + return; |
| 117 | + } |
| 118 | + default: { |
| 119 | + console.log(`Unknown procedure: ${proc}`); |
| 120 | + } |
| 121 | + } |
| 122 | + } |
| 123 | + throw new Error('Not implemented non-RPCCallMessage'); |
| 124 | + } |
| 125 | + |
| 126 | + private closeWithError(error: RpcAcceptStat.PROG_UNAVAIL | RpcAcceptStat.PROC_UNAVAIL | RpcAcceptStat.GARBAGE_ARGS | RpcAcceptStat.SYSTEM_ERR): void { |
| 127 | + const xid = this.lastXid; |
| 128 | + if (xid) { |
| 129 | + const state = this.rmEncoder.startRmRecord(); |
| 130 | + const verify = new RpcOpaqueAuth(RpcAuthFlavor.AUTH_NONE, EMPTY_READER); |
| 131 | + this.rpcEncoder.writeAcceptedReply(xid, verify, error); |
| 132 | + this.rmEncoder.endRmRecord(state); |
| 133 | + const bin = this.writer.flush(); |
| 134 | + this.duplex.write(bin); |
| 135 | + } |
| 136 | + this.close(); |
| 137 | + } |
| 138 | + |
| 139 | + private close(): void { |
| 140 | + if (this.closed) return; |
| 141 | + this.closed = true; |
| 142 | + clearImmediate(this.__uncorkTimer); |
| 143 | + this.__uncorkTimer = null; |
| 144 | + const duplex = this.duplex; |
| 145 | + duplex.removeAllListeners(); |
| 146 | + if (!duplex.destroyed) duplex.destroy(); |
| 147 | + } |
| 148 | + |
| 149 | + // ---------------------------------------------------------- Write to socket |
| 150 | + |
| 151 | + private __uncorkTimer: any = null; |
| 152 | + |
| 153 | + public write(buf: Uint8Array): void { |
| 154 | + if (this.closed) return; |
| 155 | + const duplex = this.duplex; |
| 156 | + if (duplex.writableLength > this.maxBackpressure) { |
| 157 | + this.closeWithError(RpcAcceptStat.SYSTEM_ERR); |
| 158 | + return; |
| 159 | + } |
| 160 | + const __uncorkTimer = this.__uncorkTimer; |
| 161 | + if (!__uncorkTimer) duplex.cork(); |
| 162 | + duplex.write(buf); |
| 163 | + if (!__uncorkTimer) this.__uncorkTimer = setImmediate(() => { |
| 164 | + this.__uncorkTimer = null; |
| 165 | + duplex.uncork(); |
| 166 | + }); |
| 167 | + } |
| 168 | + |
| 169 | + // ------------------------------------------------- Write WebSocket messages |
| 170 | + |
| 171 | + // TODO: Execute NFS Callback... |
| 172 | + public send(): void { |
| 173 | + |
| 174 | + } |
| 175 | +} |
0 commit comments