Skip to content

Commit

Permalink
add ability to read file by chunk when server limlits the read buffer…
Browse files Browse the repository at this point in the history
… size
  • Loading branch information
erossignon committed Mar 12, 2023
1 parent 00be152 commit 3e1824f
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 20 deletions.
1 change: 1 addition & 0 deletions packages/node-opcua-file-transfer/package.json
Expand Up @@ -14,6 +14,7 @@
"node-opcua-address-space": "2.91.1",
"node-opcua-assert": "2.88.0",
"node-opcua-basic-types": "2.90.1",
"node-opcua-binary-stream": "2.90.1",
"node-opcua-constants": "2.88.0",
"node-opcua-data-model": "2.90.1",
"node-opcua-debug": "2.90.1",
Expand Down
52 changes: 47 additions & 5 deletions packages/node-opcua-file-transfer/source/client/client_file.ts
Expand Up @@ -20,11 +20,36 @@ const doDebug = checkDebugFlag("FileType");
import { OpenFileMode } from "../open_mode";
export { OpenFileMode } from "../open_mode";


export interface IClientFile {
fileHandle: number;
open(mode: OpenFileMode): Promise<number>;
close(): Promise<void>;
getPosition(): Promise<UInt64>;
setPosition(position: UInt64 | UInt32): Promise<void>;
read(bytesToRead: UInt32 | Int32 | Int64 | UInt64): Promise<Buffer>;
write(data: Buffer): Promise<void>;
openCount(): Promise<UInt16>;
size(): Promise<UInt64>;
}
export interface IClientFilePriv extends IClientFile {
readonly fileNodeId: NodeId;
openMethodNodeId?: NodeId;
closeMethodNodeId?: NodeId;
setPositionNodeId?: NodeId;
getPositionNodeId?: NodeId;
readNodeId?: NodeId;
writeNodeId?: NodeId;
openCountNodeId?: NodeId;
sizeNodeId?: NodeId;
ensureInitialized(): Promise<void>;
}

