diff --git a/package.json b/package.json
index 2e161e4..b8d4d58 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@guilledk/arrowbatch-nodejs",
-  "version": "1.0.0-rc1",
+  "version": "1.0.0-rc2",
   "description": "Arrow Batch Storage protocol",
   "main": "./build/index.js",
   "type": "module",
diff --git a/src/cache.ts b/src/cache.ts
index e8218af..273a2dd 100644
--- a/src/cache.ts
+++ b/src/cache.ts
@@ -5,11 +5,9 @@ import {ArrowBatchContext} from "./context.js";
 
 import moment from "moment";
 
-export type CacheKey = string; // `${adjustedOrd}-${batchIdx}`
-
 export interface ArrowMetaCacheEntry {
     ts: number,
-    meta: ArrowBatchFileMetadata, startOrdinal: bigint
+    meta: ArrowBatchFileMetadata, startRow: any[]
 }
 
 export interface ArrowCachedTables {
@@ -30,10 +28,10 @@ export class ArrowBatchCache {
 
     private ctx: ArrowBatchContext;
 
-    private tableCache = new Map<CacheKey, ArrowCachedTables>();
-    private cacheOrder: CacheKey[] = [];
+    private tableCache = new Map<string, ArrowCachedTables>();
+    private cacheOrder: string[] = [];
 
-    private metadataCache = new Map<number, ArrowMetaCacheEntry>();
+    private metadataCache = new Map<string, ArrowMetaCacheEntry>();
 
     readonly dataDir: string;
 
@@ -41,27 +39,29 @@ export class ArrowBatchCache {
         this.ctx = ctx;
     }
 
-    async getMetadataFor(adjustedOrdinal: number): Promise<[ArrowMetaCacheEntry, boolean]> {
-        const filePath = this.ctx.tableFileMap.get(adjustedOrdinal).get('root');
+    async getMetadataFor(adjustedOrdinal: number, tableName: string): Promise<[ArrowMetaCacheEntry, boolean]> {
+        const filePath = this.ctx.tableFileMap.get(adjustedOrdinal).get(tableName);
         const meta = await ArrowBatchProtocol.readFileMetadata(filePath);
 
-        if (this.metadataCache.has(adjustedOrdinal)) {
+        const cacheKey = `${adjustedOrdinal}-${tableName}`
+
+        if (this.metadataCache.has(cacheKey)) {
             // we might need to invalidate our metadata if file size changed
-            const cachedMeta = this.metadataCache.get(adjustedOrdinal);
+            const cachedMeta = this.metadataCache.get(cacheKey);
             if (cachedMeta.meta.size === meta.size)
                 return [cachedMeta, false];  // size hasnt change, return cache
 
             // invalidate and re-calculate
-            this.metadataCache.delete(adjustedOrdinal);
+            this.metadataCache.delete(cacheKey);
         }
 
         const firstTable = await ArrowBatchProtocol.readArrowBatchTable(
            filePath, meta, 0);
 
-        const startOrdinal: bigint = firstTable.get(0).toArray()[0];
+        const startRow = firstTable.get(0).toArray();
 
-        const result = { ts: moment.now(), meta, startOrdinal };
-        this.metadataCache.set(adjustedOrdinal, result);
+        const result = { ts: moment.now(), meta, startRow };
+        this.metadataCache.set(cacheKey, result);
 
         return [result, true];
     }
@@ -89,15 +89,15 @@ export class ArrowBatchCache {
         // metadata about the bucket we are going to get tables for, mostly need to
         // figure out start ordinal for math to make sense in case non-aligned bucket
         // boundary start
-        const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal);
+        const [bucketMetadata, metadataUpdated] = await this.getMetadataFor(adjustedOrdinal, 'root');
 
         // index relative to bucket boundary
-        const relativeIndex = ordinal - bucketMetadata.startOrdinal;
+        const relativeIndex = ordinal - bucketMetadata.startRow[0];
 
         // get batch table index, assuming config.dumpSize table size is respected
         const batchIndex = Number(relativeIndex / BigInt(this.ctx.config.dumpSize));
 
-        const cacheKey: CacheKey = `${adjustedOrdinal}-${batchIndex}`;
+        const cacheKey = `${adjustedOrdinal}-${batchIndex}`;
 
         if (this.tableCache.has(cacheKey)) {
             // we have this tables cached, but only return if metadata wasnt invalidated
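Side note on the cache.ts change above: metadata entries are now keyed per table rather than per bucket, so metadata for different table files in the same bucket no longer overwrite each other. A minimal sketch of the idea, using local stand-in types and a hypothetical 'block' table name rather than the library's actual classes:

```typescript
// Stand-in for ArrowMetaCacheEntry, local to this example only.
type MetaEntry = { ts: number, size: number, startRow: any[] };

const metadataCache = new Map<string, MetaEntry>();

// Same composite key format the patch introduces.
const cacheKeyFor = (adjustedOrdinal: number, tableName: string) =>
    `${adjustedOrdinal}-${tableName}`;

// Under rc1 both entries would have collided on the numeric key 1;
// with the composite key each table file in bucket 1 keeps its own metadata.
metadataCache.set(cacheKeyFor(1, 'root'), { ts: Date.now(), size: 1024, startRow: [0n] });
metadataCache.set(cacheKeyFor(1, 'block'), { ts: Date.now(), size: 2048, startRow: [0n, 'aa'] });

console.log(metadataCache.has('1-root'), metadataCache.has('1-block'));  // true true
```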
diff --git a/src/reader/index.ts b/src/reader/index.ts
index ac6a442..bf2b962 100644
--- a/src/reader/index.ts
+++ b/src/reader/index.ts
@@ -21,7 +21,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
     protected _auxiliaryBuffers: RowBuffers = new Map();
 
     private isFirstUpdate: boolean = true;
 
-    private cache: ArrowBatchCache;
+    protected cache: ArrowBatchCache;
 
     constructor(
         config: ArrowBatchConfig,
@@ -40,7 +40,7 @@ export class ArrowBatchReader extends ArrowBatchContext {
 
         this.cache = new ArrowBatchCache(this);
 
-        this._intermediateBuffers = this._initBuffer();
+        this._intermediateBuffers = this._createBuffers();
         this._initIntermediate();
     }
 
@@ -52,22 +52,23 @@ export class ArrowBatchReader extends ArrowBatchContext {
         return this._lastOrdinal;
     }
 
-    protected _initBuffer() {
-        const buffers = new Map();
-        for (const [tableName, tableMapping] of this.tableMappings.entries()) {
-            buffers.set(
-                tableName, {columns: new Map()});
+    protected _createBuffer(tableName: string) {
+        const buffers = {columns: new Map()};
+        for (const mapping of this.tableMappings.get(tableName))
+            buffers.columns.set(mapping.name, []);
+        return buffers;
+    }
 
-            const tableBuffers = buffers.get(tableName);
-            for (const mapping of tableMapping)
-                tableBuffers.columns.set(mapping.name, []);
-        }
+    protected _createBuffers() {
+        const buffers = new Map();
+        for (const tableName of this.tableMappings.keys())
+            buffers.set(tableName, this._createBuffer(tableName));
         return buffers;
     }
 
     protected _initIntermediate() {
         this._auxiliaryBuffers = this._intermediateBuffers;
-        this._intermediateBuffers = this._initBuffer();
+        this._intermediateBuffers = this._createBuffers();
         this.logger.debug(`initialized buffers for ${[...this._intermediateBuffers.keys()]}`);
     }
 
@@ -420,8 +421,8 @@ export class ArrowBatchReader extends ArrowBatchContext {
 
         // fetch requested row from root table
         const adjustedOrdinal = this.getOrdinal(ordinal);
-        const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal);
-        const relativeIndex = ordinal - bucketMetadata.startOrdinal;
+        const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
+        const relativeIndex = ordinal - bucketMetadata.startRow[0];
         const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
         const structRow = tables.root.get(tableIndex);
 
@@ -436,4 +437,46 @@ export class ArrowBatchReader extends ArrowBatchContext {
 
         return this.genRowWithRefsFromTables('root', row, tables);
     }
+
+    iter(params: {from: bigint, to: bigint}) : RowScroller {
+        return new RowScroller(this, params);
+    }
 }
+
+export class RowScroller {
+
+    private _isDone: boolean;
+
+    readonly from: bigint;  // will push rows with ord >= `from`
+    readonly to: bigint;    // will stop pushing rows when row with ord `to` is reached
+
+    protected reader: ArrowBatchReader;
+
+    private _lastYielded: bigint;
+
+    constructor(
+        reader: ArrowBatchReader,
+        params: {
+            from: bigint,
+            to: bigint
+        }
+    ) {
+        this.reader = reader;
+        this.from = params.from;
+        this.to = params.to;
+        this._lastYielded = this.from - 1n;
+    }
+
+    async nextResult(): Promise<RowWithRefs> {
+        const nextBlock = this._lastYielded + 1n;
+        const row = await this.reader.getRow(nextBlock);
+        this._lastYielded = nextBlock;
+        this._isDone = this._lastYielded == this.to;
+        return row;
+    }
+
+    async *[Symbol.asyncIterator](): AsyncIterableIterator<RowWithRefs> {
+        do {
+            const row = await this.nextResult();
+            yield row;
+        } while (!this._isDone)
+    }
+}
\ No newline at end of file
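With the RowScroller added above, ranged reads can be written as a `for await` loop over `reader.iter()`. A usage sketch, assuming an already constructed and initialized reader (setup omitted):

```typescript
import { ArrowBatchReader } from '@guilledk/arrowbatch-nodejs';

// Collect all rows in the inclusive range [from, to]; each yielded row is the
// same value reader.getRow() resolves to (root row plus referenced child rows).
async function readRange(reader: ArrowBatchReader, from: bigint, to: bigint) {
    const rows = [];
    for await (const row of reader.iter({ from, to }))
        rows.push(row);
    return rows;  // rows.length === Number(to - from + 1n) once the range is fully written
}
```

Note the scroller only marks itself done after yielding the row whose ordinal equals `to`, so callers are expected to pass `from <= to`.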
diff --git a/src/tests/testCache.spec.ts b/src/tests/testCache.spec.ts
index 4f905d2..c7eefb3 100644
--- a/src/tests/testCache.spec.ts
+++ b/src/tests/testCache.spec.ts
@@ -7,9 +7,9 @@ import {
     ArrowBatchConfig,
     ArrowBatchReader,
     ArrowBatchWriter,
-    createLogger
+    createLogger, waitEvent
 } from '../index.js';
-import {randomHexString, TestChainGenerator, testDataContext, waitEvent} from "./utils.js";
+import {randomHexString, TestChainGenerator, testDataContext} from "./utils.js";
 import {expect} from "chai";
 
 describe('reader table cache', () => {
@@ -99,7 +99,7 @@ describe('reader table cache', () => {
         expect(reader.cacheSize).to.be.equal(1);
 
         // @ts-ignore
-        const firstMetaTs = reader.cache.metadataCache.get(1).ts;
+        const firstMetaTs = reader.cache.metadataCache.get('1-root').ts;
 
         // add a byte at end of table file to trigger meta update
         fs.appendFileSync(
@@ -112,7 +112,7 @@ describe('reader table cache', () => {
         expect(reader.cacheSize).to.be.equal(1);
 
         // @ts-ignore
-        const secondMetaTs = reader.cache.metadataCache.get(1).ts;
+        const secondMetaTs = reader.cache.metadataCache.get('1-root').ts;
 
         expect(secondMetaTs).to.be.greaterThan(firstMetaTs);
     });
diff --git a/src/tests/testRW.spec.ts b/src/tests/testRW.spec.ts
index 47553dd..cf42160 100644
--- a/src/tests/testRW.spec.ts
+++ b/src/tests/testRW.spec.ts
@@ -7,9 +7,9 @@ import {
     ArrowBatchConfig,
     ArrowBatchReader,
     ArrowBatchWriter,
-    createLogger, RowWithRefs
+    createLogger, RowWithRefs, waitEvent
 } from '../index.js';
-import {randomHexString, TestChainGenerator, testDataContext, waitEvent} from "./utils.js";
+import {randomHexString, TestChainGenerator, testDataContext} from "./utils.js";
 import {expect} from "chai";
 
 describe('read/write', () => {
@@ -47,10 +47,10 @@ describe('read/write', () => {
 
         const blocks = [];
 
-        for (let i = BigInt(from); i <= BigInt(to); i++) {
-            const row = await reader.getRow(i);
+        for await (const row of reader.iter({
+            from: BigInt(from), to: BigInt(to)
+        }))
             blocks.push(row);
-        }
 
         return blocks;
     };
diff --git a/src/tests/utils.ts b/src/tests/utils.ts
index a50e535..c282036 100644
--- a/src/tests/utils.ts
+++ b/src/tests/utils.ts
@@ -1,12 +1,9 @@
 import crypto from 'node:crypto';
-import EventEmitter from 'node:events';
 
 import moment from "moment";
 
 import {ArrowBatchContextDef, RowWithRefs} from "../context.js";
 import {ArrowBatchWriter} from "../writer";
-import {ArrowBatchConfig} from "../types";
-import {Logger} from "winston";
 
 export function randomBytes(length: number): Buffer {
     return crypto.randomBytes(length);
@@ -20,10 +17,6 @@ export function randomInteger(min: number, max: number): number {
     return Math.floor(Math.random() * (max - min + 1)) + min;
 }
 
-export async function waitEvent(emitter: EventEmitter, event: string): Promise {
-    return new Promise(resolve => emitter.once(event, resolve));
-}
-
 export const sleep = (ms: number) => new Promise(res => setTimeout(res, ms));
 
diff --git a/src/utils.ts b/src/utils.ts
index 4722637..0dd516f 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -7,6 +7,7 @@ import fs from 'node:fs';
 
 import {format, LogEntry, loggers, transports} from "winston";
 import Transport from "winston-transport";
 import {ZSTDCompress} from 'simple-zstd';
+import EventEmitter from "node:events";
 
 // currentDir == build/ dir
@@ -139,4 +140,8 @@ export function createLogger(name: string, logLevel: string) {
         ]
     }
     return loggers.add(name, loggingOptions);
+}
+
+export async function waitEvent(emitter: EventEmitter, event: string): Promise {
+    return new Promise(resolve => emitter.once(event, resolve));
 }
\ No newline at end of file
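waitEvent now lives in src/utils.ts and the specs above import it from '../index.js', so it is reachable from the package entry point. A small isolated sketch of what the helper does; the emitter and event name below are local to the example:

```typescript
import EventEmitter from 'node:events';
import { waitEvent } from '@guilledk/arrowbatch-nodejs';

async function demo() {
    const emitter = new EventEmitter();
    // simulate e.g. a writer finishing a flush a few ms from now
    setTimeout(() => emitter.emit('flush'), 10);
    await waitEvent(emitter, 'flush');  // resolves once 'flush' fires
    console.log('flush observed');
}

demo();
```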
diff --git a/src/writer/index.ts b/src/writer/index.ts
index 6575b11..770123b 100644
--- a/src/writer/index.ts
+++ b/src/writer/index.ts
@@ -6,9 +6,10 @@ import {format, Logger, loggers, transports} from "winston";
 
 import {ArrowBatchReader} from "../reader/index.js";
 import {ArrowBatchConfig} from "../types";
-import {ArrowBatchContextDef} from "../context.js";
-import {isWorkerLogMessage, ROOT_DIR, WorkerLogMessage} from "../utils.js";
+import {ArrowBatchContextDef, RowBuffers} from "../context.js";
+import {isWorkerLogMessage, ROOT_DIR, waitEvent, WorkerLogMessage} from "../utils.js";
 import {WriterControlRequest, WriterControlResponse} from "./worker.js";
+import {ArrowTableMapping} from "../protocol";
 
@@ -19,6 +20,7 @@ export class ArrowBatchWriter extends ArrowBatchReader {
         worker: Worker,
         status: 'running' | 'stopped',
         tid: number,
+        ackTid: number,
         tasks: Map, stack: Error}>
     }>();
     private workerLoggers = new Map();
@@ -76,6 +78,7 @@ export class ArrowBatchWriter extends ArrowBatchReader {
                 alias,
                 worker,
                 status: 'running',
                 tid: 0,
+                ackTid: -1,
                 tasks: new Map, stack: Error}>()
             });
         });
@@ -121,6 +124,8 @@ export class ArrowBatchWriter extends ArrowBatchReader {
             throw new Error(
                 `Received msg from writer worker but it has an unexpected status: ${workerInfo.status}`);
 
+        workerInfo.ackTid = msg.tid;
+
         if (msg.method === 'flush') {
             const auxBuffs = this._auxiliaryBuffers.get(msg.name);
 
@@ -144,11 +149,17 @@ export class ArrowBatchWriter extends ArrowBatchReader {
                    this.wipFilesMap.clear();
                }
 
-                this.events.emit('flush');
+                this.reloadOnDiskBuckets().then(() => this.events.emit('flush'));
            }
        }
 
        workerInfo.tasks.delete(msg.tid);
+
+        const allWorkersReady = [...this.writeWorkers.values()]
+            .every(w => w.ackTid == w.tid - 1);
+
+        if (allWorkersReady)
+            this.events.emit('workers-ready')
    }
 
    async init(startOrdinal: number | bigint) {
@@ -238,4 +249,123 @@ export class ArrowBatchWriter extends ArrowBatchReader {
             params: row
         }, ref);
     }
+
+    private async trimOnBuffers(ordinal: bigint) {
+        const recursiveBufferTrim = (
+            table: string, ref: any, refField: ArrowTableMapping
+        ) => {
+            const references = this.refMappings.get(table);
+
+            const refColumn = this._intermediateBuffers
+                .get(table).columns.get(refField.name);
+
+            let trimIdx = 0;
+            for (const val of refColumn) {
+                if (val === ref)
+                    break;
+                trimIdx++;
+            }
+
+            if (trimIdx == refColumn.length) {
+                this._intermediateBuffers.set(table, this._createBuffer(table));
+                return;
+            }
+
+            const row = this.getBufferRow(table, trimIdx);
+
+            for (const [childName, childRefInfo] of Object.entries(references))
+                recursiveBufferTrim(childName, row[childRefInfo.parentIndex], childRefInfo.childMapping);
+
+            this.sendMessageToWriter(table, {
+                method: 'trim',
+                params: { trimIdx }
+            });
+
+            this.tableMappings.get(table).forEach(m =>
+                this._intermediateBuffers.get(table).columns.get(m.name).splice(trimIdx)
+            );
+        };
+
+        recursiveBufferTrim('root', ordinal, this.tableMappings.get('root')[0]);
+        await waitEvent(this.events, 'workers-ready');
+    }
+    private async trimOnDisk(ordinal: bigint) {
+        const adjustedOrdinal = this.getOrdinal(ordinal);
+
+        // delete every bucket bigger than adjustedOrdinal
+        const bucketDeleteList = [...this.tableFileMap.keys()]
+            .sort()
+            .reverse()
+            .filter(bucket => bucket > adjustedOrdinal);
+
+        await Promise.all(bucketDeleteList.map(bucket => pfs.rm(
+            path.dirname(this.tableFileMap.get(bucket).get('root')),
+            {recursive: true}
+        )));
+
+        const [bucketMetadata, _] = await this.cache.getMetadataFor(adjustedOrdinal, 'root');
+
+        // trim idx relative to bucket start
+        const relativeIndex = ordinal - bucketMetadata.startRow[0];
+
+        // table index might need to be loaded into buffers & be partially edited
+        // everything after table index can be deleted
+        const tableIndex = Number(relativeIndex % BigInt(this.config.dumpSize));
+
+        if (tableIndex >= bucketMetadata.meta.batches.length)
+            return;
+
+        // truncate files from next table onwards
+        await Promise.all(
+            [...this.tableMappings.keys()]
+                .map(table => this.cache.getMetadataFor(adjustedOrdinal, table).then(
+                    ([meta, _]) => {
+                        const tableIndexEnd = meta.meta.batches[tableIndex].end;
+                        const fileName = this.tableFileMap.get(adjustedOrdinal).get(table);
+                        return pfs.truncate(fileName, tableIndexEnd + 1);
+                    })));
+
+        // unwrap adjustedOrdinal:tableIndex table into fresh intermediate
+        this._intermediateBuffers = this._createBuffers();
+        const tables = await this.cache.getTablesFor(ordinal);
+
+        Object.entries(
+            {root: tables.root, ...tables.others}
+        ).forEach(([tableName, table]) => {
+            for (let i = 0; i < table.numRows; i++)
+                this._pushRawRow(tableName, table.get(i).toArray());
+        });
+
+        // use trim buffers helper
+        await this.trimOnBuffers(ordinal);
+    }
+
+    async trimFrom(ordinal: bigint) {
+        // make sure all workers are idle
+        await waitEvent(this.events, 'workers-ready');
+
+        // if trimming for further back than known first ord, reset all state vars
+        if (ordinal <= this.firstOrdinal) {
+            this._firstOrdinal = null;
+            this._lastOrdinal = null;
+        } else
+            this._lastOrdinal = ordinal - 1n;  // if trim is within written range, set state to ord - 1
+
+        const rootInterBuffs = this._intermediateBuffers.get('root');
+        const ordinalField = this.definition.root.ordinal;
+
+        // if only have to trim buffers
+        if (rootInterBuffs.columns.get(ordinalField).length > 0) {
+            const oldestOnIntermediate = rootInterBuffs.columns.get(ordinalField)[0];
+            const isOnIntermediate = ordinal >= oldestOnIntermediate
+            if (isOnIntermediate) {
+                await this.trimOnBuffers(ordinal);
+                return;
+            }
+        }
+
+        // if need to hit disk tables
+        await this.trimOnDisk(ordinal);
+        await this.reloadOnDiskBuckets();
+    }
 }
\ No newline at end of file
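For context before the worker changes below, this is roughly the control-message round trip that trimOnBuffers() triggers per table; the tid, table name and index values are made up for illustration, and the real tid bookkeeping is handled by sendMessageToWriter:

```typescript
// Request shape consumed by the worker's new trim() handler:
const trimRequest = {
    tid: 7,
    method: 'trim' as const,
    params: { trimIdx: 1000 }  // worker splices every column buffer from this index on
};

// Acknowledgement the worker posts back; the writer records it as ackTid and,
// once every worker has acked its latest tid, emits 'workers-ready'.
const trimResponse = {
    tid: 7,
    name: 'root',
    method: 'trim' as const,
    status: 'ok' as const
};

console.log(trimRequest, trimResponse);
```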
diff --git a/src/writer/worker.ts b/src/writer/worker.ts
index 5113e2c..6f6150b 100644
--- a/src/writer/worker.ts
+++ b/src/writer/worker.ts
@@ -15,7 +15,7 @@ const streamBuffer = Buffer.alloc(DEFAULT_STREAM_BUF_MEM);
 
 export interface WriterControlRequest {
     tid: number
-    method: 'addRow' | 'flush'
+    method: 'addRow' | 'flush' | 'trim'
     params?: any
 }
 
@@ -102,6 +102,11 @@ async function serializeTable(table: Table): Promise {
 }
 
 function flush(msg: WriterControlRequest) {
+    /*
+     * params:
+     *     - unfinished: boolean -> is this a .wip or not?
+     *     - writeDir: string -> bucket dir path
+     */
     if (intermediateSize == 0)
         return;
 
@@ -175,6 +180,9 @@ function flush(msg: WriterControlRequest) {
 }
 
 function addRow(msg: WriterControlRequest) {
+    /*
+     * params: row to write
+     */
     const typedRow = tableMappings.map(
         (fieldInfo, index) => {
             try {
@@ -201,8 +209,26 @@ function addRow(msg: WriterControlRequest) {
     });
 }
 
+function trim(msg: WriterControlRequest) {
+    /*
+     * params:
+     *     - trimIdx: number -> index to delete from <=
+     */
+    for (const mapping of tableMappings)
+        intermediateBuffers[mapping.name].splice(msg.params.trimIdx);
+
+    intermediateSize = intermediateBuffers[tableMappings[0].name].length;
+
+    parentPort.postMessage({
+        tid: msg.tid,
+        name: tableName,
+        method: msg.method,
+        status: 'ok'
+    });
+}
+
 const handlers = {
-    addRow, flush
+    addRow, flush, trim
 };
 
 _initBuffer();
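Finally, a usage sketch for the new trim path end to end, assuming an ArrowBatchWriter that has been init()-ed and has rows written past the target ordinal:

```typescript
import { ArrowBatchWriter } from '@guilledk/arrowbatch-nodejs';

// Roll the dataset back so that `ordinal` and everything after it is gone:
// in-memory intermediate buffers are trimmed, and when the target falls on
// disk, later buckets are deleted, the affected table files are truncated
// and the touched batch is re-buffered before trimming.
async function rollbackTo(writer: ArrowBatchWriter, ordinal: bigint) {
    await writer.trimFrom(ordinal);
    // writer.lastOrdinal should now be ordinal - 1n, or reset entirely if the
    // trim point was at or before the first written ordinal
}
```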