Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ OPENAI_VISION_MODEL=qwen/qwen3-vl-32b-instruct
# Use a non-thinking model to avoid JSON truncation issues with reasoning models
# BAML_LLM_MODEL=deepseek/deepseek-v3.2
EMBEDDING_DIMENSIONS=2560
# Override embedding dimensions for crawler search (defaults to EMBEDDING_DIMENSIONS)
CRAWLER_EMBEDDING_DIMENSIONS=1536

# ============================================================================
# OPTIONAL: Database Configuration
Expand Down
6 changes: 5 additions & 1 deletion compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

services:
# ============================================================================
# Tale DB (TimescaleDB)
# Tale DB (ParadeDB — pg_search + pgvector)
# ============================================================================
db:
# Image from GHCR (used when PULL_POLICY=always)
Expand Down Expand Up @@ -148,6 +148,10 @@ services:
# cpus: '1'
# memory: 2G

# Dependencies
depends_on:
- db

Comment on lines +151 to +154
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider using health-based dependency for reliable startup.

The crawler now requires the database for pg_store_manager initialization. Using depends_on: - db only ensures the container starts, not that PostgreSQL is ready to accept connections. This may cause connection errors during crawler startup.

Proposed fix to wait for healthy database
     # Dependencies
     depends_on:
-      - db
+      db:
+        condition: service_healthy
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Dependencies
depends_on:
- db
# Dependencies
depends_on:
db:
condition: service_healthy
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compose.yml` around lines 151 - 154, Replace the simple depends_on: - db with
a health-based dependency so the crawler waits until PostgreSQL is ready; add a
Docker healthcheck to the db service (or ensure it exists) and change the
crawler's depends_on entry to use condition: service_healthy (refer to the
depends_on key and the db service) so pg_store_manager initialization won't
attempt connections before the DB is accepting connections.

# Volume mounts
# Persist crawler data (website registry + per-site URL databases)
volumes:
Expand Down
27 changes: 27 additions & 0 deletions services/crawler/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ class Settings(BaseSettings):
# Concurrency for Vision processing
vision_max_concurrent_pages: int = 3

# Database configuration
database_url: str | None = None

# Embedding model configuration
openai_embedding_model: str | None = None
embedding_dimensions: int | None = None

model_config = SettingsConfigDict(
env_prefix="CRAWLER_",
env_file=".env",
Expand Down Expand Up @@ -76,6 +83,26 @@ def get_fast_model(self) -> str:
raise ValueError("OPENAI_FAST_MODEL must be set in environment.")
return model

def get_embedding_model(self) -> str:
    """Resolve the embedding model name.

    The crawler-specific setting ``self.openai_embedding_model`` is tried
    first; the shared ``OPENAI_EMBEDDING_MODEL`` environment variable is
    consulted only when that yields nothing. Both candidates are passed
    through ``get_first_model`` (defined outside this block).

    Returns:
        The resolved model name.

    Raises:
        ValueError: If neither source produces a model name.
    """
    resolved = get_first_model(self.openai_embedding_model)
    if not resolved:
        # Fall back to the un-prefixed variable shared with other services.
        resolved = get_first_model(os.environ.get("OPENAI_EMBEDDING_MODEL"))
    if resolved:
        return resolved
    raise ValueError("OPENAI_EMBEDDING_MODEL must be set in environment.")

def get_embedding_dimensions(self) -> int:
    """Resolve the embedding vector size, failing fast on invalid values.

    ``self.embedding_dimensions`` (the CRAWLER_-prefixed setting) takes
    precedence; when it is unset, the shared ``EMBEDDING_DIMENSIONS``
    environment variable is parsed instead.

    Returns:
        The embedding dimension count, always a positive integer.

    Raises:
        ValueError: If no value is configured, if the environment value is
            not an integer, or if the resolved value is zero or negative.
    """
    dims = self.embedding_dimensions
    if dims is None:
        raw = os.environ.get("EMBEDDING_DIMENSIONS")
        if raw is not None:
            try:
                dims = int(raw)
            except ValueError as exc:
                # Name the offending variable instead of surfacing a bare int() error.
                raise ValueError("EMBEDDING_DIMENSIONS must be an integer.") from exc
    if dims is None:
        raise ValueError("EMBEDDING_DIMENSIONS must be set in environment.")
    if dims <= 0:
        # A vector column cannot have zero or negative width; reject here
        # rather than letting it fail later in the database or embedding call.
        raise ValueError("EMBEDDING_DIMENSIONS must be a positive integer.")
    return dims
Comment on lines +95 to +104
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate embedding dimensions as a positive integer at config boundary.

Line 101 currently accepts non-positive values (e.g., 0, -1), which are invalid for embedding/vector workflows and fail late downstream.

Suggested hardening patch
 def get_embedding_dimensions(self) -> int:
     """Get embedding dimensions from CRAWLER_EMBEDDING_DIMENSIONS or EMBEDDING_DIMENSIONS."""
     dims = self.embedding_dimensions
     if dims is None:
         raw = os.environ.get("EMBEDDING_DIMENSIONS")
         if raw is not None:
-            dims = int(raw)
+            try:
+                dims = int(raw)
+            except ValueError as exc:
+                raise ValueError("EMBEDDING_DIMENSIONS must be an integer.") from exc
-    if dims is None:
+    if dims is None:
         raise ValueError("EMBEDDING_DIMENSIONS must be set in environment.")
+    if dims <= 0:
+        raise ValueError("EMBEDDING_DIMENSIONS must be a positive integer.")
     return dims
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@services/crawler/app/config.py` around lines 95 - 104, The
get_embedding_dimensions method currently permits non-positive integers; update
get_embedding_dimensions (and its use of self.embedding_dimensions and the
EMBEDDING_DIMENSIONS env var) to validate that the resolved value is an integer
> 0: when reading from self.embedding_dimensions accept only positive ints, when
parsing os.environ["EMBEDDING_DIMENSIONS"] convert with int inside a try/except
and raise a clear ValueError for non-integer input, and if the value is <= 0
raise ValueError("EMBEDDING_DIMENSIONS must be a positive integer") so invalid
values fail fast at config boundary.



