diff --git a/package.json b/package.json
index a73ddec..d16244b 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@guilledk/arrowbatch-nodejs",
-  "version": "1.0.0-rc11",
+  "version": "1.0.0-rc12",
   "description": "Arrow Batch Storage protocol",
   "main": "./build/index.js",
   "type": "module",
@@ -12,7 +12,8 @@
   "scripts": {
     "bootstrap": "yarn",
     "build": "yarn run bootstrap && tsc",
-    "test-all": "c8 mocha build/tests/test*.spec.js"
+    "test-all": "mocha build/tests/test*.spec.js",
+    "coverage": "c8 mocha build/tests/test*.spec.js"
   },
   "repository": {
     "type": "git",
diff --git a/src/cache.ts b/src/cache.ts
index b86c126..9c522f8 100644
--- a/src/cache.ts
+++ b/src/cache.ts
@@ -7,7 +7,7 @@ import moment from "moment";
 
 export interface ArrowMetaCacheEntry {
     ts: number,
-    meta: ArrowBatchFileMetadata, startRow: any[]
+    meta: ArrowBatchFileMetadata
 }
 
 export interface ArrowCachedTables {
@@ -55,12 +55,7 @@
             this.metadataCache.delete(cacheKey);
         }
 
-        const firstTable = await ArrowBatchProtocol.readArrowBatchTable(
-            filePath, meta, 0);
-
-        const startRow = firstTable.get(0).toArray();
-
-        const result = { ts: moment.now(), meta, startRow };
+        const result = { ts: moment.now(), meta };
         this.metadataCache.set(cacheKey, result);
         return [result, true];
     }
@@ -86,7 +81,7 @@
         ];
     }
 
-    async getTablesFor(ordinal: bigint) {
+    async getTablesFor(ordinal: bigint): Promise<[ArrowCachedTables, number]> {
         const adjustedOrdinal = this.ctx.getOrdinal(ordinal);
 
         // metadata about the bucket we are going to get tables for, mostly need to
@@ -94,18 +89,25 @@
         // boundary start
         const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal, 'root');
 
-        // index relative to bucket boundary
-        const relativeIndex = ordinal - bucketMetadata.startRow[0];
+        // ensure bucket contains ordinal
+        const bucketOrdStart = bucketMetadata.meta.batches[0].batch.startOrdinal;
+        const bucketOrdLast = bucketMetadata.meta.batches[
+            bucketMetadata.meta.batches.length - 1].batch.lastOrdinal;
 
-        // get batch table index, assuming config.dumpSize table size is respected
-        const batchIndex = Number(relativeIndex / BigInt(this.ctx.config.dumpSize));
+        if (ordinal < bucketOrdStart || ordinal > bucketOrdLast)
+            throw new Error(`Bucket does not contain ${ordinal}`);
+
+        let batchIndex = 0;
+        while (ordinal > bucketMetadata.meta.batches[batchIndex].batch.lastOrdinal) {
+            batchIndex++;
+        }
 
         const cacheKey = `${adjustedOrdinal}-${batchIndex}`;
 
         if (this.tableCache.has(cacheKey)) {
             // we have this tables cached, but only return if metadata wasnt invalidated
             if (!metadataUpdated)
-                return this.tableCache.get(cacheKey);
+                return [this.tableCache.get(cacheKey), batchIndex];
 
             // delete stale cache
             this.tableCache.delete(cacheKey)
@@ -136,7 +138,7 @@
             this.tableCache.delete(oldest);
         }
 
-        return tables;
+        return [tables, batchIndex];
     }
 
     get size() : number {
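For reference, this is the lookup `getTablesFor` now performs, shown as a self-contained sketch with simplified stand-in types (not the library's actual interfaces): instead of deriving a batch index from a fixed `config.dumpSize`, the batch containing an ordinal is found by scanning the per-batch ordinal ranges recorded in the file metadata.

```ts
// Simplified sketch of the new ordinal -> batch lookup (stand-in types, not the real API).
interface BatchRange { startOrdinal: bigint; lastOrdinal: bigint; }

function findBatchIndex(batches: BatchRange[], ordinal: bigint): number {
    const bucketOrdStart = batches[0].startOrdinal;
    const bucketOrdLast = batches[batches.length - 1].lastOrdinal;

    if (ordinal < bucketOrdStart || ordinal > bucketOrdLast)
        throw new Error(`Bucket does not contain ${ordinal}`);

    // linear scan over per-batch ranges; batches no longer need to hold exactly dumpSize rows
    let batchIndex = 0;
    while (ordinal > batches[batchIndex].lastOrdinal)
        batchIndex++;

    return batchIndex;
}

// example: a bucket whose batches are not all the same size
const exampleBatches: BatchRange[] = [
    {startOrdinal: 0n, lastOrdinal: 99n},
    {startOrdinal: 100n, lastOrdinal: 149n},
    {startOrdinal: 150n, lastOrdinal: 399n},
];
console.log(findBatchIndex(exampleBatches, 120n));  // 1
```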
diff --git a/src/protocol.ts b/src/protocol.ts
index 94a5f78..44a2e00 100644
--- a/src/protocol.ts
+++ b/src/protocol.ts
@@ -4,7 +4,7 @@ import {tableFromIPC} from "apache-arrow";
 import {ZSTDDecompress} from 'simple-zstd';
 import RLP from "rlp";
 
-import {bigintToUint8Array} from "./utils.js";
+import {bigintToUint8Array, numberToUint8Array} from "./utils.js";
 
 export enum ArrowBatchCompression {
     UNCOMPRESSED = 0,
@@ -13,12 +13,15 @@
 
 export interface ArrowBatchGlobalHeader {
     versionConstant: string;
+    batchIndexStart: number;
 }
 
 export interface ArrowBatchHeader {
     headerConstant: string;
     batchByteSize: bigint;
     compression: ArrowBatchCompression;
+    startOrdinal: bigint;
+    lastOrdinal: bigint;
 }
 
 export interface ArrowBatchFileMetadata {
@@ -72,27 +75,50 @@
      * queries that only affect that small batch.
      */
     static readonly ARROW_BATCH_VERSION_CONSTANT = 'ARROW-BATCH1';
-    static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length;
+    static readonly GLOBAL_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT.length + 4;
 
     static readonly ARROW_BATCH_HEADER_CONSTANT = 'ARROW-BATCH-TABLE';
-    static readonly BATCH_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT.length + 8 + 1;
+    static readonly BATCH_HEADER_SIZE = ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT.length + 8 + 1 + 8 + 8;
 
-    static newGlobalHeader(): Uint8Array {
-        return new TextEncoder().encode(
+    static newGlobalHeader(batchIndexStart: number = 0): Uint8Array {
+        const strBytes = new TextEncoder().encode(
             ArrowBatchProtocol.ARROW_BATCH_VERSION_CONSTANT);
+
+        const batchIndexBytes = numberToUint8Array(batchIndexStart);
+
+        const buffer = new Uint8Array(
+            strBytes.length + 4
+        );
+
+        buffer.set(strBytes, 0);
+        buffer.set(batchIndexBytes, strBytes.length);
+
+        return buffer;
     }
 
-    static newBatchHeader(byteSize: bigint, compression: ArrowBatchCompression) {
+    static newBatchHeader(
+        byteSize: bigint,
+        compression: ArrowBatchCompression,
+        startOrdinal: bigint,
+        lastOrdinal: bigint
+    ) {
         const strBytes = new TextEncoder().encode(
             ArrowBatchProtocol.ARROW_BATCH_HEADER_CONSTANT);
         const batchSizeBytes = bigintToUint8Array(byteSize);
         const compressionByte = new Uint8Array([compression]);
+        const startOrdinalBytes = bigintToUint8Array(startOrdinal);
+        const lastOrdinalBytes = bigintToUint8Array(lastOrdinal);
+
+        const buffer = new Uint8Array(
+            strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length + lastOrdinalBytes.length
+        );
 
-        const buffer = new Uint8Array(strBytes.length + batchSizeBytes.length + 1);
         buffer.set(strBytes, 0);
         buffer.set(batchSizeBytes, strBytes.length);
         buffer.set(compressionByte, strBytes.length + batchSizeBytes.length);
+        buffer.set(startOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length);
+        buffer.set(lastOrdinalBytes, strBytes.length + batchSizeBytes.length + compressionByte.length + startOrdinalBytes.length);
 
         return buffer;
     }
@@ -102,7 +128,9 @@
         const versionConstantBytes = buffer.subarray(0, versionConstantLength);
         const versionConstant = new TextDecoder("utf-8").decode(versionConstantBytes);
 
-        return { versionConstant };
+        const batchIndexStart = buffer.readUInt32LE(versionConstantLength);
+
+        return { versionConstant, batchIndexStart };
     }
 
     static readBatchHeader(buffer: Buffer): ArrowBatchHeader {
@@ -113,8 +141,10 @@
         const sizeStart = headerConstantLength;
         const batchByteSize = buffer.readBigUInt64LE(sizeStart);
         const compression = buffer.readUint8(sizeStart + 8);
+        const startOrdinal = buffer.readBigInt64LE(sizeStart + 8 + 1);
+        const lastOrdinal = buffer.readBigInt64LE(sizeStart + 8 + 1 + 8);
 
-        return { headerConstant, batchByteSize, compression };
+        return { headerConstant, batchByteSize, compression, startOrdinal, lastOrdinal };
     }
 
     static async readFileMetadata(filePath: string): Promise<ArrowBatchFileMetadata> {
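With the constants above, the global header becomes 12 + 4 = 16 bytes ('ARROW-BATCH1' plus a little-endian uint32 batchIndexStart) and the batch header becomes 17 + 8 + 1 + 8 + 8 = 42 bytes. A rough round-trip of that batch-header layout using Node's Buffer API, as an illustration only (the library itself assembles it with TextEncoder and Uint8Array as shown above):

```ts
// layout: 17-byte constant | u64 LE byteSize | u8 compression | u64 LE startOrdinal | u64 LE lastOrdinal
const HEADER_CONSTANT = 'ARROW-BATCH-TABLE';

function encodeBatchHeader(byteSize: bigint, compression: number,
                           startOrdinal: bigint, lastOrdinal: bigint): Buffer {
    const buf = Buffer.alloc(HEADER_CONSTANT.length + 8 + 1 + 8 + 8);  // 42 bytes
    buf.write(HEADER_CONSTANT, 0, 'ascii');
    let off = HEADER_CONSTANT.length;
    off = buf.writeBigUInt64LE(byteSize, off);
    off = buf.writeUInt8(compression, off);
    off = buf.writeBigUInt64LE(startOrdinal, off);
    buf.writeBigUInt64LE(lastOrdinal, off);
    return buf;
}

function decodeBatchHeader(buf: Buffer) {
    let off = HEADER_CONSTANT.length;
    const batchByteSize = buf.readBigUInt64LE(off); off += 8;
    const compression = buf.readUInt8(off); off += 1;
    const startOrdinal = buf.readBigUInt64LE(off); off += 8;
    const lastOrdinal = buf.readBigUInt64LE(off);
    return {batchByteSize, compression, startOrdinal, lastOrdinal};
}

console.log(decodeBatchHeader(encodeBatchHeader(1024n, 1, 100n, 199n)));
// { batchByteSize: 1024n, compression: 1, startOrdinal: 100n, lastOrdinal: 199n }
```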
diff --git a/src/reader/index.ts b/src/reader/index.ts
index 7670cdf..20692ae 100644
--- a/src/reader/index.ts
+++ b/src/reader/index.ts
@@ -8,7 +8,7 @@ import {
     RowWithRefs,
     TableBufferInfo
 } from "../context.js";
-import {ArrowBatchCache, ArrowCachedTables, isCachedTables} from "../cache.js";
+import {ArrowBatchCache, ArrowCachedTables, ArrowMetaCacheEntry, isCachedTables} from "../cache.js";
 
 
 export class ArrowBatchReader extends ArrowBatchContext {
@@ -168,7 +168,7 @@
             this._firstOrdinal = this._lastOrdinal;
 
         // maybe start flush
-        if (this.intermediateSize === Number(this.config.dumpSize))
+        if ((ordinal + 1n) % this.config.dumpSize === 0n)
             this.beginFlush();
     }
 
@@ -189,6 +189,10 @@
         return this.getColumn('root', this.definition.root.ordinal).length;
     }
 
+    get intermediateFirstOrdinal(): bigint {
+        return this.getColumn('root', this.definition.root.ordinal)[0];
+    }
+
     get intermediateLastOrdinal(): bigint {
         return this.getColumn('root', this.definition.root.ordinal)[this.intermediateSize - 1];
     }
@@ -412,6 +416,24 @@
         return mappings.map(
             m => tableBuff.columns.get(m.name)[index]);
     }
 
+    getRelativeTableIndex(ordinal: bigint, metadata: ArrowMetaCacheEntry): [number, bigint] {
+        // ensure bucket contains ordinal
+        const bucketOrdStart = metadata.meta.batches[0].batch.startOrdinal;
+        const bucketOrdLast = metadata.meta.batches[
+            metadata.meta.batches.length - 1].batch.lastOrdinal;
+
+        if (ordinal < bucketOrdStart || ordinal > bucketOrdLast)
+            throw new Error(`Bucket does not contain ${ordinal}`);
+
+        let batchIndex = 0;
+        while (ordinal > metadata.meta.batches[batchIndex].batch.lastOrdinal) {
+            batchIndex++;
+        }
+
+        return [batchIndex, ordinal - metadata.meta.batches[batchIndex].batch.startOrdinal];
+    }
+
     async getRow(ordinal: bigint): Promise<RowWithRefs> {
         const ordinalField = this.definition.root.ordinal;
@@ -440,7 +462,7 @@
         }
 
         // is row on disk?
-        const tables = await this.cache.getTablesFor(ordinal);
+        const [tables, batchIndex] = await this.cache.getTablesFor(ordinal);
 
         if (!(isCachedTables(tables)))
             throw new Error(`Tables for ordinal ${ordinal} not found`);
@@ -448,12 +470,11 @@
         // fetch requested row from root table
         const adjustedOrdinal = this.getOrdinal(ordinal);
         const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
-        const relativeIndex = ordinal - bucketMetadata.startRow[0];
-        const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
-        const structRow = tables.root.get(tableIndex);
+        const [__, relativeIndex] = this.getRelativeTableIndex(ordinal, bucketMetadata);
+        const structRow = tables.root.get(Number(relativeIndex));
 
         if (!structRow)
-            throw new Error(`Could not find row ${tableIndex}!`);
+            throw new Error(`Could not find row root-${adjustedOrdinal}-${batchIndex}-${relativeIndex}!`);
 
         const row = structRow.toArray();
         this.tableMappings.get('root').map.forEach((m, i) => {
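One behavioural consequence of the new flush condition in the row-push path: a flush starts when the ordinal just pushed completes a dumpSize-aligned window, independent of how many rows happen to sit in the intermediate buffer. A tiny illustration, assuming `config.dumpSize` is the bigint `100n`:

```ts
// sketch of the new trigger: flush once (ordinal + 1n) lands on a dumpSize boundary
const dumpSize = 100n;  // assumed config.dumpSize
const shouldFlush = (ordinal: bigint) => (ordinal + 1n) % dumpSize === 0n;

console.log(shouldFlush(98n));   // false
console.log(shouldFlush(99n));   // true  -> rows 0..99 close the first batch
console.log(shouldFlush(150n));  // false -> a writer that started mid-window keeps buffering
console.log(shouldFlush(199n));  // true  -> the batch closes at the aligned boundary
```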
diff --git a/src/tests/utils.ts b/src/tests/utils.ts
index 8d9a544..57a6c84 100644
--- a/src/tests/utils.ts
+++ b/src/tests/utils.ts
@@ -162,8 +162,6 @@
         for (let i = from; i <= to; i++) {
             const block = blockRows[i];
             writer.pushRow('block', block);
-
-            writer.updateOrdinal(i);
         }
     };
 
diff --git a/src/utils.ts b/src/utils.ts
index 622a583..1eb8e33 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -8,6 +8,7 @@
 import {format, LogEntry, loggers, transports} from "winston";
 import Transport from "winston-transport";
 import {ZSTDCompress} from 'simple-zstd';
 import EventEmitter from "node:events";
+import {number} from "zod";
 
 // currentDir == build/ dir
@@ -25,6 +26,15 @@ export function bigintToUint8Array (big: bigint): Uint8Array {
     }
     return byteArray;
 }
+
+export function numberToUint8Array(int: number): Uint8Array {
+    const byteArray = new Uint8Array(4);
+    for (let i = 0; i < byteArray.length; i++) {
+        byteArray[i] = Math.floor(int / Math.pow(256, i)) % 256;
+    }
+    return byteArray;
+}
+
 export async function compressUint8Array(input: Uint8Array, compressionLevel = 3) {
     // Convert Uint8Array to a Buffer since Node.js streams work with Buffers
     const inputBuffer = Buffer.from(input);
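`numberToUint8Array` mirrors the existing `bigintToUint8Array` helper but for a 4-byte value, least-significant byte first, which is what the `buffer.readUInt32LE()` call added to `readGlobalHeader` expects. A quick demonstration (the import path is assumed, relative to `src/`):

```ts
import {numberToUint8Array} from './utils.js';  // path assumed

console.log(numberToUint8Array(1));           // Uint8Array(4) [ 1, 0, 0, 0 ]
console.log(numberToUint8Array(0x01020304));  // Uint8Array(4) [ 4, 3, 2, 1 ]

// round-trips through the read side used for the global header
const buf = Buffer.from(numberToUint8Array(0x01020304));
console.log(buf.readUInt32LE(0).toString(16)); // '1020304'
```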
diff --git a/src/writer/index.ts b/src/writer/index.ts
index 4b6c48e..d087b28 100644
--- a/src/writer/index.ts
+++ b/src/writer/index.ts
@@ -7,14 +7,14 @@
 import {format, Logger, loggers, transports} from "winston";
 import {ArrowBatchReader} from "../reader/index.js";
 import {ArrowBatchConfig} from "../types";
 import {ArrowBatchContextDef, RowWithRefs} from "../context.js";
-import {isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
+import {extendedStringify, isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
 import {WriterControlRequest, WriterControlResponse} from "./worker.js";
 import {ArrowTableMapping, DEFAULT_STREAM_BUF_MEM} from "../protocol.js";
 import ArrowBatchBroadcaster from "./broadcast.js";
 import bytes from "bytes";
 
 export const DEFAULT_BROADCAST_HOST = '127.0.0.1';
-export const DEFAULT_BROADCAST_PORT = 4200;
+export const DEFAULT_BROADCAST_PORT = 4201;
 
 export class ArrowBatchWriter extends ArrowBatchReader {
@@ -113,6 +113,8 @@
 
         workerInfo.tid++;
         workerInfo.worker.postMessage(msg);
+
+        // this.logger.debug(`sent ${extendedStringify(msg)} to worker ${name}`);
     }
 
     private writersMessageHandler(msg: WriterControlResponse | WorkerLogMessage) {
@@ -234,6 +236,8 @@
         fs.mkdirSync(this.wipBucketPath, {recursive: true});
 
         const isUnfinished = this.intermediateSize < this.config.dumpSize;
+        const startOrdinal = this.intermediateFirstOrdinal;
+        const lastOrdinal = this.intermediateLastOrdinal;
 
         // push intermediate to auxiliary and clear it
         this._initIntermediate();
@@ -244,7 +248,8 @@
             method: 'flush',
             params: {
                 writeDir: this.wipBucketPath,
-                unfinished: isUnfinished
+                unfinished: isUnfinished,
+                startOrdinal, lastOrdinal
             }
         })
     });
@@ -278,8 +283,10 @@
 
         this.addRow(tableName, row.row);
 
-        if (tableName === 'root')
+        if (tableName === 'root') {
             this.broadcaster.broadcastRow(row);
+            this.updateOrdinal(row.row[0]);
+        }
     }
 
     private async trimOnBuffers(ordinal: bigint) {
@@ -338,13 +345,11 @@
         const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
 
         // trim idx relative to bucket start
-        const relativeIndex = ordinal - bucketMetadata.startRow[0];
+        const [batchIndex, relativeIndex] = this.getRelativeTableIndex(ordinal, bucketMetadata);
 
         // table index might need to be loaded into buffers & be partially edited
         // everything after table index can be deleted
-        const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
-
-        if (tableIndex >= bucketMetadata.meta.batches.length)
+        if (batchIndex >= bucketMetadata.meta.batches.length)
             return;
 
         // truncate files from next table onwards
@@ -352,14 +357,14 @@
             [...this.tableMappings.keys()]
                 .map(table => this.cache.getMetadataFor(adjustedOrdinal, table).then(
                     ([meta, _]) => {
-                        const tableIndexEnd = meta.meta.batches[tableIndex].end;
+                        const tableIndexEnd = meta.meta.batches[batchIndex].end;
                         const fileName = this.tableFileMap.get(adjustedOrdinal).get(table);
                         return pfs.truncate(fileName, tableIndexEnd + 1);
                     })));
 
         // unwrap adjustedOrdinal:tableIndex table into fresh intermediate
         this._intermediateBuffers = this._createBuffers();
-        const tables = await this.cache.getTablesFor(ordinal);
+        const [tables, __] = await this.cache.getTablesFor(ordinal);
 
         Object.entries(
             {root: tables.root, ...tables.others}
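With `beginFlush` now capturing the intermediate ordinal range before clearing it, the flush request each table worker receives looks roughly like this (illustrative values only; the fields are the ones documented in the worker.ts params comment below, and the bucket path is hypothetical):

```ts
// hypothetical flush request as posted to a worker thread
const flushRequest = {
    method: 'flush',
    params: {
        writeDir: '/data/arrow/0000000000.wip',  // hypothetical bucket directory
        unfinished: false,       // true -> the table file keeps its .wip suffix
        startOrdinal: 1000n,     // first root ordinal in the batch being flushed
        lastOrdinal: 1099n       // last root ordinal in the batch being flushed
    }
};
```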
diff --git a/src/writer/worker.ts b/src/writer/worker.ts
index f03c211..ed547a9 100644
--- a/src/writer/worker.ts
+++ b/src/writer/worker.ts
@@ -64,7 +64,7 @@ const loggingOptions = {
 }
 
 const logger: Logger = loggers.add(`worker-internal-${tableName}`, loggingOptions);
-const streamBuffer = Buffer.alloc(streamBufMem | DEFAULT_STREAM_BUF_MEM);
+const streamBuffer = Buffer.alloc(streamBufMem);
 logger.info(`write stream buffer of ${streamBufMem.toLocaleString()} bytes allocated!`);
 
 const intermediateBuffers = {};
@@ -113,7 +113,9 @@ function flush(msg: WriterControlRequest) {
     /*
      * params:
      *  - unfinished: boolean -> is this a .wip or not?
-     *  - writeDir: string -> bucket dir path
+     *  - writeDir: string -> bucket dir path,
+     *  - startOrdinal: bigint -> root start ordinal of this batch
+     *  - lastOrdinal: bigint -> root last ordinal of this batch
      */
     const fileName = `${alias ?? tableName}.ab${msg.params.unfinished ? '.wip' : ''}`;
     const currentFile = path.join(msg.params.writeDir, fileName);
@@ -144,7 +146,10 @@
     const newSize = fs.fstatSync(fd).size + ArrowBatchProtocol.BATCH_HEADER_SIZE + batchBytes.length;
     try {
         // Write the batch header
-        fs.appendFileSync(fd, ArrowBatchProtocol.newBatchHeader(BigInt(batchBytes.length), compression));
+        fs.appendFileSync(fd, ArrowBatchProtocol.newBatchHeader(
+            BigInt(batchBytes.length), compression,
+            msg.params.startOrdinal, msg.params.lastOrdinal
+        ));
 
         // Write the batch content
         fs.appendFileSync(fd, batchBytes);
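Because every batch header now records its ordinal range, file metadata alone is enough to locate rows, which is what allowed the `startRow` field (and the extra `readArrowBatchTable` call) to be dropped from the metadata cache. A hypothetical consumer-side check after this change, assuming `ArrowBatchProtocol` is exported from the package root and the file path exists:

```ts
// hypothetical: inspect the ordinal ranges recorded in a bucket's root table file
import {ArrowBatchProtocol} from '@guilledk/arrowbatch-nodejs';

const meta = await ArrowBatchProtocol.readFileMetadata('./data/0000000000/block.ab');  // path assumed
meta.batches.forEach((b, i) =>
    console.log(`batch ${i}: ordinals ${b.batch.startOrdinal} -> ${b.batch.lastOrdinal}`));
```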