Skip to content

Commit f4f86e3

Browse files
authored
fix: deadlock in worker polling (#250)
* fix: deadlock in worker polling
* fix: deadlock in worker polling
* fixes
1 parent 728ce13 commit f4f86e3

File tree

8 files changed: +449 additions, -129 deletions

hindsight-api/hindsight_api/api/http.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1404,9 +1404,10 @@ async def lifespan(app: FastAPI):
1404 1404   worker_id=worker_id,
1405 1405   executor=memory.execute_task,
1406 1406   poll_interval_ms=config.worker_poll_interval_ms,
1407      - batch_size=config.worker_batch_size,
1408 1407   max_retries=config.worker_max_retries,
1409 1408   tenant_extension=getattr(memory, "_tenant_extension", None),
     1409 + max_slots=config.worker_max_slots,
     1410 + consolidation_max_slots=config.worker_consolidation_max_slots,
1410 1411   )
1411 1412   poller_task = asyncio.create_task(poller.run())
1412 1413   logging.info(f"Worker poller started (worker_id={worker_id})")

hindsight-api/hindsight_api/config.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -143,8 +143,9 @@
143 143   ENV_WORKER_ID = "HINDSIGHT_API_WORKER_ID"
144 144   ENV_WORKER_POLL_INTERVAL_MS = "HINDSIGHT_API_WORKER_POLL_INTERVAL_MS"
145 145   ENV_WORKER_MAX_RETRIES = "HINDSIGHT_API_WORKER_MAX_RETRIES"
146     - ENV_WORKER_BATCH_SIZE = "HINDSIGHT_API_WORKER_BATCH_SIZE"
147 146   ENV_WORKER_HTTP_PORT = "HINDSIGHT_API_WORKER_HTTP_PORT"
    147 + ENV_WORKER_MAX_SLOTS = "HINDSIGHT_API_WORKER_MAX_SLOTS"
    148 + ENV_WORKER_CONSOLIDATION_MAX_SLOTS = "HINDSIGHT_API_WORKER_CONSOLIDATION_MAX_SLOTS"
148 149
149 150   # Reflect agent settings
150 151   ENV_REFLECT_MAX_ITERATIONS = "HINDSIGHT_API_REFLECT_MAX_ITERATIONS"
@@ -229,8 +230,9 @@
229 230   DEFAULT_WORKER_ID = None # Will use hostname if not specified
230 231   DEFAULT_WORKER_POLL_INTERVAL_MS = 500 # Poll database every 500ms
231 232   DEFAULT_WORKER_MAX_RETRIES = 3 # Max retries before marking task failed
232     - DEFAULT_WORKER_BATCH_SIZE = 10 # Tasks to claim per poll cycle
233 233   DEFAULT_WORKER_HTTP_PORT = 8889 # HTTP port for worker metrics/health
    234 + DEFAULT_WORKER_MAX_SLOTS = 10 # Total concurrent tasks per worker
    235 + DEFAULT_WORKER_CONSOLIDATION_MAX_SLOTS = 2 # Max concurrent consolidation tasks per worker
234 236
235 237   # Reflect agent settings
236 238   DEFAULT_REFLECT_MAX_ITERATIONS = 10 # Max tool call iterations before forcing response
@@ -419,8 +421,9 @@ class HindsightConfig:
419 421   worker_id: str | None
420 422   worker_poll_interval_ms: int
421 423   worker_max_retries: int
422     - worker_batch_size: int
423 424   worker_http_port: int
    425 + worker_max_slots: int
    426 + worker_consolidation_max_slots: int
424 427
425 428   # Reflect agent settings
426 429   reflect_max_iterations: int
@@ -582,8 +585,11 @@ def from_env(cls) -> "HindsightConfig":
582 585   worker_id=os.getenv(ENV_WORKER_ID) or DEFAULT_WORKER_ID,
583 586   worker_poll_interval_ms=int(os.getenv(ENV_WORKER_POLL_INTERVAL_MS, str(DEFAULT_WORKER_POLL_INTERVAL_MS))),
584 587   worker_max_retries=int(os.getenv(ENV_WORKER_MAX_RETRIES, str(DEFAULT_WORKER_MAX_RETRIES))),
585     - worker_batch_size=int(os.getenv(ENV_WORKER_BATCH_SIZE, str(DEFAULT_WORKER_BATCH_SIZE))),
586 588   worker_http_port=int(os.getenv(ENV_WORKER_HTTP_PORT, str(DEFAULT_WORKER_HTTP_PORT))),
    589 + worker_max_slots=int(os.getenv(ENV_WORKER_MAX_SLOTS, str(DEFAULT_WORKER_MAX_SLOTS))),
    590 + worker_consolidation_max_slots=int(
    591 + os.getenv(ENV_WORKER_CONSOLIDATION_MAX_SLOTS, str(DEFAULT_WORKER_CONSOLIDATION_MAX_SLOTS))
    592 + ),
587 593   # Reflect agent settings
588 594   reflect_max_iterations=int(os.getenv(ENV_REFLECT_MAX_ITERATIONS, str(DEFAULT_REFLECT_MAX_ITERATIONS))),
589 595   )

hindsight-api/hindsight_api/main.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -253,8 +253,9 @@ def release_lock():
253 253   worker_id=config.worker_id,
254 254   worker_poll_interval_ms=config.worker_poll_interval_ms,
255 255   worker_max_retries=config.worker_max_retries,
256     - worker_batch_size=config.worker_batch_size,
257 256   worker_http_port=config.worker_http_port,
    257 + worker_max_slots=config.worker_max_slots,
    258 + worker_consolidation_max_slots=config.worker_consolidation_max_slots,
258 259   reflect_max_iterations=config.reflect_max_iterations,
259 260   mental_model_refresh_concurrency=config.mental_model_refresh_concurrency,
260 261   )

hindsight-api/hindsight_api/worker/main.py

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -124,12 +124,6 @@ def main():
124 124   default=config.worker_poll_interval_ms,
125 125   help=f"Poll interval in milliseconds (default: {config.worker_poll_interval_ms}, env: HINDSIGHT_API_WORKER_POLL_INTERVAL_MS)",
126 126   )
127     - parser.add_argument(
128     - "--batch-size",
129     - type=int,
130     - default=config.worker_batch_size,
131     - help=f"Tasks to claim per poll (default: {config.worker_batch_size}, env: HINDSIGHT_API_WORKER_BATCH_SIZE)",
132     - )
133 127   parser.add_argument(
134 128   "--max-retries",
135 129   type=int,
@@ -168,8 +162,9 @@ def main():
168 162
169 163   print(f"Starting Hindsight Worker: {args.worker_id}")
170 164   print(f" Poll interval: {args.poll_interval}ms")
171     - print(f" Batch size: {args.batch_size}")
172 165   print(f" Max retries: {args.max_retries}")
    166 + print(f" Max slots: {config.worker_max_slots}")
    167 + print(f" Consolidation max slots: {config.worker_consolidation_max_slots}")
173 168   print(f" HTTP server: {args.http_host}:{args.http_port}")
174 169   print()
175 170
@@ -213,9 +208,10 @@ async def run():
213 208   worker_id=args.worker_id,
214 209   executor=memory.execute_task,
215 210   poll_interval_ms=args.poll_interval,
216     - batch_size=args.batch_size,
217 211   max_retries=args.max_retries,
218 212   tenant_extension=tenant_extension,
    213 + max_slots=config.worker_max_slots,
    214 + consolidation_max_slots=config.worker_consolidation_max_slots,
219 215   )
220 216
221 217   # Create the HTTP app for metrics/health

0 commit comments

Comments
 (0)