# Global settings instance
settings = Settings()
47 changes: 37 additions & 10 deletions services/crawler/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

Independent web crawling service using Crawl4AI.
Provides REST API for website crawling, URL discovery, document conversion,
template generation, and file parsing.
template generation, file parsing, content indexing, and hybrid search.

This module follows Clean Architecture principles:
- main.py: Application setup, configuration, and router registration
Expand All @@ -27,16 +27,17 @@
crawler_router,
docx_router,
image_router,
index_router,
pages_router,
pdf_router,
pptx_router,
search_router,
web_router,
websites_router,
)
from app.services.crawler_service import get_crawler_service
from app.services.image_service import get_image_service
from app.services.pdf_service import get_pdf_service
from app.services.scheduler import run_scheduler
from app.services.website_store import get_website_store_manager


@asynccontextmanager
Expand All @@ -54,28 +55,51 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
logger.info("Crawler service initialized successfully")
except Exception:
logger.exception("Failed to initialize crawler service")
# Don't fail startup - allow lazy initialization

# Initialize PostgreSQL connection pool + search services
from app.services.database import close_pool, init_pool
from app.services.embedding_service import get_embedding_service
from app.services.indexing_service import IndexingService
from app.services.pg_website_store import PgWebsiteStoreManager
from app.services.scheduler import run_scheduler
from app.services.search_service import SearchService

pool = await init_pool()
pg_store_manager = PgWebsiteStoreManager(pool)
embedding_service = get_embedding_service()
indexing_service = IndexingService(pool, embedding_service)
search_service = SearchService(pool, embedding_service)

# Wire services into routers
from app.routers.index import set_indexing_service
from app.routers.search import set_search_service

set_search_service(search_service)
set_indexing_service(indexing_service)

# Store references for scheduler and other routers
app.state.pg_store_manager = pg_store_manager
app.state.indexing_service = indexing_service

logger.info("PostgreSQL pool + search services initialized")

# Start background scheduler
store_manager = get_website_store_manager()
scheduler_task = asyncio.create_task(run_scheduler(store_manager, get_crawler_service()))
scheduler_task = asyncio.create_task(run_scheduler(pg_store_manager, get_crawler_service(), indexing_service))
logger.info("Background scheduler started")

yield

# Shutdown
logger.info("Shutting down Tale Crawler service...")

# Stop scheduler
scheduler_task.cancel()
with suppress(asyncio.CancelledError):
await scheduler_task
logger.info("Scheduler stopped")

# Close all website stores
store_manager.close_all()
await pg_store_manager.close()
await close_pool()

