From 284d6424f3df478c5fa32fe34bf206f2e1d9a1da Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 12:29:59 +0300 Subject: [PATCH 1/7] feat: add a decorator method for checking progress of methods --- packages/sequencer/src/helpers/BusyGuard.ts | 37 +++++++++++++++++++++ packages/sequencer/src/index.ts | 1 + 2 files changed, 38 insertions(+) create mode 100644 packages/sequencer/src/helpers/BusyGuard.ts diff --git a/packages/sequencer/src/helpers/BusyGuard.ts b/packages/sequencer/src/helpers/BusyGuard.ts new file mode 100644 index 000000000..0f117b697 --- /dev/null +++ b/packages/sequencer/src/helpers/BusyGuard.ts @@ -0,0 +1,37 @@ +import { log } from "@proto-kit/common"; +/** + * Decorator that ensures a function/method is not currently in use. + * Mostly useful for production of blocks, batches and tasks. + */ +export function ensureNotBusy() { + return function InnerFunction( + target: object, + methodName: string, + descriptor: TypedPropertyDescriptor<(...args: any[]) => Promise> + ) { + const originalMethod = descriptor.value!; + + descriptor.value = async function decorator( + this: { inProgress: boolean }, + ...args: any[] + ) { + if (this.inProgress === true) { + log.info(`${methodName.toString()} is in use at the moment.`); + return undefined; + } + + this.inProgress = true; + try { + return await originalMethod.apply(this, args); + } catch (error: unknown) { + if (error instanceof Error) { + throw error; + } else { + log.error(error); + } + } finally { + this.inProgress = false; + } + }; + }; +} diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index a0fee0179..813bc66d5 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -1,4 +1,5 @@ export * from "./helpers/utils"; +export * from "./helpers/BusyGuard"; export * from "./mempool/Mempool"; export * from "./mempool/PendingTransaction"; export * from "./mempool/CompressedSignature"; From 3c995ccb8c2d3c8b848fe389b5c7dad2b051d2c7 Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 12:30:59 +0300 Subject: [PATCH 2/7] refactor: replace use of inProgress flag with decorator method --- .../production/BatchProducerModule.ts | 37 +------- .../sequencing/BlockProducerModule.ts | 49 ++++------- .../production/trigger/TimedBlockTrigger.ts | 22 ++--- .../src/worker/queue/LocalTaskQueue.ts | 85 +++++++++---------- 4 files changed, 70 insertions(+), 123 deletions(-) diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 545190624..87eb550b6 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -17,6 +17,7 @@ import { BlockWithResult } from "../../storage/model/Block"; import type { Database } from "../../storage/Database"; import { AsyncLinkedLeafStore } from "../../state/async/AsyncLinkedLeafStore"; import { CachedLinkedLeafStore } from "../../state/lmt/CachedLinkedLeafStore"; +import { ensureNotBusy } from "../../helpers/BusyGuard"; import { BlockProofSerializer } from "./tasks/serializers/BlockProofSerializer"; import { BatchTracingService } from "./tracing/BatchTracingService"; @@ -44,8 +45,6 @@ const errors = { */ @sequencerModule() export class BatchProducerModule extends SequencerModule { - private productionInProgress = false; - public constructor( @inject("AsyncLinkedLeafStore") private readonly merkleStore: AsyncLinkedLeafStore, @@ -64,41 +63,11 @@ export class BatchProducerModule extends SequencerModule { * transactions that are present in the mempool. This function should also * be the one called by BlockTriggerss */ + @ensureNotBusy() public async createBatch( blocks: BlockWithResult[] ): Promise { - if (!this.productionInProgress) { - try { - this.productionInProgress = true; - - const batch = await this.tryProduceBatch(blocks); - - this.productionInProgress = false; - - return batch; - } catch (error: unknown) { - this.productionInProgress = false; - // TODO Check if that still makes sense - if (error instanceof Error) { - if ( - !error.message.includes( - "Can't create a block with zero transactions" - ) - ) { - log.error(error); - } - - throw error; - } else { - log.error(error); - } - } - } else { - log.debug( - "Skipping new block production because production is still in progress" - ); - } - return undefined; + return await this.tryProduceBatch(blocks); } private async tryProduceBatch( diff --git a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts index 8f035c338..2e1f8858b 100644 --- a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts +++ b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts @@ -27,6 +27,7 @@ import { IncomingMessagesService } from "../../../settlement/messages/IncomingMe import { Tracer } from "../../../logging/Tracer"; import { trace } from "../../../logging/trace"; import { AsyncLinkedLeafStore } from "../../../state/async/AsyncLinkedLeafStore"; +import { ensureNotBusy } from "../../../helpers/BusyGuard"; import { BlockProductionService } from "./BlockProductionService"; import { BlockResultService } from "./BlockResultService"; @@ -38,8 +39,6 @@ export interface BlockConfig { @sequencerModule() export class BlockProducerModule extends SequencerModule { - private productionInProgress = false; - public constructor( @inject("Mempool") private readonly mempool: Mempool, @inject("IncomingMessagesService", { isOptional: true }) @@ -140,37 +139,25 @@ export class BlockProducerModule extends SequencerModule { return result; } + @ensureNotBusy() public async tryProduceBlock(): Promise { - if (!this.productionInProgress) { - try { - const block = await this.produceBlock(); - - if (block === undefined) { - if (!this.allowEmptyBlock()) { - log.info("No transactions in mempool, skipping production"); - } else { - log.error("Something wrong happened, skipping block"); - } - return undefined; - } + const block = await this.produceBlock(); - log.info( - `Produced block #${block.height.toBigInt()} (${block.transactions.length} txs)` - ); - this.prettyPrintBlockContents(block); - - return block; - } catch (error: unknown) { - if (error instanceof Error) { - throw error; - } else { - log.error(error); - } - } finally { - this.productionInProgress = false; + if (block === undefined) { + if (!this.allowEmptyBlock()) { + log.info("No transactions in mempool, skipping production"); + } else { + log.error("Something wrong happened, skipping block"); } + return undefined; } - return undefined; + + log.info( + `Produced block #${block.height.toBigInt()} (${block.transactions.length} txs)` + ); + this.prettyPrintBlockContents(block); + + return block; } // TODO Move to different service, to remove dependency on mempool and messagequeue @@ -220,8 +207,6 @@ export class BlockProducerModule extends SequencerModule { @trace("block") private async produceBlock(): Promise { - this.productionInProgress = true; - const { txs, metadata } = await this.collectProductionData(); // Skip production if no transactions are available for now @@ -263,8 +248,6 @@ export class BlockProducerModule extends SequencerModule { ); } - this.productionInProgress = false; - return blockResult?.block; } diff --git a/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts b/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts index 73b630be1..9818e3277 100644 --- a/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts +++ b/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts @@ -12,6 +12,7 @@ import { BridgingModule, SettlementTokenConfig, } from "../../../settlement/BridgingModule"; +import { ensureNotBusy } from "../../../helpers/BusyGuard"; import { BlockEvents, BlockTriggerBase } from "./BlockTrigger"; @@ -45,9 +46,6 @@ export class TimedBlockTrigger private interval?: any; - // TODO Move that logic to somewhere proper - private settlementInProgress = false; - public constructor( @inject("BatchProducerModule", { isOptional: true }) batchProducerModule: BatchProducerModule | undefined, @@ -123,15 +121,9 @@ export class TimedBlockTrigger // otherwise treat as unproven-only if ( settlementInterval !== undefined && - totalTime % settlementInterval === 0 && - !this.settlementInProgress + totalTime % settlementInterval === 0 ) { - this.settlementInProgress = true; - const batch = await this.produceBatch(); - if (batch !== undefined) { - await this.settle(batch, this.config.settlementTokenConfig); - } - this.settlementInProgress = false; + await this.tryProduceSettlement(); } } catch (error) { log.error(error); @@ -151,6 +143,14 @@ export class TimedBlockTrigger } } + @ensureNotBusy() + private async tryProduceSettlement(): Promise { + const batch = await this.produceBatch(); + if (batch !== undefined) { + await this.settle(batch, this.config.settlementTokenConfig); + } + } + public async close(): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument clearInterval(this.interval); diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index e325508ca..70daf9521 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -3,6 +3,7 @@ import { log, mapSequential, noop, sleep } from "@proto-kit/common"; import { sequencerModule } from "../../sequencer/builder/SequencerModule"; import { TaskPayload } from "../flow/Task"; import { Closeable } from "../../sequencer/builder/Closeable"; +import { ensureNotBusy } from "../../helpers/BusyGuard"; import { InstantiatedQueue, TaskQueue } from "./TaskQueue"; import { ListenerList } from "./ListenerList"; @@ -91,56 +92,50 @@ export class LocalTaskQueue [key: string]: QueueListener[] | undefined; } = {}; - private taskInProgress = false; - + @ensureNotBusy() public async workNextTasks() { - if (this.taskInProgress) { - return; - } - this.taskInProgress = true; - - // Collect all tasks - const tasksToExecute = Object.entries(this.queuedTasks).flatMap( - ([queueName, tasks]) => { - if (tasks.length > 0 && this.workers[queueName]) { - const functions = tasks.map((task) => async () => { - // Execute task in worker - - log.trace(`Working ${task.payload.name} with id ${task.taskId}`); - - const payload = await this.workers[queueName]?.handler( - task.payload - ); - - if (payload === "closed" || payload === undefined) { - return; - } - log.trace("LocalTaskQueue got", JSON.stringify(payload)); - - // Notify listeners about result - const listenerPromises = this.listeners[queueName]?.map( - async (listener) => { - await listener(payload); - } - ); - await Promise.all(listenerPromises || []); - }); - this.queuedTasks[queueName] = []; - return functions; - } + let hasMoreTasks = true; - return []; - } - ); + while (hasMoreTasks) { + // Collect all tasks + const tasksToExecute = Object.entries(this.queuedTasks).flatMap( + ([queueName, tasks]) => { + if (tasks.length > 0 && this.workers[queueName]) { + const functions = tasks.map((task) => async () => { + // Execute task in worker - // Execute all tasks - await mapSequential(tasksToExecute, async (task) => await task()); + log.trace(`Working ${task.payload.name} with id ${task.taskId}`); + + const payload = await this.workers[queueName]?.handler( + task.payload + ); + + if (payload === "closed" || payload === undefined) { + return; + } + log.trace("LocalTaskQueue got", JSON.stringify(payload)); + + // Notify listeners about result + const listenerPromises = this.listeners[queueName]?.map( + async (listener) => { + await listener(payload); + } + ); + await Promise.all(listenerPromises || []); + }); + this.queuedTasks[queueName] = []; + return functions; + } + + return []; + } + ); - this.taskInProgress = false; + // Execute all tasks + await mapSequential(tasksToExecute, async (task) => await task()); - // In case new tasks came up in the meantime, execute them as well - if (tasksToExecute.length > 0) { - await this.workNextTasks(); + // Continue loop only if we processed tasks (more may have arrived) + hasMoreTasks = tasksToExecute.length > 0; } } From ab05c0c203f3f5e9785f84f6d740b9ab0c281847 Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 16:00:55 +0300 Subject: [PATCH 3/7] style: fix lint errors --- packages/sequencer/src/helpers/BusyGuard.ts | 3 ++- packages/sequencer/src/worker/queue/LocalTaskQueue.ts | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sequencer/src/helpers/BusyGuard.ts b/packages/sequencer/src/helpers/BusyGuard.ts index 0f117b697..530a1bbc4 100644 --- a/packages/sequencer/src/helpers/BusyGuard.ts +++ b/packages/sequencer/src/helpers/BusyGuard.ts @@ -11,7 +11,8 @@ export function ensureNotBusy() { ) { const originalMethod = descriptor.value!; - descriptor.value = async function decorator( + // eslint-disable-next-line consistent-return + descriptor.value = async function value( this: { inProgress: boolean }, ...args: any[] ) { diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index 70daf9521..7b1e5eb9f 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -97,7 +97,6 @@ export class LocalTaskQueue let hasMoreTasks = true; while (hasMoreTasks) { - // Collect all tasks const tasksToExecute = Object.entries(this.queuedTasks).flatMap( ([queueName, tasks]) => { if (tasks.length > 0 && this.workers[queueName]) { @@ -118,6 +117,7 @@ export class LocalTaskQueue // Notify listeners about result const listenerPromises = this.listeners[queueName]?.map( async (listener) => { + // eslint-disable-next-line no-await-in-loop await listener(payload); } ); From ea7809a55034d2834e7cc008d28e2e8e9ebf1176 Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 23:29:20 +0300 Subject: [PATCH 4/7] refactor: revert applying ensureNotBusy decorator on LocalTaskQueue --- .../src/worker/queue/LocalTaskQueue.ts | 84 ++++++++++--------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index 7b1e5eb9f..eaad97c58 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -92,50 +92,56 @@ export class LocalTaskQueue [key: string]: QueueListener[] | undefined; } = {}; - @ensureNotBusy() - public async workNextTasks() { - let hasMoreTasks = true; + private taskInProgress = false; - while (hasMoreTasks) { - const tasksToExecute = Object.entries(this.queuedTasks).flatMap( - ([queueName, tasks]) => { - if (tasks.length > 0 && this.workers[queueName]) { - const functions = tasks.map((task) => async () => { - // Execute task in worker - - log.trace(`Working ${task.payload.name} with id ${task.taskId}`); - - const payload = await this.workers[queueName]?.handler( - task.payload - ); - - if (payload === "closed" || payload === undefined) { - return; +public async workNextTasks() { + if (this.taskInProgress) { + return; + } + this.taskInProgress = true; + + // Collect all tasks + const tasksToExecute = Object.entries(this.queuedTasks).flatMap( + ([queueName, tasks]) => { + if (tasks.length > 0 && this.workers[queueName]) { + const functions = tasks.map((task) => async () => { + // Execute task in worker + + log.trace(`Working ${task.payload.name} with id ${task.taskId}`); + + const payload = await this.workers[queueName]?.handler( + task.payload + ); + + if (payload === "closed" || payload === undefined) { + return; + } + log.trace("LocalTaskQueue got", JSON.stringify(payload)); + + // Notify listeners about result + const listenerPromises = this.listeners[queueName]?.map( + async (listener) => { + await listener(payload); } - log.trace("LocalTaskQueue got", JSON.stringify(payload)); - - // Notify listeners about result - const listenerPromises = this.listeners[queueName]?.map( - async (listener) => { - // eslint-disable-next-line no-await-in-loop - await listener(payload); - } - ); - await Promise.all(listenerPromises || []); - }); - this.queuedTasks[queueName] = []; - return functions; - } - - return []; + ); + await Promise.all(listenerPromises || []); + }); + this.queuedTasks[queueName] = []; + return functions; } - ); - // Execute all tasks - await mapSequential(tasksToExecute, async (task) => await task()); + return []; + } + ); + + // Execute all tasks + await mapSequential(tasksToExecute, async (task) => await task()); + + this.taskInProgress = false; - // Continue loop only if we processed tasks (more may have arrived) - hasMoreTasks = tasksToExecute.length > 0; + // In case new tasks came up in the meantime, execute them as well + if (tasksToExecute.length > 0) { + await this.workNextTasks(); } } From 602a492bf71720b61d995f12302a30be7c97dabc Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 23:29:59 +0300 Subject: [PATCH 5/7] refactor: update use of ensureNotBusy decorator --- packages/sequencer/src/helpers/BusyGuard.ts | 34 ++++++++----------- .../production/BatchProducerModule.ts | 4 +-- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/packages/sequencer/src/helpers/BusyGuard.ts b/packages/sequencer/src/helpers/BusyGuard.ts index 530a1bbc4..00c043b6b 100644 --- a/packages/sequencer/src/helpers/BusyGuard.ts +++ b/packages/sequencer/src/helpers/BusyGuard.ts @@ -3,36 +3,32 @@ import { log } from "@proto-kit/common"; * Decorator that ensures a function/method is not currently in use. * Mostly useful for production of blocks, batches and tasks. */ -export function ensureNotBusy() { - return function InnerFunction( - target: object, +export function ensureNotBusy() { + + let inProgress = false; + + return function innerFunction( + _target: T, methodName: string, descriptor: TypedPropertyDescriptor<(...args: any[]) => Promise> - ) { + ): void { const originalMethod = descriptor.value!; - // eslint-disable-next-line consistent-return - descriptor.value = async function value( - this: { inProgress: boolean }, - ...args: any[] + descriptor.value = async function wrapped( + this: T, + ...args: unknown[] ) { - if (this.inProgress === true) { - log.info(`${methodName.toString()} is in use at the moment.`); + if (inProgress) { + log.trace(`${methodName} is in use at the moment.`); return undefined; } - this.inProgress = true; + inProgress = true; try { return await originalMethod.apply(this, args); - } catch (error: unknown) { - if (error instanceof Error) { - throw error; - } else { - log.error(error); - } } finally { - this.inProgress = false; + inProgress = false; } }; }; -} +} \ No newline at end of file diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 87eb550b6..a27409f88 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -63,13 +63,13 @@ export class BatchProducerModule extends SequencerModule { * transactions that are present in the mempool. This function should also * be the one called by BlockTriggerss */ - @ensureNotBusy() public async createBatch( blocks: BlockWithResult[] ): Promise { return await this.tryProduceBatch(blocks); } - + + @ensureNotBusy() private async tryProduceBatch( blocks: BlockWithResult[] ): Promise { From 7205261be2a79c5f6a49366296b9431fac653c46 Mon Sep 17 00:00:00 2001 From: saitunc Date: Thu, 22 Jan 2026 23:31:24 +0300 Subject: [PATCH 6/7] style: run lint fix --- packages/sequencer/src/helpers/BusyGuard.ts | 8 ++------ .../src/protocol/production/BatchProducerModule.ts | 2 +- packages/sequencer/src/worker/queue/LocalTaskQueue.ts | 3 +-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/packages/sequencer/src/helpers/BusyGuard.ts b/packages/sequencer/src/helpers/BusyGuard.ts index 00c043b6b..f38dc77a1 100644 --- a/packages/sequencer/src/helpers/BusyGuard.ts +++ b/packages/sequencer/src/helpers/BusyGuard.ts @@ -4,7 +4,6 @@ import { log } from "@proto-kit/common"; * Mostly useful for production of blocks, batches and tasks. */ export function ensureNotBusy() { - let inProgress = false; return function innerFunction( @@ -14,10 +13,7 @@ export function ensureNotBusy() { ): void { const originalMethod = descriptor.value!; - descriptor.value = async function wrapped( - this: T, - ...args: unknown[] - ) { + descriptor.value = async function wrapped(this: T, ...args: unknown[]) { if (inProgress) { log.trace(`${methodName} is in use at the moment.`); return undefined; @@ -31,4 +27,4 @@ export function ensureNotBusy() { } }; }; -} \ No newline at end of file +} diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index a27409f88..ac38af420 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -68,7 +68,7 @@ export class BatchProducerModule extends SequencerModule { ): Promise { return await this.tryProduceBatch(blocks); } - + @ensureNotBusy() private async tryProduceBatch( blocks: BlockWithResult[] diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index eaad97c58..e325508ca 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -3,7 +3,6 @@ import { log, mapSequential, noop, sleep } from "@proto-kit/common"; import { sequencerModule } from "../../sequencer/builder/SequencerModule"; import { TaskPayload } from "../flow/Task"; import { Closeable } from "../../sequencer/builder/Closeable"; -import { ensureNotBusy } from "../../helpers/BusyGuard"; import { InstantiatedQueue, TaskQueue } from "./TaskQueue"; import { ListenerList } from "./ListenerList"; @@ -94,7 +93,7 @@ export class LocalTaskQueue private taskInProgress = false; -public async workNextTasks() { + public async workNextTasks() { if (this.taskInProgress) { return; } From 4cd45a6f968d70021c9f7b5a336e98435a4f0d05 Mon Sep 17 00:00:00 2001 From: saitunc Date: Fri, 23 Jan 2026 00:25:08 +0300 Subject: [PATCH 7/7] refactor: replace unecessary wrapper function --- .../src/protocol/production/BatchProducerModule.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index ac38af420..81ee673be 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -61,16 +61,10 @@ export class BatchProducerModule extends SequencerModule { /** * Main function to call when wanting to create a new block based on the * transactions that are present in the mempool. This function should also - * be the one called by BlockTriggerss + * be the one called by BlockTriggers. */ - public async createBatch( - blocks: BlockWithResult[] - ): Promise { - return await this.tryProduceBatch(blocks); - } - @ensureNotBusy() - private async tryProduceBatch( + public async createBatch( blocks: BlockWithResult[] ): Promise { log.info("Producing batch...");