/**
*
*
*/
export class ClientFile {
export class ClientFile implements IClientFile {
public static useGlobalMethod = false;

public fileHandle = 0;
Expand Down Expand Up @@ -281,18 +306,35 @@ export class ClientFile {
}
}

export async function readFile(clientFile: ClientFile): Promise<Buffer> {

export async function readFile(clientFile: IClientFile): Promise<Buffer> {
await clientFile.open(OpenFileMode.Read);
try {
const fileSize = await clientFile.size();
const fileSize = coerceInt32(await clientFile.size());
/**
* Read file
*/
const data = await clientFile.read(fileSize);
return data;
if (data.length >= fileSize) {
// everything has been read
return data;
}

// wee need to loop to complete the read
const chunks = [data];
let remaining = fileSize - data.length;
while (remaining > 0) {
const buf = await clientFile.read(remaining);
chunks.push(buf);
remaining -= buf.length;
}
return Buffer.concat(chunks);
} finally {
await clientFile.close();
}
}

export async function readOPCUAFile(clientFile: ClientFile): Promise<Buffer> {
export async function readOPCUAFile(clientFile: IClientFile): Promise<Buffer> {
return await readFile(clientFile);
}

Expand Down
Expand Up @@ -17,6 +17,7 @@ import {
UAObjectType
} from "node-opcua-address-space";
import { Byte, Int32, UInt32, UInt64 } from "node-opcua-basic-types";
import { BinaryStream } from "node-opcua-binary-stream";
import { checkDebugFlag, make_debugLog, make_errorLog, make_warningLog } from "node-opcua-debug";
import { NodeId, NodeIdLike, sameNodeId } from "node-opcua-nodeid";
import { CallMethodResultOptions } from "node-opcua-service-call";
Expand Down Expand Up @@ -93,10 +94,17 @@ export interface FileOptions {

nodeId?: NodeIdLike;

/**
* the maximum number of bytes that can be read from the file
* in a single read call
* - if not specified or 0, we assume Int32 limit
*/
maxChunkSize?: number;

refreshFileContentFunc?: () => Promise<void>;
}

export interface UAFileType extends UAObjectType, UAFile_Base {}
export interface UAFileType extends UAObjectType, UAFile_Base { }
/**
*
*/
Expand All @@ -105,11 +113,14 @@ export class FileTypeData {
public filename = "";
public maxSize = 0;
public mimeType = "";
public maxChunkSizeBytes = 0;

private file: UAFile;
private _openCount = 0;
private _fileSize = 0;

public static maxChunkSize = 16 * 1024 * 1024; // 16 MB

public refreshFileContentFunc?: () => Promise<void>;

constructor(options: FileOptions, file: UAFile) {
Expand All @@ -120,6 +131,8 @@ export class FileTypeData {
this.filename = options.filename;
this.maxSize = options.maxSize!;
this.mimeType = options.mimeType || "";
this.maxChunkSizeBytes = options.maxChunkSize || FileTypeData.maxChunkSize;

// openCount indicates the number of currently valid file handles on the file.
this._openCount = 0;
file.openCount.bindVariable(
Expand Down Expand Up @@ -234,6 +247,10 @@ export function getFileData(opcuaFile2: UAFile): FileTypeData {
return (opcuaFile2 as UAFileEx).$fileData;
}

function getFileDataFromContext(context: ISessionContext): FileTypeData {
return getFileData(context.object as UAFile);
}

interface FileAccessData {
handle: number;
fd: number; // nodejs handler
Expand All @@ -248,7 +265,7 @@ interface FileTypeM {
$$files: { [key: number]: FileAccessData };
}

interface AddressSpacePriv extends IAddressSpace, FileTypeM {}
interface AddressSpacePriv extends IAddressSpace, FileTypeM { }
function _prepare(addressSpace: IAddressSpace, context: ISessionContext): FileTypeM {
const _context = addressSpace as AddressSpacePriv;
_context.$$currentFileHandle = _context.$$currentFileHandle ? _context.$$currentFileHandle : 41;
Expand Down Expand Up @@ -391,7 +408,7 @@ async function _openFile(this: UAMethod, inputArguments: Variant[], context: ISe
return { statusCode: StatusCodes.BadInvalidArgument };
}

const fileData = (context.object as UAFileEx).$fileData;
const fileData = getFileDataFromContext(context);

const filename = fileData.filename;

Expand Down Expand Up @@ -443,7 +460,7 @@ async function _openFile(this: UAMethod, inputArguments: Variant[], context: ISe
}

function _getFileSystem(context: ISessionContext): AbstractFs {
const fs: AbstractFs = (context.object as UAFileEx).$fileData._fs;
const fs: AbstractFs = getFileDataFromContext(context)._fs;
return fs;
}

Expand All @@ -467,13 +484,13 @@ async function _closeFile(this: UAMethod, inputArguments: Variant[], context: IS
return { statusCode: StatusCodes.BadInvalidArgument };
}

const data = (context.object as UAFileEx).$fileData as FileTypeData;

debugLog("Closing file handle ", fileHandle, "filename: ", data.filename, "openCount: ", data.openCount);
const fileData = getFileDataFromContext(context);
debugLog("Closing file handle ", fileHandle, "filename: ", fileData.filename, "openCount: ", fileData.openCount);

await promisify(abstractFs.close)(_fileInfo.fd);
_close(addressSpace, context, _fileInfo);
data.openCount -= 1;
fileData.openCount -= 1;

return {
statusCode: StatusCodes.Good
Expand Down Expand Up @@ -507,6 +524,7 @@ async function _readFile(this: UAMethod, inputArguments: Variant[], context: ISe
return { statusCode: StatusCodes.BadInvalidArgument };
}


const _fileInfo = _getFileInfo(addressSpace, context, fileHandle);
if (!_fileInfo) {
return { statusCode: StatusCodes.BadInvalidState };
Expand All @@ -517,6 +535,19 @@ async function _readFile(this: UAMethod, inputArguments: Variant[], context: ISe
return { statusCode: StatusCodes.BadInvalidState };
}

// length cannot exceed maxChunkSizeBytes
const fileData = getFileDataFromContext(context);

const maxChunkSizeBytes = fileData.maxChunkSizeBytes;
if (length > maxChunkSizeBytes) {
length = maxChunkSizeBytes;
}
// length cannot either exceed ByteStream.maxChunkSizeBytes
if (length > BinaryStream.maxByteStringLength) {
length = BinaryStream.maxByteStringLength;
}

// length cannot either exceed remaining buffer size from current position
length = Math.min(_fileInfo.size - _fileInfo.position[1], length);

const data = Buffer.alloc(length);
Expand Down Expand Up @@ -584,10 +615,10 @@ async function _writeFile(this: UAMethod, inputArguments: Variant[], context: IS
_fileInfo.position[1] += ret.bytesWritten;
_fileInfo.size = Math.max(_fileInfo.size, _fileInfo.position[1]);

const fileTypeData = (context.object as UAFileEx).$fileData as FileTypeData;
debugLog(fileTypeData.fileSize);
fileTypeData.fileSize = Math.max(fileTypeData.fileSize, _fileInfo.position[1]);
debugLog(fileTypeData.fileSize);
const fileData = getFileDataFromContext(context);
debugLog(fileData.fileSize);
fileData.fileSize = Math.max(fileData.fileSize, _fileInfo.position[1]);
debugLog(fileData.fileSize);
} catch (err) {
if (err instanceof Error) {
errorLog("Write error : ", err.message);
Expand Down
76 changes: 73 additions & 3 deletions packages/node-opcua-file-transfer/test/test_file_transfer.ts
Expand Up @@ -4,6 +4,8 @@ import * as fsOrigin from "fs";
import * as os from "os";
import * as path from "path";
import { promisify } from "util";
import * as crypto from "crypto";
import * as sinon from "sinon";

import { fs as fsMemory } from "memfs";

Expand All @@ -12,11 +14,12 @@ import * as should from "should";
import { AddressSpace, PseudoSession, SessionContext, UAFile } from "node-opcua-address-space";
import { generateAddressSpace } from "node-opcua-address-space/nodeJS";
import { UInt64, coerceUInt64, coerceNodeId } from "node-opcua-basic-types";
import { MethodIds } from "node-opcua-client";
import { CallMethodRequestOptions, MethodIds } from "node-opcua-client";
import { NodeId } from "node-opcua-nodeid";
import { nodesets } from "node-opcua-nodesets";
import { MockContinuationPointManager } from "node-opcua-address-space/testHelpers";

import { ClientFile, getFileData, OpenFileMode, installFileType, AbstractFs, readFile } from "..";
import { ClientFile, getFileData, OpenFileMode, installFileType, AbstractFs, readFile, IClientFilePriv } from "..";

// tslint:disable:no-var-requires
const describe = require("node-opcua-leak-detector").describeWithLeakDetector;
Expand Down Expand Up @@ -314,17 +317,84 @@ const describe = require("node-opcua-leak-detector").describeWithLeakDetector;
size2.should.eql(coerceUInt64(2));
});


it(m + "readFile", async () => {
const fileData = getFileData(opcuaFile2);
await promisify(fileSystem.writeFile)(fileData.filename, "1234567890", "utf-8");
await fileData.refresh();

const session = new PseudoSession(addressSpace);
const clientFile = new ClientFile(session, opcuaFile2.nodeId);
const clientFile = (new ClientFile(session, opcuaFile2.nodeId)) as unknown as IClientFilePriv;
await (clientFile as any).ensureInitialized();


const callSpy = sinon.spy(session, "call");
const buf = await readFile(clientFile);
callSpy.callCount.should.equal(3);

const openMethod = clientFile.openMethodNodeId!;
const readMethod = clientFile.readNodeId!
const closeMethod = clientFile.closeMethodNodeId!;
const getMethod = (n: number)=> callSpy.getCall(n).args[0] as CallMethodRequestOptions;

getMethod(0).methodId?.should.eql(openMethod);
getMethod(1).methodId?.should.eql(readMethod);
getMethod(2).methodId?.should.eql(closeMethod);

buf.toString("utf-8").should.eql("1234567890");
});

it(m + "readFile with large file", async () => {


const randomData = crypto.randomBytes(3 * 1024).toString("hex");

randomData.length.should.equal(6*1024);

const fileData = getFileData(opcuaFile2);

const oldMaxSize = fileData.maxChunkSizeBytes;

fileData.maxChunkSizeBytes = 1024;

await promisify(fileSystem.writeFile)(fileData.filename, randomData, "utf-8");
await fileData.refresh();


const session = new PseudoSession(addressSpace);
const clientFile = new ClientFile(session, opcuaFile2.nodeId) as unknown as IClientFilePriv;
await clientFile.ensureInitialized();


const callSpy = sinon.spy(session, "call");

const buf = await readFile(clientFile);


const openMethod = clientFile.openMethodNodeId!;
const readMethod = clientFile.readNodeId!
const closeMethod = clientFile.closeMethodNodeId!;
const getMethod = (n: number)=> callSpy.getCall(n).args[0] as CallMethodRequestOptions;

getMethod(0).methodId?.should.eql(openMethod);
getMethod(1).methodId?.should.eql(readMethod);
getMethod(2).methodId?.should.eql(readMethod);
getMethod(3).methodId?.should.eql(readMethod);
getMethod(4).methodId?.should.eql(readMethod);
getMethod(5).methodId?.should.eql(readMethod);
getMethod(6).methodId?.should.eql(readMethod);
getMethod(7).methodId?.should.eql(closeMethod);

callSpy.callCount.should.equal(8);

buf.toString("utf-8").should.eql(randomData);

fileData.maxChunkSizeBytes = oldMaxSize;

});



function swapHandle(c1: ClientFile, c2: ClientFile) {
const b = (c2 as any).fileHandle;
(c2 as any).fileHandle = c1.fileHandle;
Expand Down

0 comments on commit 3e1824f

Please sign in to comment.