# Cleanup crawler service
try:
crawler = get_crawler_service()
if crawler.initialized:
Expand Down Expand Up @@ -109,6 +133,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
# Register routers
app.include_router(crawler_router)
app.include_router(websites_router)
app.include_router(search_router)
app.include_router(pages_router)
app.include_router(index_router)
app.include_router(pdf_router)
app.include_router(image_router)
app.include_router(docx_router)
Expand Down
131 changes: 130 additions & 1 deletion services/crawler/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
"""

from typing import Any, Literal
from urllib.parse import urlparse

from pydantic import BaseModel, Field, HttpUrl
from pydantic import BaseModel, Field, HttpUrl, field_validator

# Valid Playwright wait_until values
WaitUntilType = Literal["load", "domcontentloaded", "networkidle", "commit"]
Expand All @@ -27,6 +28,30 @@ class RegisterWebsiteRequest(BaseModel):
domain: str = Field(..., description="The domain to register (e.g., 'docs.example.com')")
scan_interval: int = Field(21600, description="Scan interval in seconds (default: 6h)", ge=60)

@field_validator("domain")
@classmethod
def normalize_domain(cls, v: str) -> str:
    """Reduce a user-supplied domain or URL to its bare hostname.

    Handles full URLs (``https://docs.example.com/guide``) as well as
    scheme-less input that still carries a path, query or port
    (``docs.example.com/guide``), so the stored value is always just the
    host. The hostname is lower-cased by ``urlparse``.

    Args:
        v: Raw ``domain`` value from the request.

    Returns:
        The hostname, or the whitespace-trimmed input when no hostname can
        be extracted from it.
    """
    candidate = v.strip()
    # Without a scheme urlparse treats the whole string as a path; prefixing
    # "//" makes it parse the leading component as the network location.
    target = candidate if "://" in candidate else f"//{candidate}"
    return urlparse(target).hostname or candidate


# Response model describing one website; only `domain` is required, every
# other field falls back to the default declared below.
class WebsiteInfoResponse(BaseModel):
"""Full website information."""

domain: str
title: str | None = None
description: str | None = None
page_count: int = 0
crawled_count: int = 0
status: str = "idle"
# 21600 equals the 6h default documented on RegisterWebsiteRequest.scan_interval
# (which is expressed in seconds).
scan_interval: int = 21600
# Timestamps are carried as strings; their exact format is not visible
# in this file — TODO confirm against the producer.
last_scanned_at: str | None = None
error: str | None = None
created_at: str | None = None
updated_at: str | None = None


class WebsiteUrl(BaseModel):
"""A tracked URL with content hash."""
Expand Down Expand Up @@ -284,3 +309,107 @@ class WebFetchExtractResponse(BaseModel):
page_count: int = Field(..., description="Number of pages in PDF")
vision_used: bool = Field(False, description="Whether Vision API was used for extraction")
error: str | None = Field(None, description="Error message if operation failed")


# ==================== Search Models ====================


# Request body for the search endpoint.
class SearchRequest(BaseModel):
"""Request for hybrid search."""

# Required; no length constraint is declared, so an empty string validates.
query: str = Field(..., description="Search query")
# Defaults to 10 and is validated to the inclusive range 1..100.
limit: int = Field(10, ge=1, le=100, description="Maximum results")


# One entry of SearchResponse.results; all fields except `title` are required.
class SearchResultItem(BaseModel):
"""A single search result."""

url: str
title: str | None = None
# The chunk text together with its index.
chunk_content: str
chunk_index: int
# NOTE(review): score scale and ordering are not defined in this file — confirm
# against the search service before relying on absolute values.
score: float


# Envelope returned by the search endpoint.
class SearchResponse(BaseModel):
"""Response from search endpoint."""

query: str
# default_factory gives each instance its own empty list.
results: list[SearchResultItem] = Field(default_factory=list)
# Required. NOTE(review): whether this counts returned items or all matches
# is not visible here — confirm against the router.
total: int


# ==================== Pages List Models ====================


# One entry of PageListResponse.pages; only `url` is required.
class PageListItem(BaseModel):
"""A page in the pages list."""

url: str
title: str | None = None
word_count: int = 0
# Free-form status string; the default for a new item is "discovered".
status: str = "discovered"
content_hash: str | None = None
# Timestamps are carried as strings; format not shown in this file — TODO confirm.
last_crawled_at: str | None = None
discovered_at: str | None = None
chunks_count: int = 0
indexed: bool = False


# Paginated page listing for a single domain; only `domain` is required.
class PageListResponse(BaseModel):
"""Paginated response of pages for a website."""

domain: str
# default_factory gives each instance its own empty list.
pages: list[PageListItem] = Field(default_factory=list)
total: int = 0
# Pagination fields; how `offset` and `has_more` are computed is decided
# by the code that builds this response (not visible here).
offset: int = 0
has_more: bool = False


# One entry of PageChunksResponse.chunks; both fields are required.
class PageChunkItem(BaseModel):
"""A single chunk from a page."""

chunk_index: int
chunk_content: str


# Chunk listing for one page; `url` and `domain` are required.
class PageChunksResponse(BaseModel):
"""Response containing all chunks for a specific page."""

url: str
domain: str
# default_factory gives each instance its own empty list.
chunks: list[PageChunkItem] = Field(default_factory=list)
total: int = 0


# ==================== Indexing Models ====================


# Request body for indexing one page; everything except `title` is required.
class IndexPageRequest(BaseModel):
"""Request to index a single page."""

# Plain string: unlike RegisterWebsiteRequest.domain, no validator
# normalises this value in this model.
domain: str = Field(..., description="Website domain")
url: str = Field(..., description="Page URL")
title: str | None = Field(None, description="Page title")
content: str = Field(..., description="Page content to index")


# Outcome of a single-page indexing request; only `error` is optional.
class IndexPageResponse(BaseModel):
"""Response from indexing a single page."""

success: bool
url: str
chunks_indexed: int
# NOTE(review): the set of possible status strings is not defined in this
# file — confirm against the indexing service.
status: str
error: str | None = None


# Aggregate outcome of indexing every page of one domain; all fields are required.
class IndexWebsiteResponse(BaseModel):
"""Response from indexing all pages for a website."""

success: bool
domain: str
# Per-outcome page counters, followed by the chunk total.
pages_indexed: int
pages_skipped: int
pages_failed: int
total_chunks: int
9 changes: 9 additions & 0 deletions services/crawler/app/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
This package contains modular routers following Clean Architecture principles:
- crawler: Content fetching and URL check endpoints (/api/v1/urls)
- websites: Website registration and URL listing (/api/v1/websites)
- search: Hybrid full-text + vector search (/api/v1/search)
- pages: List indexed pages per website (/api/v1/pages)
- index: Content indexing management (/api/v1/index)
- pdf: PDF conversion and parsing (/api/v1/pdf)
- image: Image conversion (/api/v1/images)
- docx: DOCX document generation and parsing (/api/v1/docx)
Expand All @@ -14,17 +17,23 @@
from app.routers.crawler import router as crawler_router
from app.routers.docx import router as docx_router
from app.routers.image import router as image_router
from app.routers.index import router as index_router
from app.routers.pages import router as pages_router
from app.routers.pdf import router as pdf_router
from app.routers.pptx import router as pptx_router
from app.routers.search import router as search_router
from app.routers.web import router as web_router
from app.routers.websites import router as websites_router

__all__ = [
"crawler_router",
"docx_router",
"image_router",
"index_router",
"pages_router",
"pdf_router",
"pptx_router",
"search_router",
"web_router",
"websites_router",
]
9 changes: 4 additions & 5 deletions services/crawler/app/routers/crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing import Annotated

from fastapi import APIRouter, HTTPException, Query
from fastapi import APIRouter, HTTPException, Query, Request
from loguru import logger
from pydantic import HttpUrl

Expand All @@ -14,22 +14,21 @@
PageContent,
)
from app.services.crawler_service import get_crawler_service
from app.services.website_store import get_website_store_manager

router = APIRouter(prefix="/api/v1/urls", tags=["Crawler"])


@router.post("/fetch", response_model=FetchUrlsResponse)
async def fetch_urls(request: FetchUrlsRequest):
async def fetch_urls(request: FetchUrlsRequest, http_request: Request):
"""
Fetch content from a list of specific URLs.

Returns cached content when available from the per-site content store,
falling back to live crawling for cache misses.
"""
try:
store_manager = get_website_store_manager()
cached, urls_to_crawl = store_manager.get_cached_pages(request.urls)
store_manager = http_request.app.state.pg_store_manager
cached, urls_to_crawl = await store_manager.get_cached_pages(request.urls)

# Filter cached pages by word_count_threshold
threshold = request.word_count_threshold
Expand Down
Loading
Loading