feat: production hardening — watcher, processor, ingest API #1

Merged
lacymorrow merged 1 commit into main from feature/production-hardening
Mar 16, 2026

Conversation

@lacymorrow
Owner

Production Hardening

Watcher (src/pipeline/watcher.py)

  • File locking — prevents concurrent processing of the same file
  • DB-based dedup — skips files already processed (by hash)
  • Crash recovery — cleans stale lock files from dead processes on startup
  • Retry with backoff — 2 attempts per file, then marks as .failed
  • Stale chunk cleanup — removes orphaned chunk dirs older than 24h
  • Health file — writes data/.watcher_healthy for container liveness probes
  • GPU memory cleanup — torch.cuda.empty_cache() + gc.collect() between files
  • Graceful shutdown — catches SIGTERM/SIGINT, finishes current file, exits clean

Processor (src/pipeline/processor.py)

  • Stage timeouts — VAD 120s, transcribe 600s, diarize 600s, embed 120s, extract 60s
  • Input validation — file exists, non-empty, min 100 bytes
  • Duration limits — rejects audio < 1s or > 2 hours
  • GPU cleanup between stages — prevents OOM on long processing runs

Ingest API (src/api/ingest.py)

  • Deep health check — GET /ingest/health checks DB connectivity + disk space
  • Request logging — middleware logs method, path, status, timing for every request
  • File size limits — 500MB max upload, 256KB max chunk
  • Uptime + disk % in status endpoint
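The request-logging middleware could be structured like this; the `format_request_log` helper and the log format are assumptions for illustration, with the hypothetical FastAPI wiring shown in comments:

```python
import logging

logger = logging.getLogger("ingest")

def format_request_log(method: str, path: str, status: int, duration_ms: float) -> str:
    """Build the one-line access-log entry the middleware would emit."""
    return f"{method} {path} -> {status} ({duration_ms:.1f}ms)"

# Hypothetical FastAPI wiring (app object assumed):
# @app.middleware("http")
# async def log_requests(request, call_next):
#     start = time.perf_counter()
#     response = await call_next(request)
#     ms = (time.perf_counter() - start) * 1000
#     logger.info(format_request_log(request.method, request.url.path,
#                                    response.status_code, ms))
#     return response
```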

Watcher:
- File locking to prevent concurrent processing
- DB-based dedup (skip already-processed hashes)
- Stale chunk cleanup (24h threshold)
- Crash recovery (dead process lock cleanup on startup)
- Health file for container liveness probes
- Retry with backoff, dead-letter marking after 2 attempts
- GPU memory cleanup between files
- Graceful shutdown on SIGTERM/SIGINT
- Interruptible sleep for fast shutdown

Processor:
- Stage timeouts (VAD 120s, transcribe 600s, diarize 600s, embed 120s, extract 60s)
- Input validation (file exists, non-empty, min size)
- Audio duration limits (1s min, 2h max)
- GPU memory cleanup between every stage
- Timeout wrapper using ThreadPoolExecutor

Ingest API:
- Deep health endpoint (/ingest/health) with DB + disk checks
- Request logging middleware with timing
- File size limits (500MB upload, 256KB chunk)
- Disk usage percentage in status
- Uptime tracking
- Structured 413 responses for oversized uploads
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the production readiness and robustness of the system's core components: the Ingest API, the Watcher, and the Processor. The changes focus on improving reliability, resource management, and operational visibility. Key improvements include mechanisms for handling concurrent operations, preventing resource exhaustion, ensuring data integrity through deduplication and validation, and providing better insights into system health and performance. These updates are crucial for maintaining stability and efficiency in a production environment.

Highlights

  • Watcher Hardening: Implemented file locking to prevent concurrent processing, DB-based deduplication to skip already processed files, crash recovery for stale lock files, retry mechanism with backoff for transient failures, cleanup of stale chunk directories, a health file for liveness probes, GPU memory cleanup between files, and graceful shutdown.
  • Processor Hardening: Introduced stage-specific timeouts for VAD, transcription, diarization, embedding, and extraction to prevent hanging. Added input validation for audio files (existence, non-empty, minimum size) and duration limits (1 second to 2 hours). Implemented GPU memory cleanup between processing stages to prevent out-of-memory errors.
  • Ingest API Hardening: Added a deep health check endpoint (/ingest/health) to verify database connectivity and disk space. Implemented request logging middleware to track method, path, status, and timing for all incoming requests. Enforced file size limits for uploads (500MB max) and individual chunks (256KB max). The status endpoint now includes disk usage percentage and API uptime.
Changelog
  • src/api/ingest.py
    • Added /ingest/health endpoint for deep health checks.
    • Introduced MAX_UPLOAD_SIZE and MAX_CHUNK_SIZE constants for file limits.
    • Implemented log_requests middleware for comprehensive request logging.
    • Integrated disk usage percentage and uptime into the /ingest/status endpoint.
    • Added file size validation to the ingest_upload endpoint.
    • Added chunk size validation to the ingest_chunk endpoint.
  • src/pipeline/processor.py
    • Added gc and ThreadPoolExecutor imports for memory management and timeouts.
    • Defined STAGE_TIMEOUTS, MAX_AUDIO_DURATION, and MIN_AUDIO_DURATION constants.
    • Implemented _run_with_timeout function to apply timeouts to processing stages.
    • Added _gpu_cleanup function to release GPU memory between stages.
    • Introduced input validation for audio files (existence, size) before processing.
    • Applied stage timeouts to VAD, transcription, diarization, embedding, and extraction.
    • Added audio duration validation to ensure files are within acceptable length limits.
  • src/pipeline/watcher.py
    • Added imports for gc, os, signal, sys, datetime, and timezone.
    • Defined constants for lock files, failed files, health file, max retries, and stale chunk hours.
    • Implemented _file_hash_quick for fast file deduplication checks.
    • Added file locking mechanisms (_is_locked, _lock, _unlock) to prevent concurrent processing.
    • Introduced _is_failed and _mark_failed functions for handling permanently failed files.
    • Integrated _is_db_processed for database-based deduplication.
    • Added _clean_stale_locks for crash recovery on startup.
    • Implemented _clean_stale_chunks for periodic cleanup of orphaned chunk directories.
    • Added _write_health to update a health file for container liveness probes.
    • Introduced _gpu_cleanup to free GPU memory after each file processing.
    • Modified find_unprocessed to account for locked, failed, and DB-processed files.
    • Updated process_batch to utilize file locking and GPU cleanup.
    • Enhanced the watch function with graceful shutdown, retry logic, periodic maintenance, and health reporting.
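One way `_file_hash_quick` could work: hash the file size plus samples from the start and end of the file, which is far cheaper than hashing multi-hour audio in full. The sampling scheme here (size plus first and last 64 KiB) is an assumption for illustration, not necessarily the PR's implementation:

```python
import hashlib
from pathlib import Path

def file_hash_quick(path: Path, sample_bytes: int = 65536) -> str:
    """Cheap dedup key: hash the file size plus its first and last samples."""
    size = path.stat().st_size
    h = hashlib.sha256(str(size).encode())
    with path.open("rb") as f:
        h.update(f.read(sample_bytes))
        if size > 2 * sample_bytes:
            f.seek(size - sample_bytes)  # jump to the tail sample
            h.update(f.read(sample_bytes))
    return h.hexdigest()
```

The trade-off is that two files differing only in the middle would collide; for dedup of re-uploaded recordings that is usually acceptable.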


@gemini-code-assist bot left a comment


Code Review

This pull request introduces significant production hardening features across the watcher, processor, and ingest API, including file locking, timeouts, graceful shutdowns, and health checks, greatly improving the robustness and reliability of the pipeline. However, the new health check endpoint may expose sensitive system or database details through raw exception messages, and the file upload size limit is enforced only after the entire file is read into memory, which could lead to memory exhaustion attacks. Further enhancements are suggested around resource management (database connections, memory usage during uploads) and the stale lock cleanup mechanism to improve overall robustness and security.

Comment on lines 281 to +286
```python
content = await file.read()
if len(content) > MAX_CHUNK_SIZE:
    raise HTTPException(
        status_code=413,
        detail=f"Chunk too large ({len(content)} bytes, max {MAX_CHUNK_SIZE})"
    )
```


high

Similar to the simple upload endpoint, this reads the entire chunk into memory before validation. While chunks are expected to be small, a malicious client could send a very large file here, leading to a potential denial-of-service (DoS) from memory exhaustion. It's safer to read the content in chunks and validate the size during streaming.

Suggested change
```diff
-content = await file.read()
-if len(content) > MAX_CHUNK_SIZE:
-    raise HTTPException(
-        status_code=413,
-        detail=f"Chunk too large ({len(content)} bytes, max {MAX_CHUNK_SIZE})"
-    )
+content = bytearray()
+while chunk := await file.read(8192):
+    content.extend(chunk)
+    if len(content) > MAX_CHUNK_SIZE:
+        raise HTTPException(
+            status_code=413,
+            detail=f"Chunk too large (max {MAX_CHUNK_SIZE} bytes)",
+        )
```

Comment on lines +104 to +114
```python
    checks["disk"] = f"error: {e}"

# DB check
try:
    from src.db.engine import get_session
    session = get_session()
    session.execute("SELECT 1")
    session.close()
    checks["database"] = "ok"
except Exception as e:
    checks["database"] = f"error: {e}"
```


security-medium

The health check endpoint currently returns raw exception messages from disk usage and database connection attempts, which can leak sensitive system information. Additionally, the database session is not guaranteed to be closed if an exception occurs, potentially leading to connection leaks. It is recommended to log full exceptions for internal debugging, return generic error messages to the user, and ensure database sessions are always closed using a try...finally block.

Suggested change
```diff
 checks["disk"] = f"error: {e}"

 # DB check
-try:
-    from src.db.engine import get_session
-    session = get_session()
-    session.execute("SELECT 1")
-    session.close()
-    checks["database"] = "ok"
-except Exception as e:
-    checks["database"] = f"error: {e}"
+from src.db.engine import get_session
+
+session = get_session()
+try:
+    session.execute("SELECT 1")
+    checks["database"] = "ok"
+finally:
+    session.close()
```

Comment on lines +191 to +195
```python
if len(content) > MAX_UPLOAD_SIZE:
    raise HTTPException(
        status_code=413,
        detail=f"File too large ({len(content)} bytes, max {MAX_UPLOAD_SIZE})"
    )
```


security-medium

The file size limit check is performed after the entire file has been read into memory using await file.read(). This creates a potential Denial of Service (DoS) vulnerability via Out-of-Memory (OOM) crashes for very large uploads. It is safer to validate the Content-Length header first or read the file in small chunks to enforce the limit without exhausting system memory.

Suggested change
```diff
-if len(content) > MAX_UPLOAD_SIZE:
-    raise HTTPException(
-        status_code=413,
-        detail=f"File too large ({len(content)} bytes, max {MAX_UPLOAD_SIZE})"
-    )
+content = bytearray()
+while chunk := await file.read(8192):  # Read in chunks
+    content.extend(chunk)
+    if len(content) > MAX_UPLOAD_SIZE:
+        raise HTTPException(
+            status_code=413,
+            detail=f"File too large (max {MAX_UPLOAD_SIZE} bytes)"
+        )
+if not content:
+    raise HTTPException(status_code=400, detail="Empty file")
```

Comment on lines +67 to +76
```python
def _run_with_timeout(fn, timeout_seconds: int, stage_name: str):
    """Run a function with a timeout. Raises TimeoutError if exceeded."""
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(fn)
        try:
            return future.result(timeout=timeout_seconds)
        except FutureTimeout:
            raise TimeoutError(
                f"Stage '{stage_name}' timed out after {timeout_seconds}s"
            )
```


medium

The _run_with_timeout function creates a new ThreadPoolExecutor for every call. This is inefficient as creating and tearing down thread pools has overhead. For better performance, consider creating a single ThreadPoolExecutor instance at the module level and reusing it across all calls to this function.

For example:

```python
# At module level
_executor = ThreadPoolExecutor(max_workers=1)

def _run_with_timeout(fn, timeout_seconds: int, stage_name: str):
    """Run a function with a timeout. Raises TimeoutError if exceeded."""
    future = _executor.submit(fn)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeout:
        raise TimeoutError(
            f"Stage '{stage_name}' timed out after {timeout_seconds}s"
        )
```

Comment on lines +173 to +176
```python
if audio_path.stat().st_size == 0:
    raise ValueError(f"Audio file is empty: {audio_path}")
if audio_path.stat().st_size < 100:
    raise ValueError(f"Audio file too small ({audio_path.stat().st_size} bytes): {audio_path}")
```


medium

The file stats are retrieved multiple times using audio_path.stat() in this block and later for logging on line 183. To improve readability and avoid redundant system calls, it's better to call stat() once and store the result in a variable for reuse.

Suggested change
```diff
-if audio_path.stat().st_size == 0:
-    raise ValueError(f"Audio file is empty: {audio_path}")
-if audio_path.stat().st_size < 100:
-    raise ValueError(f"Audio file too small ({audio_path.stat().st_size} bytes): {audio_path}")
+file_stat = audio_path.stat()
+if file_stat.st_size == 0:
+    raise ValueError(f"Audio file is empty: {audio_path}")
+if file_stat.st_size < 100:
+    raise ValueError(f"Audio file too small ({file_stat.st_size} bytes): {audio_path}")
```

Comment on lines +95 to +109
```python
def _clean_stale_locks(upload_dir: Path):
    """Remove lock files from dead processes (crash recovery)."""
    for lock in upload_dir.glob(f"*{LOCK_SUFFIX}"):
        try:
            content = lock.read_text().strip().split("\n")
            pid = int(content[0])
            # Check if process is still alive
            try:
                os.kill(pid, 0)
            except OSError:
                logger.warning(f"Cleaning stale lock: {lock.name} (pid {pid} dead)")
                lock.unlink()
        except (ValueError, IndexError):
            # Malformed lock file, remove it
            lock.unlink()
```


medium

The current implementation for cleaning stale locks relies solely on checking if the PID from the lock file is active. This is not fully robust, as PIDs can be recycled by the operating system. A long-running stale lock could be ignored if its PID is reused by a new process.

To make this more robust, you could also check the age of the lock file. The lock file already contains a timestamp. You can parse this timestamp and if the lock is older than a reasonable threshold (e.g., STALE_CHUNK_HOURS), it can be considered stale and removed. This would provide a second layer of protection against orphaned locks.
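The reviewer's age-plus-liveness check could be sketched as a combined predicate; the threshold constant and function name here are hypothetical, and the age check uses the lock file's mtime rather than parsing the embedded timestamp:

```python
import os
import time
from pathlib import Path

STALE_LOCK_SECONDS = 24 * 3600  # hypothetical threshold, mirroring STALE_CHUNK_HOURS

def lock_is_stale(lock: Path) -> bool:
    """Stale if the owning PID is dead, the file is too old, or it is malformed."""
    try:
        pid = int(lock.read_text().strip().split("\n")[0])
    except (ValueError, IndexError, OSError):
        return True  # malformed or unreadable lock file
    if time.time() - lock.stat().st_mtime > STALE_LOCK_SECONDS:
        return True  # too old even if the PID looks alive (PID may be recycled)
    try:
        os.kill(pid, 0)  # signal 0: probe liveness without sending a signal
    except OSError:
        return True  # no such process (or no permission to signal it)
    return False
```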

@lacymorrow lacymorrow merged commit 3265890 into main Mar 16, 2026
1 of 3 checks passed