Excellent! Now let's proceed to **Step 1.2: Database Layer** - setting up PostgreSQL, Redis, and Vector Store with LangGraph checkpointing.[1][2][3][4]

***

## **STEP 1.2: Database Layer Setup**

### **File 1: `backend/app/db/__init__.py`**

```python
"""Database layer for PostgreSQL, Redis, and Vector Store."""
```

***

### **File 2: `backend/app/db/postgres.py`**

```python
"""
PostgreSQL Database Connection using SQLAlchemy 2.0 Async

This module handles:
1. Async database session management
2. Connection pooling for performance
3. Table creation and connection health checks

LangGraph checkpointing is configured separately (see the notes in models.py),
and the hot reload cache lives in redis_cache.py.

Key Concepts:
- AsyncEngine: Async-compatible database engine
- AsyncSession: Async database session for queries
- async_sessionmaker: Factory for creating AsyncSession objects
- Connection pooling: Reuses connections for better performance
"""

from contextlib import asynccontextmanager
from typing import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.pool import NullPool, QueuePool

from app.config import settings
from app.utils.logger import get_logger

log = get_logger(__name__)


# =============================================================================
# BASE MODEL FOR DECLARATIVE ORM
# =============================================================================
class Base(DeclarativeBase):
    """
    Base class for all SQLAlchemy models.
    
    All database models inherit from this class to get SQLAlchemy's ORM features.
    
    Example:
        class User(Base):
            __tablename__ = "users"
            id = Column(Integer, primary_key=True)
            name = Column(String)
    """
    pass


# =============================================================================
# DATABASE ENGINE CONFIGURATION
# =============================================================================
def create_database_engine() -> AsyncEngine:
    """
    Create async PostgreSQL engine with connection pooling.
    
    Connection Pool Settings:
    - pool_size: Number of connections to keep open (default: 5)
    - max_overflow: Additional connections beyond pool_size (default: 10)
    - pool_pre_ping: Check connection health before using (prevents stale connections)
    - pool_recycle: Recycle connections after N seconds (prevents timeout)
    - echo: Log all SQL statements (useful for debugging)
    
    Returns:
        AsyncEngine: Configured async database engine
    """
    
    # Development: NullPool opens a fresh connection per checkout (easier debugging).
    # Production: omit poolclass so create_async_engine uses its default,
    # AsyncAdaptedQueuePool. The plain QueuePool cannot be used with an asyncio engine.
    engine_kwargs: dict = {
        "echo": settings.DEBUG,  # Log SQL in debug mode
        "pool_pre_ping": True,  # Verify connections are alive
    }
    if settings.DEBUG:
        # NullPool rejects pool_size and max_overflow, so they are not passed here
        engine_kwargs["poolclass"] = NullPool
    else:
        engine_kwargs.update(
            pool_size=5,  # Keep 5 connections open
            max_overflow=10,  # Allow up to 15 total connections (5 + 10)
            pool_recycle=3600,  # Recycle connections after 1 hour
        )

    engine = create_async_engine(settings.DATABASE_URL, **engine_kwargs)

    log.info(
        "Database engine created",
        pool_class="NullPool" if settings.DEBUG else "AsyncAdaptedQueuePool",
        pool_size=engine_kwargs.get("pool_size", 0),
    )
    
    return engine


# Create global engine instance
engine = create_database_engine()

# Create session factory
# expire_on_commit=False prevents SQLAlchemy from expiring objects after commit
# This is important for async code to avoid lazy-loading issues
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)


# =============================================================================
# DATABASE SESSION DEPENDENCY
# =============================================================================
@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Async context manager for database sessions.

    It ensures:
    1. A session is created for each unit of work
    2. The session is committed on success
    3. The session is rolled back on exception
    4. The session is always closed

    Usage:
        async with get_db_session() as db:
            result = await db.execute(select(User))
            users = result.scalars().all()

    Note: because of @asynccontextmanager, this function cannot be passed
    to FastAPI's Depends() directly. Use get_db() below for endpoints.

    Yields:
        AsyncSession: Database session for this unit of work
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            log.error("Database transaction failed", exc_info=e)
            raise
        # No explicit close(): "async with" closes the session on exit


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI dependency that yields a session from get_db_session().

    Usage in FastAPI:
        @app.get("/users")
        async def get_users(db: AsyncSession = Depends(get_db)):
            result = await db.execute(select(User))
            return result.scalars().all()
    """
    async with get_db_session() as session:
        yield session


# =============================================================================
# DATABASE INITIALIZATION
# =============================================================================
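async def init_database() -> None:
    """
    Initialize database by creating all tables.

    This should be called once at application startup.
    It creates the tables defined in app/db/models.py:
    - conversations
    - agent_executions

    LangGraph checkpoint tables are not created here; the checkpointer's
    own setup() call creates them.
    """
    # Import models so their tables are registered on Base.metadata;
    # without this import, create_all would have nothing to create
    from app.db import models  # noqa: F401

    async with engine.begin() as conn:
        # Create all tables defined in Base subclasses
        await conn.run_sync(Base.metadata.create_all)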
    
    log.info("Database tables created/verified")


async def check_database_connection() -> bool:
    """
    Check if database connection is working.
    
    Returns:
        bool: True if connection is healthy, False otherwise
    """
    try:
        async with engine.begin() as conn:
            result = await conn.execute(text("SELECT 1"))
            result.scalar()
        log.info("Database connection verified")
        return True
    except Exception as e:
        log.error("Database connection failed", exc_info=e)
        return False


async def close_database() -> None:
    """
    Close all database connections.
    
    This should be called at application shutdown.
    """
    await engine.dispose()
    log.info("Database connections closed")
```
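
To see the commit/rollback/close ordering that `get_db_session` relies on without needing a live database, here is a minimal sketch using only the standard library. `FakeSession` and `session_scope` are hypothetical stand-ins invented for illustration, not part of SQLAlchemy or of the files above.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession):
    """Commit on success, roll back on error, always close."""
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_ok() -> list[str]:
    session = FakeSession()
    async with session_scope(session):
        pass  # queries would run here
    return session.events


async def run_failing() -> list[str]:
    session = FakeSession()
    try:
        async with session_scope(session):
            raise ValueError("simulated query failure")
    except ValueError:
        pass  # the error still reaches the caller after the rollback
    return session.events


if __name__ == "__main__":
    print(asyncio.run(run_ok()))       # ['commit', 'close']
    print(asyncio.run(run_failing()))  # ['rollback', 'close']
```

The failing path never commits, and the original exception is re-raised after the rollback, so callers still see the error.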

***

### **File 3: `backend/app/db/models.py`**

```python
"""
SQLAlchemy Models for Database Tables

This module defines:
1. Conversation model - stores user conversations
2. AgentExecution model - stores agent execution history
3. Checkpoint model - LangGraph state checkpoints (handled by langgraph-checkpoint-postgres)

Database Schema Design:
- Conversations: Main conversation threads
- AgentExecutions: Individual agent task executions within conversations
- Relationship: One conversation has many agent executions
"""

from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4

from sqlalchemy import JSON, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.db.postgres import Base


# =============================================================================
# CONVERSATION MODEL
# =============================================================================
class Conversation(Base):
    """
    Stores conversation threads between users and the multi-agent system.
    
    Each conversation is a thread where:
    - User submits queries
    - Supervisor delegates to agents
    - Agents execute tasks and return results
    
    Attributes:
        id: Unique conversation identifier (UUID)
        title: Human-readable conversation title
        user_id: User who initiated the conversation (optional for Phase 1)
        created_at: Timestamp when conversation started
        updated_at: Timestamp of last activity
        metadata: Additional context (user preferences, session info, etc.)
        agent_executions: Related agent execution records
    """
    
    __tablename__ = "conversations"
    
    # Primary key using UUID for distributed systems compatibility
    id: Mapped[str] = mapped_column(
        String(36),
        primary_key=True,
        default=lambda: str(uuid4()),
        comment="Unique conversation identifier"
    )
    
    # Conversation metadata
    title: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        default="New Conversation",
        comment="Human-readable conversation title"
    )
    
    user_id: Mapped[Optional[str]] = mapped_column(
        String(36),
        nullable=True,
        comment="User who owns this conversation (optional in Phase 1)"
    )
    
    # Timestamps with automatic updates
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="Conversation creation timestamp"
    )
    
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        comment="Last update timestamp"
    )
    
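    # JSON metadata for flexible storage.
    # "metadata" is a reserved attribute name on Declarative classes, so the
    # Python attribute is extra_metadata while the column stays "metadata".
    extra_metadata: Mapped[Optional[dict]] = mapped_column(
        "metadata",
        JSON,
        nullable=True,
        comment="Additional conversation context (preferences, tags, etc.)"
    )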
    
    # Relationship: One conversation has many agent executions
    agent_executions: Mapped[list["AgentExecution"]] = relationship(
        "AgentExecution",
        back_populates="conversation",
        cascade="all, delete-orphan",  # Delete executions when conversation is deleted
    )
    
    def __repr__(self) -> str:
        return f"<Conversation(id={self.id}, title={self.title})>"


# =============================================================================
# AGENT EXECUTION MODEL
# =============================================================================
class AgentExecution(Base):
    """
    Stores individual agent task executions within a conversation.
    
    Each execution represents:
    - Which agent was invoked
    - Input provided to the agent
    - Output/result from the agent
    - Execution metadata (duration, tokens used, etc.)
    
    This enables:
    - Conversation history replay
    - Agent performance monitoring
    - Debugging agent behavior
    - Cost tracking (LLM token usage)
    """
    
    __tablename__ = "agent_executions"
    
    # Primary key
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Unique execution identifier"
    )
    
    # Foreign key to conversation
    conversation_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("conversations.id", ondelete="CASCADE"),
        nullable=False,
        index=True,  # Index for faster joins
        comment="Parent conversation ID"
    )
    
    # Agent information
    agent_name: Mapped[str] = mapped_column(
        String(100),
        nullable=False,
        index=True,  # Index for querying by agent
        comment="Name of the agent that executed this task"
    )
    
    agent_type: Mapped[str] = mapped_column(
        String(50),
        nullable=False,
        comment="Type of agent (supervisor, worker, etc.)"
    )
    
    # Execution data
    input_data: Mapped[dict] = mapped_column(
        JSON,
        nullable=False,
        comment="Input provided to the agent"
    )
    
    output_data: Mapped[Optional[dict]] = mapped_column(
        JSON,
        nullable=True,
        comment="Output/result from the agent"
    )
    
    # Status tracking
    status: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="pending",
        comment="Execution status: pending, running, completed, failed"
    )
    
    error_message: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
        comment="Error message if execution failed"
    )
    
    # Timestamps
    started_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="Execution start time"
    )
    
    completed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="Execution completion time"
    )
    
    # Performance metrics
    duration_ms: Mapped[Optional[int]] = mapped_column(
        Integer,
        nullable=True,
        comment="Execution duration in milliseconds"
    )
    
    tokens_used: Mapped[Optional[int]] = mapped_column(
        Integer,
        nullable=True,
        comment="Number of LLM tokens consumed"
    )
    
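    # Metadata ("metadata" is reserved on Declarative classes, so the Python
    # attribute is extra_metadata while the column stays "metadata")
    extra_metadata: Mapped[Optional[dict]] = mapped_column(
        "metadata",
        JSON,
        nullable=True,
        comment="Additional execution metadata (model used, temperature, etc.)"
    )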
    
    # Relationship: Many executions belong to one conversation
    conversation: Mapped["Conversation"] = relationship(
        "Conversation",
        back_populates="agent_executions"
    )
    
    def __repr__(self) -> str:
        return f"<AgentExecution(id={self.id}, agent={self.agent_name}, status={self.status})>"


# =============================================================================
# NOTES ON LANGGRAPH CHECKPOINTING
# =============================================================================
"""
LangGraph Checkpointing Tables:

The langgraph-checkpoint-postgres package creates its own tables, including:
1. checkpoints - Stores graph execution state
2. checkpoint_blobs - Stores serialized channel values
3. checkpoint_writes - Stores pending writes

These tables enable:
- Agent state persistence across requests
- Conversation memory (agents remember previous interactions)
- Error recovery (resume from last checkpoint)
- Time-travel debugging (inspect state at any point)

We don't need to define these models manually - they're handled by the
async saver, which is used as an async context manager:
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

    async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
        await checkpointer.setup()  # Creates tables automatically

Note: the saver connects through psycopg, so it likely needs a plain
"postgresql://" URL rather than a SQLAlchemy "postgresql+asyncpg://" URL.

See implementation in app/graphs/supervisor_graph.py
"""
```

***

### **File 4: `backend/app/db/redis_cache.py`**

```python
"""
Redis Cache Manager with Hot Reload Mechanism

This module provides:
1. Redis connection management
2. Cache operations (get, set, delete)
3. Hot reload mechanism (periodically sync from PostgreSQL to local cache)
4. Fallback to database when Redis is unavailable

Cache Strategy:
- Frequently accessed data is cached in Redis
- Local in-memory fallback for development
- Automatic cache invalidation after TTL
- Hot reload refreshes cache from PostgreSQL every N minutes
"""

import asyncio
import json
from typing import Any, Optional

import redis.asyncio as redis
from redis.asyncio import Redis

from app.config import settings
from app.utils.logger import get_logger

log = get_logger(__name__)


# =============================================================================
# REDIS CONNECTION MANAGEMENT
# =============================================================================
class RedisCache:
    """
    Redis cache manager with async operations and hot reload.
    
    Features:
    - Async Redis operations
    - Connection pooling
    - Automatic reconnection
    - Fallback to in-memory cache if Redis unavailable
    - Hot reload mechanism from PostgreSQL
    """
    
    def __init__(self):
        self.redis_client: Optional[Redis] = None
        self._local_cache: dict[str, Any] = {}  # Fallback in-memory cache
        self._hot_reload_task: Optional[asyncio.Task] = None
    
    async def connect(self) -> None:
        """
        Connect to Redis server.
        
        If connection fails, logs error and falls back to in-memory cache.
        """
        if not settings.ENABLE_CACHE:
            log.info("Redis caching disabled")
            return
        
        try:
            self.redis_client = redis.from_url(
                settings.REDIS_URL,
                encoding="utf-8",
                decode_responses=True,
                max_connections=10,  # Connection pool size
            )
            
            # Test connection
            await self.redis_client.ping()
            log.info("Redis connection established", url=settings.REDIS_HOST)
            
            # Start hot reload task
            if settings.CACHE_HOT_RELOAD_INTERVAL > 0:
                self._hot_reload_task = asyncio.create_task(self._hot_reload_loop())
                log.info("Hot reload mechanism started", interval=settings.CACHE_HOT_RELOAD_INTERVAL)
        
        except Exception as e:
            log.warning(
                "Redis connection failed, using in-memory cache",
                exc_info=e
            )
            self.redis_client = None
    
    async def disconnect(self) -> None:
        """Close Redis connection and stop hot reload."""
        if self._hot_reload_task:
            self._hot_reload_task.cancel()
            try:
                await self._hot_reload_task
            except asyncio.CancelledError:
                pass
        
        if self.redis_client:
            await self.redis_client.close()
            log.info("Redis connection closed")
    
    # =========================================================================
    # CACHE OPERATIONS
    # =========================================================================
    
    async def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.
        
        Args:
            key: Cache key
        
        Returns:
            Cached value or None if not found
        """
        try:
            if self.redis_client:
                value = await self.redis_client.get(key)
                if value:
                    return json.loads(value)
            else:
                # Fallback to in-memory cache
                return self._local_cache.get(key)
        except Exception as e:
            log.warning("Cache get failed", key=key, exc_info=e)
        
        return None
    
    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """
        Set value in cache.
        
        Args:
            key: Cache key
            value: Value to cache (must be JSON serializable)
            ttl: Time-to-live in seconds (default: from settings)
        
        Returns:
            True if successful, False otherwise
        """
        ttl = ttl or settings.CACHE_TTL
        
        try:
            serialized = json.dumps(value)
            
            if self.redis_client:
                await self.redis_client.setex(key, ttl, serialized)
            else:
                # Fallback to in-memory cache (no TTL in fallback)
                self._local_cache[key] = value
            
            return True
        
        except Exception as e:
            log.warning("Cache set failed", key=key, exc_info=e)
            return False
    
    async def delete(self, key: str) -> bool:
        """
        Delete value from cache.
        
        Args:
            key: Cache key
        
        Returns:
            True if successful, False otherwise
        """
        try:
            if self.redis_client:
                await self.redis_client.delete(key)
            else:
                self._local_cache.pop(key, None)
            
            return True
        
        except Exception as e:
            log.warning("Cache delete failed", key=key, exc_info=e)
            return False
    
    async def clear(self) -> bool:
        """
        Clear all cache entries.
        
        Returns:
            True if successful, False otherwise
        """
        try:
            if self.redis_client:
                await self.redis_client.flushdb()
            else:
                self._local_cache.clear()
            
            log.info("Cache cleared")
            return True
        
        except Exception as e:
            log.error("Cache clear failed", exc_info=e)
            return False
    
    # =========================================================================
    # HOT RELOAD MECHANISM
    # =========================================================================
    
    async def _hot_reload_loop(self) -> None:
        """
        Background task to periodically reload cache from PostgreSQL.
        
        This ensures cache stays fresh with database changes made by other services.
        """
        while True:
            try:
                await asyncio.sleep(settings.CACHE_HOT_RELOAD_INTERVAL)
                await self._reload_cache()
            except asyncio.CancelledError:
                break
            except Exception as e:
                log.error("Hot reload failed", exc_info=e)
    
    async def _reload_cache(self) -> None:
        """
        Reload frequently accessed data from PostgreSQL to cache.
        
        TODO: In Step 1.4, this will reload:
        - Recent conversation summaries
        - Agent performance metrics
        - User preferences
        """
        log.debug("Hot reload triggered")
        
        # TODO: Implement actual reload logic in Step 1.4
        # Example (models have no .dict(), so build the payload explicitly):
        # async with get_db_session() as db:
        #     result = await db.execute(select(Conversation).limit(100))
        #     for conv in result.scalars():
        #         await self.set(f"conv:{conv.id}", {"id": conv.id, "title": conv.title})


# =============================================================================
# GLOBAL CACHE INSTANCE
# =============================================================================
cache = RedisCache()


async def init_cache() -> None:
    """Initialize Redis cache. Call at application startup."""
    await cache.connect()


async def close_cache() -> None:
    """Close Redis cache. Call at application shutdown."""
    await cache.disconnect()
```
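
The in-memory fallback above stores entries without a TTL, so values never expire when Redis is unavailable. If you want the fallback to honor expiry as well, one possible approach is a small dict wrapper like the sketch below. `TTLDict` and its `clock` parameter are hypothetical names introduced here for illustration; nothing in `RedisCache` uses them yet.

```python
import time
from typing import Any, Callable, Optional


class TTLDict:
    """Hypothetical in-memory cache whose entries expire after ttl seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        # The clock is injectable so expiry can be tested without sleeping
        self._clock = clock
        self._data: dict[str, tuple[float, Any]] = {}

    def set(self, key: str, value: Any, ttl: float) -> None:
        """Store value together with its absolute expiry time."""
        self._data[key] = (self._clock() + ttl, value)

    def get(self, key: str) -> Optional[Any]:
        """Return the value, or None if the key is missing or expired."""
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Lazily evict expired entries on read
            del self._data[key]
            return None
        return value

    def delete(self, key: str) -> None:
        """Remove the key if present."""
        self._data.pop(key, None)


if __name__ == "__main__":
    cache = TTLDict()
    cache.set("greeting", {"hello": "world"}, ttl=60)
    print(cache.get("greeting"))
```

Eviction here is lazy (on read only), which keeps the sketch short; a long-running process with many keys would probably also want a periodic sweep.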

***

### **File 5: `backend/app/db/vector_store.py`**

```python
"""
Vector Store Integration for Semantic Search

This module provides vector database integration for:
1. Storing conversation embeddings for semantic search
2. Agent memory - finding relevant past conversations
3. Document embeddings for RAG (Retrieval-Augmented Generation)

Supported Vector DBs:
- Pinecone (cloud, free tier available)
- ChromaDB (self-hosted or cloud)

Key Concepts:
- Embeddings: Vector representations of text (created by embedding models)
- Semantic search: Find similar text based on meaning, not keywords
- Namespaces: Logical separation of vectors (e.g., by user)
"""

from typing import Any, Optional

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_pinecone import PineconeVectorStore

try:
    from langchain_chroma import Chroma
    CHROMA_AVAILABLE = True
except ImportError:
    CHROMA_AVAILABLE = False

from app.config import settings
from app.utils.logger import get_logger

log = get_logger(__name__)


# =============================================================================
# EMBEDDING MODEL INITIALIZATION
# =============================================================================
def get_embedding_model():
    """
    Get embedding model based on configured LLM provider.
    
    Embeddings convert text to vectors (numerical representations).
    Texts with similar meaning map to nearby vectors, which is what makes
    semantic similarity search possible.
    
    Providers:
    - Google (text-embedding-004): 768 dimensions, good quality
    - Any other provider (e.g. Groq): local sentence-transformers model
      (all-MiniLM-L6-v2, 384 dimensions), since langchain_groq ships no
      embeddings class
    
    Returns:
        Embedding model instance
    """
    if settings.DEFAULT_LLM_PROVIDER == "google":
        return GoogleGenerativeAIEmbeddings(
            model="models/text-embedding-004",
            google_api_key=settings.GOOGLE_API_KEY,
        )
    else:
        # langchain_groq ships no embeddings class, so fall back to a local
        # sentence-transformers model (this also consumes no API quota)
        from langchain_community.embeddings import HuggingFaceEmbeddings
        
        return HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},  # Change to "cuda" if a GPU is available
            encode_kwargs={"normalize_embeddings": True},
        )


# =============================================================================
# VECTOR STORE MANAGER
# =============================================================================
class VectorStoreManager:
    """
    Manages vector database operations for semantic search.
    
    Provides unified interface for Pinecone and ChromaDB.
    """
    
    def __init__(self):
        self.vector_store: Optional[Any] = None
        self.embeddings = None
    
    async def initialize(self) -> None:
        """
        Initialize vector store based on configuration.
        
        Creates index/collection if it doesn't exist.
        """
        self.embeddings = get_embedding_model()
        
        if settings.VECTOR_DB_TYPE == "pinecone":
            await self._init_pinecone()
        elif settings.VECTOR_DB_TYPE == "chromadb":
            await self._init_chromadb()
        else:
            raise ValueError(f"Unsupported vector DB: {settings.VECTOR_DB_TYPE}")
        
        log.info("Vector store initialized", db_type=settings.VECTOR_DB_TYPE)
    
    async def _init_pinecone(self) -> None:
        """
        Initialize Pinecone vector store.
        
        Pinecone is a managed vector database with:
        - Free tier: 1 index, 100K vectors
        - Serverless: Auto-scaling
        - Low latency: Global deployment
        """
        from pinecone import Pinecone, ServerlessSpec
        
        # Initialize Pinecone client
        pc = Pinecone(api_key=settings.PINECONE_API_KEY)
        
        # Create index if it doesn't exist
        index_name = settings.PINECONE_INDEX_NAME
        
        if index_name not in pc.list_indexes().names():
            log.info("Creating Pinecone index", index=index_name)
            
            # Get embedding dimension (768 for text-embedding-004, 384 for MiniLM)
            dimension = 768 if settings.DEFAULT_LLM_PROVIDER == "google" else 384
            
            pc.create_index(
                name=index_name,
                dimension=dimension,
                metric="cosine",  # Similarity metric
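                spec=ServerlessSpec(
                    cloud="aws",
                    # Must be a serverless region for this cloud (e.g. "us-east-1"),
                    # not a legacy pod environment name
                    region=settings.PINECONE_ENVIRONMENT,
                ),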
            )
            log.info("Pinecone index created")
        
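        # Initialize LangChain vector store.
        # Pass the API key explicitly; otherwise the wrapper falls back to the
        # PINECONE_API_KEY environment variable, which may not be set.
        self.vector_store = PineconeVectorStore(
            index_name=index_name,
            embedding=self.embeddings,
            pinecone_api_key=settings.PINECONE_API_KEY,
        )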
    
    async def _init_chromadb(self) -> None:
        """
        Initialize ChromaDB vector store.
        
        ChromaDB is an open-source vector database with:
        - Self-hosted or cloud
        - Embedded mode for development
        - Persistent storage
        """
        if not CHROMA_AVAILABLE:
            raise ImportError("langchain-chroma not installed. Run: uv pip install langchain-chroma")
        
        # Use persistent storage in data directory
        persist_directory = "data/chromadb"
        
        self.vector_store = Chroma(
            collection_name=settings.CHROMA_COLLECTION_NAME,
            embedding_function=self.embeddings,
            persist_directory=persist_directory,
        )
        
        log.info("ChromaDB initialized", persist_dir=persist_directory)
    
    async def add_texts(
        self,
        texts: list[str],
        metadatas: Optional[list[dict]] = None,
        namespace: Optional[str] = None,
    ) -> list[str]:
        """
        Add texts to vector store.
        
        Args:
            texts: List of text strings to embed and store
            metadatas: Optional metadata for each text
            namespace: Logical separation (e.g., user ID, conversation ID)
        
        Returns:
            List of document IDs
        """
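        if not self.vector_store:
            raise RuntimeError("Vector store not initialized")
        
        # Only Pinecone understands namespaces; the Chroma wrapper has no such argument
        extra: dict[str, Any] = {}
        if namespace and settings.VECTOR_DB_TYPE == "pinecone":
            extra["namespace"] = namespace
        
        ids = await self.vector_store.aadd_texts(
            texts=texts,
            metadatas=metadatas,
            **extra,
        )
        
        log.info("Texts added to vector store", count=len(texts), namespace=namespace)
        return ids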
    
    async def similarity_search(
        self,
        query: str,
        k: int = 5,
        namespace: Optional[str] = None,
    ) -> list[dict]:
        """
        Search for similar texts.
        
        Args:
            query: Search query text
            k: Number of results to return
            namespace: Search within specific namespace
        
        Returns:
            List of similar documents with metadata
        """
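        if not self.vector_store:
            raise RuntimeError("Vector store not initialized")
        
        # Only Pinecone understands namespaces; the Chroma wrapper has no such argument
        extra: dict[str, Any] = {}
        if namespace and settings.VECTOR_DB_TYPE == "pinecone":
            extra["namespace"] = namespace
        
        results = await self.vector_store.asimilarity_search(
            query=query,
            k=k,
            **extra,
        )
        
        log.debug("Similarity search completed", query=query[:50], results=len(results))
        # Convert LangChain Document objects to plain dicts to match the return type
        return [
            {"content": doc.page_content, "metadata": doc.metadata}
            for doc in results
        ]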


# =============================================================================
# GLOBAL VECTOR STORE INSTANCE
# =============================================================================
vector_store_manager = VectorStoreManager()


async def init_vector_store() -> None:
    """Initialize vector store. Call at application startup."""
    await vector_store_manager.initialize()


async def close_vector_store() -> None:
    """Cleanup vector store. Call at application shutdown."""
    # No cleanup needed for current implementations
    log.info("Vector store closed")
```
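
If the idea behind `metric="cosine"` and `similarity_search` feels abstract, the following sketch shows the same ranking on toy vectors using only the standard library. The function names `cosine_similarity` and `top_k` and the two-dimensional "embeddings" are made up for illustration; real embeddings have hundreds of dimensions and the vector database does this ranking for you.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # Treat a zero vector as similar to nothing
    return dot / (norm_a * norm_b)


def top_k(
    query: list[float],
    corpus: dict[str, list[float]],
    k: int = 5,
) -> list[tuple[str, float]]:
    """Return the k corpus entries most similar to the query, best first."""
    scored = [(name, cosine_similarity(query, vec)) for name, vec in corpus.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


if __name__ == "__main__":
    # Toy 2-D vectors standing in for real embeddings
    corpus = {
        "billing question": [1.0, 0.0],
        "weather report": [0.0, 1.0],
        "invoice and weather": [1.0, 1.0],
    }
    for name, score in top_k([1.0, 0.1], corpus, k=2):
        print(f"{name}: {score:.3f}")
```

Because the MiniLM embeddings above are created with `normalize_embeddings=True`, their cosine similarity reduces to a plain dot product, which is one reason normalization is commonly enabled.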

***

### **File 6: Update `backend/app/main.py`** (Add database initialization)

Replace the lifespan function with:

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan events for FastAPI application."""
    # Startup
    log.info("Application starting", environment=settings.ENVIRONMENT, debug=settings.DEBUG)
    
    # Initialize databases
    from app.db.postgres import init_database, check_database_connection
    from app.db.redis_cache import init_cache
    from app.db.vector_store import init_vector_store
    
    # Check PostgreSQL connection
    if await check_database_connection():
        await init_database()
    else:
        log.error("Failed to connect to PostgreSQL")
    
    # Initialize Redis cache
    await init_cache()
    
    # Initialize vector store
    await init_vector_store()
    
    log.info("All database connections initialized")
    
    yield  # Application runs here
    
    # Shutdown
    log.info("Application shutting down")
    
    from app.db.postgres import close_database
    from app.db.redis_cache import close_cache
    from app.db.vector_store import close_vector_store
    
    await close_database()
    await close_cache()
    await close_vector_store()
```
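
One thing to consider for the shutdown half of the lifespan: if startup or the application itself raises, cleanup code placed after a bare `yield` may not run. A possible alternative, sketched here with only the standard library, is to register each cleanup on a `contextlib.AsyncExitStack` so resources are released in reverse order even on error. `lifespan_sketch` and the resource names are hypothetical placeholders, not the real `init_*`/`close_*` functions.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def lifespan_sketch(names: list[str], events: list[str]):
    """Start resources in order; stop them in reverse order, even on error."""
    async with AsyncExitStack() as stack:
        for name in names:
            events.append(f"start:{name}")

            async def stop(resource: str = name) -> None:
                events.append(f"stop:{resource}")

            # Callbacks run last-in, first-out when the stack exits
            stack.push_async_callback(stop)
        yield  # Application runs here


async def demo(fail: bool) -> list[str]:
    events: list[str] = []
    try:
        async with lifespan_sketch(["postgres", "redis", "vector"], events):
            events.append("serving")
            if fail:
                raise RuntimeError("simulated crash")
    except RuntimeError:
        events.append("error")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo(fail=False)))
    print(asyncio.run(demo(fail=True)))
```

In the real lifespan, each `stop` callback would wrap one of `close_vector_store`, `close_cache`, and `close_database`.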

***

## **CHECKPOINT TEST: Verify Database Connections**

```bash
# From backend/ directory with venv activated

# Test PostgreSQL connection (update .env with your cloud PostgreSQL URL first)
python -c "
import asyncio
from app.db.postgres import check_database_connection, init_database

async def test():
    connected = await check_database_connection()
    if connected:
        await init_database()
        print('✓ PostgreSQL connected and tables created')
    else:
        print('✗ PostgreSQL connection failed')

asyncio.run(test())
"

# Test Redis connection (update .env with your Redis URL)
python -c "
import asyncio
from app.db.redis_cache import cache

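async def test():
    await cache.connect()
    if cache.redis_client is None:
        print('✗ Redis unavailable, using in-memory fallback')
    else:
        print('✓ Redis connected')
    await cache.set('test_key', {'hello': 'world'})
    value = await cache.get('test_key')
    print(f'Cache round-trip value: {value}')
    await cache.disconnect()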

asyncio.run(test())
"

# Test Vector Store (Pinecone or ChromaDB)
python -c "
import asyncio
from app.db.vector_store import vector_store_manager

async def test():
    await vector_store_manager.initialize()
    print('✓ Vector store initialized')

asyncio.run(test())
"

# Start server and verify all connections
uv run uvicorn app.main:app --reload

# In another terminal:
curl http://localhost:8000/api/health
# Should see: {"status":"healthy","environment":"development","version":"0.1.0"}
```

***

**Reply "done" when all database connections are working, then we'll proceed to Step 1.3: MCP Tools Implementation**.[2][3][4][1]

[1](https://github.com/seapagan/fastapi_async_sqlalchemy2_example)
[2](https://www.reddit.com/r/LangGraph/comments/1njhmrh/langgraph_checkpointer_issue_with_postgresql/)
[3](https://langchain-ai.github.io/langgraph/how-tos/memory/add-memory/)
[4](https://python.langchain.com/docs/integrations/vectorstores/pinecone/)
[5](https://www.youtube.com/watch?v=gg7AX1iRnmg)
[6](https://dev.to/akarshan/asynchronous-database-sessions-in-fastapi-with-sqlalchemy-1o7e)
[7](https://testdriven.io/blog/fastapi-sqlmodel/)
[8](https://github.com/reinhud/async-fastapi-postgres-template)
[9](https://api.python.langchain.com/en/latest/vectorstores/langchain_pinecone.vectorstores.PineconeVectorStore.html)
[10](https://craftyourstartup.com/cys-docs/setup-postgresql-fastapi-guide/)
[11](https://www.npmjs.com/package/@langchain/langgraph-checkpoint-postgres)
[12](https://www.ai-for-devs.com/blog/how)
[13](https://www.reddit.com/r/Python/comments/yrelvq/best_approach_for_async_sqlalchemy_in_fastapi/)
[14](https://airbyte.com/data-engineering-resources/pinecone-vector-database)
[15](https://www.reddit.com/r/FastAPI/comments/11vwn1p/fastapi_with_async_sqlalchemy_20_and_alembic/)
[16](https://pydigger.com/pypi/langgraph-checkpoint-postgres)
[17](https://berkkaraal.com/blog/2024/09/19/setup-fastapi-project-with-async-sqlalchemy-2-alembic-postgresql-and-docker/)
[18](https://langchain-ai.github.io/langgraphjs/how-tos/persistence-postgres/)
[19](https://docs.pinecone.io/reference/python-sdk)
[20](https://www.youtube.com/watch?v=cH0immwfykI)