Skip to content

Commit 0b7ea8a

Browse files
ericallamsamejr
authored andcommitted
fix(run-engine): pass through engine fair dequeue selection strategy options instead of using defaults (#2565)
1 parent 94d96b8 commit 0b7ea8a

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,22 @@ export class RunEngine {
129129

130130
const keys = new RunQueueFullKeyProducer();
131131

132+
const queueSelectionStrategyOptions = {
133+
keys,
134+
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
135+
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
136+
...options.queue?.queueSelectionStrategyOptions,
137+
};
138+
139+
this.logger.log("RunEngine FairQueueSelectionStrategy queueSelectionStrategyOptions", {
140+
options: queueSelectionStrategyOptions,
141+
});
142+
132143
this.runQueue = new RunQueue({
133144
name: "rq",
134145
tracer: trace.getTracer("rq"),
135146
keys,
136-
queueSelectionStrategy: new FairQueueSelectionStrategy({
137-
keys,
138-
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
139-
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
140-
}),
147+
queueSelectionStrategy: new FairQueueSelectionStrategy(queueSelectionStrategyOptions),
141148
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
142149
defaultEnvConcurrencyBurstFactor: options.queue?.defaultEnvConcurrencyBurstFactor,
143150
logger: new Logger("RunQueue", options.queue?.logLevel ?? "info"),
@@ -1730,10 +1737,13 @@ export class RunEngine {
17301737
});
17311738

17321739
if (!taskRun) {
1733-
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", {
1734-
runId,
1735-
snapshotId,
1736-
});
1740+
this.logger.error(
1741+
"RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found",
1742+
{
1743+
runId,
1744+
snapshotId,
1745+
}
1746+
);
17371747
return;
17381748
}
17391749

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1706,6 +1706,14 @@ export class RunQueue {
17061706
const message = await this.#dequeueMessageFromKey(messageKey);
17071707

17081708
if (!message) {
1709+
this.logger.error("Failed to dequeue message from worker queue", {
1710+
messageKey,
1711+
workerQueue,
1712+
workerQueueKey,
1713+
workerQueueLength,
1714+
service: this.name,
1715+
});
1716+
17091717
return;
17101718
}
17111719

0 commit comments

Comments
 (0)