# Agent: Failure Prediction

AI-Driven Predictive Maintenance with MongoDB Atlas

Overview

This notebook demonstrates an agentic workflow for failure prediction and root cause analysis in industrial settings, leveraging MongoDB Atlas as the central data platform.

Automated Root Cause Analysis

The end user sends an equipment malfunction alert to the agent

The Failure Agent performs diagnostics that traditionally take hours to complete manually

Atlas Vector Search retrieves contextual insights from:
- Historical maintenance logs
- Equipment documentation
- Environmental data (temperature, humidity, etc.)
- ERP/MES systems

LLM analyzes gathered context and generates incident reports with corrective actions

Architecture Flow

Real-time Alerts → AI Agents → Root Cause Analysis → Incident Reports

## Prerequisites
- MongoDB Atlas cluster with Atlas Vector Search enabled
- Voyage AI API key
- OpenAI API key
- Python packages: python-dotenv, pymongo, voyageai, openai, langchain, langchain-openai, langgraph, nest_asyncio, numpy (asyncio ships with the Python standard library)

## 1: Import Required Libraries and Configure Environment

## Setup: Configure Virtual Environment and Install Dependencies

In [None]:
# Installing a libraries' directly in the notebook
%pip install dotenv pymongo voyageai openai langchain langchain-openai langgraph asyncio nest_asyncio

In [None]:
import os
import json
import sys
import httpx
from pathlib import Path
from typing import Optional, List, Dict, Any
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError
from pymongo.operations import SearchIndexModel
from dotenv import load_dotenv

import voyageai
from openai import OpenAI


# Pull settings from a local .env file (if present) into the process environment
load_dotenv()

_TRUTHY_FLAG_VALUES = ('true', '1', 'yes')


def _env_flag(name: str) -> bool:
    """Return True when env var *name* is 'true', '1' or 'yes' (case-insensitive); unset means False."""
    return os.getenv(name, "false").lower() in _TRUTHY_FLAG_VALUES


# MongoDB connection settings
MONGODB_URI = os.getenv("MONGODB_URI")
DATABASE_NAME = os.getenv("DATABASE_NAME")

# Chat/completion LLM endpoint, key and model (model falls back to gpt-3.5-turbo)
LLM_API_ENDPOINT = os.getenv("LLM_API_ENDPOINT")
LLM_API_KEY = os.getenv("LLM_API_KEY")
COMPLETION_MODEL = os.getenv("COMPLETION_MODEL", "gpt-3.5-turbo")

# Embedding backend selector: 'true'/'1'/'yes' -> OpenAI-compatible API,
# anything else (or unset) -> Voyage AI native API
USE_OPENAI_COMPATIBLE_EMBEDDINGS = _env_flag("USE_OPENAI_COMPATIBLE_EMBEDDINGS")

# Embedding API settings (the endpoint is only used by the OpenAI-compatible path)
EMBEDDING_API_ENDPOINT = os.getenv("EMBEDDING_API_ENDPOINT")
EMBEDDING_API_KEY = os.getenv("EMBEDDING_API_KEY")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL")

# Older Voyage-specific variable names, still read for backward compatibility
VOYAGE_API_ENDPOINT = os.getenv("VOYAGE_API_ENDPOINT")
VOYAGE_API_KEY = os.getenv("VOYAGE_API_KEY")

# Voyage native path only: fall back to VOYAGE_API_KEY when EMBEDDING_API_KEY is unset
if VOYAGE_API_KEY and not (USE_OPENAI_COMPATIBLE_EMBEDDINGS or EMBEDDING_API_KEY):
    EMBEDDING_API_KEY = VOYAGE_API_KEY

# Opt-in switch for internal/dev endpoints that use self-signed certificates
DISABLE_SSL_VERIFICATION = _env_flag("DISABLE_SSL_VERIFICATION")

# Report which settings are present (presence only -- values are not checked)
_embedding_api_label = 'OpenAI-compatible' if USE_OPENAI_COMPATIBLE_EMBEDDINGS else 'Voyage AI native'
for _status_line in (
    "Configuration Status:",
    f"âœ“ MongoDB URI configured: {bool(MONGODB_URI)}",
    f"âœ“ Embedding API type: {_embedding_api_label}",
    f"âœ“ Embedding API key configured: {bool(EMBEDDING_API_KEY)}",
    f"âœ“ Embedding model configured: {bool(EMBEDDING_MODEL)}",
    f"âœ“ LLM API key configured: {bool(LLM_API_KEY)}",
    f"âœ“ LLM completion model: {COMPLETION_MODEL}",
):
    print(_status_line)

# Build the embedding client for whichever backend USE_OPENAI_COMPATIBLE_EMBEDDINGS selects.
# embedding_client stays None when no key is available for that backend.
embedding_client = None

if not EMBEDDING_API_KEY:
    # No key: say which variable is missing for the selected backend
    if USE_OPENAI_COMPATIBLE_EMBEDDINGS:
        print("âœ— EMBEDDING_API_KEY not configured for OpenAI-compatible API")
    else:
        print("âœ— EMBEDDING_API_KEY (or VOYAGE_API_KEY) not configured for Voyage AI")
elif USE_OPENAI_COMPATIBLE_EMBEDDINGS:
    # OpenAI-compatible server (e.g. MLIS or a local gateway); optionally skip TLS checks
    http_client = httpx.Client(verify=False) if DISABLE_SSL_VERIFICATION else None
    if DISABLE_SSL_VERIFICATION:
        print("âš  WARNING: SSL verification disabled for embedding client")

    embedding_client = OpenAI(
        api_key=EMBEDDING_API_KEY,
        base_url=EMBEDDING_API_ENDPOINT,
        http_client=http_client,
    )
    print(f"âœ“ Initialized OpenAI-compatible embedding client (endpoint: {EMBEDDING_API_ENDPOINT})")
else:
    # Voyage AI's own SDK client
    embedding_client = voyageai.Client(api_key=EMBEDDING_API_KEY)
    print("âœ“ Initialized Voyage AI native client")

# Initialize the chat-completion client used by the RAG system.
# Both names are always defined (None when LLM_API_KEY is missing), mirroring
# embedding_client, so later cells can test them instead of hitting a NameError.
llm_client = None
llm_http_client = None

if LLM_API_KEY:
    if DISABLE_SSL_VERIFICATION:
        # Only for internal/dev endpoints with self-signed certificates
        llm_http_client = httpx.Client(verify=False)
        print("âš  WARNING: SSL verification disabled for LLM client")

    llm_client = OpenAI(
        api_key=LLM_API_KEY,
        base_url=LLM_API_ENDPOINT or None,  # None lets the SDK pick its default base URL
        http_client=llm_http_client,
    )
    print("âœ“ Initialized LLM client")
else:
    print("LLM_API_KEY not configured - LLM client not initialized")

## 2: Connect to MongoDB Atlas

In [None]:
def connect_to_mongodb(uri: str, db_name: str = DATABASE_NAME) -> tuple:
    """
    Connect to a MongoDB Atlas cluster and return a handle to one database.

    The connection is verified with a ``ping`` so configuration problems show
    up here rather than on the first query. If anything fails, the partially
    created client is closed before the exception is re-raised.

    Args:
        uri: MongoDB connection string
        db_name: Database name to use (defaults to DATABASE_NAME as it was when
            this cell ran)

    Returns:
        Tuple of (client, database)

    Raises:
        ServerSelectionTimeoutError: if no server is reachable within 5 seconds.
        Exception: any other connection/configuration error is re-raised as-is.
    """
    client = None
    connected = False
    try:
        client = MongoClient(uri, serverSelectionTimeoutMS=5000)
        # Verify connection
        client.admin.command('ping')
        db = client[db_name]
        print(f"âœ“ Successfully connected to MongoDB Atlas")
        print(f"âœ“ Database: {db_name}")
        connected = True
        return client, db
    except ServerSelectionTimeoutError:
        print("âœ— Failed to connect to MongoDB Atlas. Check your connection string.")
        raise
    except Exception as e:
        print(f"âœ— Connection error: {e}")
        raise
    finally:
        # Release the client (and its background monitoring) unless we hand it back.
        if client is not None and not connected:
            client.close()

# Connect to MongoDB
# Open the MongoDB connection only when a URI was configured; `mongo_client` and
# `db` are left undefined otherwise (later cells re-check MONGODB_URI).
if not MONGODB_URI:
    print("âœ— MONGODB_URI not configured. Please set the environment variable.")
else:
    mongo_client, db = connect_to_mongodb(MONGODB_URI)
    print(f"âœ“ Available collections: {db.list_collection_names()}")

## 3: Load and Ingest Datasets from Data Folder

In [None]:
def load_json_dataset(file_path: str) -> List[Dict[str, Any]]:
    """
    Load a JSON dataset from file.

    A top-level JSON array is returned as-is. Any other top-level value (e.g. a
    single object) is wrapped in a one-element list, so callers always get a list.

    Args:
        file_path: Path to the JSON file (read as UTF-8; a leading BOM is tolerated)

    Returns:
        List of documents, or an empty list if the file is missing or cannot be
        decoded (the problem is printed, not raised).
    """
    try:
        # utf-8-sig reads plain UTF-8 and also strips a BOM, which json.load rejects.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"âœ— File not found: {file_path}")
        return []
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"âœ— Error decoding JSON from {file_path}: {e}")
        return []

    # Normalize before counting so the reported number matches what is returned
    # (len() of a dict would count its keys, not documents).
    documents = data if isinstance(data, list) else [data]
    print(f"âœ“ Loaded {file_path}: {len(documents)} documents")
    return documents

# Locate the data folder: prefer ./data, then fall back to ../data
# (e.g. when the notebook is run from a subdirectory). If neither exists,
# data_folder stays ./data and the loaders below report missing files.
data_folder = Path("./data")
if not data_folder.exists():
    alt_path = Path("../data")
    if alt_path.exists():
        print(f"Data folder not found at {data_folder.absolute()}; using {alt_path.absolute()}")
        data_folder = alt_path
    else:
        print(f"âœ— Data folder not found at {data_folder.absolute()} or {alt_path.absolute()}")
        print("Please ensure the data folder exists in the workspace root")

print(f"\nLoading datasets from: {data_folder.absolute()}")
print(f"Available files: {sorted(data_folder.glob('*'))}\n")

# Load the three source datasets from the data folder located above.
def _dataset_file(stem: str) -> str:
    """Return the path (as a string) of ``<stem>.json`` inside data_folder."""
    return str(data_folder / f"{stem}.json")


manuals_data = load_json_dataset(_dataset_file("manuals"))
interviews_data = load_json_dataset(_dataset_file("interviews"))
workorders_data = load_json_dataset(_dataset_file("workorders"))

print("\nDataset Summary:")
for _label, _docs in (
    ("Manuals", manuals_data),
    ("Interviews", interviews_data),
    ("Work Orders", workorders_data),
):
    print(f"  - {_label}: {len(_docs)} documents")

In [None]:
def ingest_data_to_mongodb(db, collection_name: str, documents: List[Dict]) -> bool:
    """
    Replace the contents of a collection with the given documents.

    The target collection is dropped before inserting, so every call starts
    from an empty collection.

    Args:
        db: MongoDB database object
        collection_name: Name of the target collection
        documents: Documents to insert

    Returns:
        True when the insert succeeded; False when ``documents`` is empty or
        any error occurred (the error is printed, not raised).
    """
    if documents:
        try:
            target = db[collection_name]
            target.drop()  # start from a clean collection on every run
            outcome = target.insert_many(documents)
            print(f"âœ“ Ingested {len(outcome.inserted_ids)} documents into '{collection_name}'")
            return True
        except Exception as err:
            print(f"âœ— Error ingesting data into {collection_name}: {err}")
            return False

    print(f"âœ— No documents to ingest into {collection_name}")
    return False

# Load each dataset into its own collection; skipped when MongoDB is not
# configured or the dataset came back empty.
for _collection_name, _documents in (
    ("manuals", manuals_data),
    ("interviews", interviews_data),
    ("workorders", workorders_data),
):
    if MONGODB_URI and _documents:
        ingest_data_to_mongodb(db, _collection_name, _documents)

print("\nâœ“ Data ingestion complete")

## 4: Generate Embeddings Using Configured Model

Embeddings are generated using the model specified in your `.env` file.

The notebook supports both:
- **OpenAI-compatible APIs** (HPE MLIS - recommended): NVIDIA, Voyage, custom models
- **Voyage AI native API** (legacy): For Voyage cloud service

In [None]:
def extract_text_for_embedding(document: Dict[str, Any], text_fields: List[str] = None) -> str:
    """
    Concatenate a document's textual fields into one string for embedding.

    Fields are visited in order. String values are used as-is. List values
    contribute each truthy item converted with ``str``. Missing or empty fields
    and values of any other type are skipped.

    Args:
        document: The document to read from
        text_fields: Field names to read; None means ['text', 'title', 'observations']

    Returns:
        The collected pieces joined by single spaces ("" if none were found)
    """
    fields = ['text', 'title', 'observations'] if text_fields is None else text_fields

    pieces: List[str] = []
    for name in fields:
        value = document.get(name)
        if not value:
            continue
        if isinstance(value, str):
            pieces.append(value)
        elif isinstance(value, list):
            pieces.extend(str(item) for item in value if item)

    return " ".join(pieces)

def generate_embeddings_batch(texts: List[str], model: str = EMBEDDING_MODEL, input_type: str = "passage") -> List[List[float]]:
    """
    Embed a batch of texts with whichever embedding backend is configured.

    USE_OPENAI_COMPATIBLE_EMBEDDINGS selects the backend: an OpenAI-compatible
    ``embeddings.create`` call, or the Voyage AI native ``embed`` call.

    Args:
        texts: Texts to embed, in order
        model: Embedding model name (defaults to EMBEDDING_MODEL as it was when this cell ran)
        input_type: "passage" when embedding documents, "query" when embedding a search query

    Returns:
        One vector per input text, or an empty list when there is nothing to
        embed, no client is configured, or the API call fails (errors are printed).
    """
    if not texts:
        return []
    if not embedding_client:
        print("âœ— Embedding client not initialized")
        return []

    try:
        if not USE_OPENAI_COMPATIBLE_EMBEDDINGS:
            # Voyage's native API names document inputs "document" rather than "passage".
            voyage_input_type = "document" if input_type == "passage" else "query"
            result = embedding_client.embed(texts=texts, model=model, input_type=voyage_input_type)
            vectors = list(result.embeddings)
        else:
            # input_type travels in extra_body because some OpenAI-compatible
            # servers (e.g. Voyage models hosted on MLIS) require it.
            result = embedding_client.embeddings.create(
                input=texts,
                model=model,
                extra_body={"input_type": input_type},
            )
            vectors = [item.embedding for item in result.data]

        print(f"âœ“ Generated {len(vectors)} embeddings using {model}")
        return vectors
    except Exception as exc:
        print(f"âœ— Error generating embeddings: {exc}")
        return []

# Smoke-test the embedding backend; later cells only run when test_embeddings is non-empty.
_backend_label = 'OpenAI-compatible' if USE_OPENAI_COMPATIBLE_EMBEDDINGS else 'Voyage AI native'
print(f"\nTesting embedding generation ({_backend_label})...")

test_texts = ["This is a test document", "Another test text for embeddings"]
test_embeddings = generate_embeddings_batch(test_texts)

if not test_embeddings:
    print("âœ— Embedding generation failed. Check API key and connectivity.")
else:
    print(f"âœ“ Sample embedding dimension: {len(test_embeddings[0])}")

## 5: Update Collections with Embeddings Field

In [None]:
def add_embeddings_to_collection(db, collection_name: str, batch_size: int = 10) -> bool:
    """
    Generate embeddings for all documents in a collection and store them in
    each document's ``embeddings`` field.

    Documents are embedded ``batch_size`` at a time. A batch whose embedding
    call fails is reported and skipped so the remaining batches still run.
    Each successful batch is written back with a single ``bulk_write``.

    Args:
        db: MongoDB database object
        collection_name: Name of the collection to process
        batch_size: Number of documents to embed per API call

    Returns:
        True if every batch was embedded and stored; False if the collection is
        empty, any batch failed, or an error occurred (errors are printed).
    """
    from pymongo import UpdateOne

    try:
        collection = db[collection_name]
        documents = list(collection.find({}))

        if not documents:
            print(f"âœ— No documents found in collection '{collection_name}'")
            return False

        total_batches = (len(documents) + batch_size - 1) // batch_size
        print(f"\nProcessing {len(documents)} documents in '{collection_name}'...")

        failed_batches = 0
        for batch_number, start in enumerate(range(0, len(documents), batch_size), 1):
            batch = documents[start:start + batch_size]
            texts = [extract_text_for_embedding(doc) for doc in batch]
            embeddings = generate_embeddings_batch(texts)

            # Anything other than exactly one vector per document means the batch failed.
            if not embeddings or len(embeddings) != len(batch):
                print(f"âœ— Embedding generation failed for batch {batch_number}")
                failed_batches += 1
                continue

            # One round trip per batch instead of one update_one call per document.
            collection.bulk_write([
                UpdateOne({"_id": doc["_id"]}, {"$set": {"embeddings": embedding}})
                for doc, embedding in zip(batch, embeddings)
            ])
            print(f"  âœ“ Processed batch {batch_number}/{total_batches}")

        # Counts every document that now carries an embeddings field.
        docs_with_embeddings = collection.count_documents({"embeddings": {"$exists": True}})
        print(f"âœ“ Updated {docs_with_embeddings} documents with embeddings in '{collection_name}'")

        if failed_batches:
            print(f"  {failed_batches}/{total_batches} batch(es) failed in '{collection_name}'")
            return False
        return True

    except Exception as e:
        print(f"âœ— Error adding embeddings to {collection_name}: {e}")
        return False

# Embed all three collections, but only once the smoke test proved the backend works.
if not (MONGODB_URI and test_embeddings):
    print("âœ— Skipping embedding generation - API not configured or failed")
else:
    print("=" * 60)
    print("GENERATING AND ADDING EMBEDDINGS TO COLLECTIONS")
    print("=" * 60)

    for _collection_name, _batch_size in (("manuals", 5), ("interviews", 5), ("workorders", 10)):
        add_embeddings_to_collection(db, _collection_name, batch_size=_batch_size)

## 6: Create Vector Indexes in MongoDB

In [None]:
def create_vector_search_index(db, collection_name: str, embedding_dim: int = 1024) -> bool:
    """
    Create an Atlas Vector Search index named ``vector_index`` on the
    ``embeddings`` field (cosine similarity), unless an index with that name exists.

    Note: this requires MongoDB Atlas with Atlas Vector Search enabled. An
    existing ``vector_index`` is left untouched even if its definition differs
    (e.g. a different ``numDimensions`` after switching embedding models).

    Args:
        db: MongoDB database object
        collection_name: Name of the collection
        embedding_dim: Dimension of the embeddings (must match the embedding model)

    Returns:
        True if the index was created or already existed, or if instructions for
        manual creation were printed; False on any other error.
    """
    # Kept as a plain dict so it can also be printed for manual creation in the Atlas UI.
    index_definition = {
        "fields": [
            {
                "type": "vector",
                "path": "embeddings",
                "numDimensions": embedding_dim,
                "similarity": "cosine",
            }
        ]
    }

    try:
        collection = db[collection_name]
        search_index_model = SearchIndexModel(
            definition=index_definition,
            name="vector_index",
            type="vectorSearch",
        )

        # Note: the search index helpers require MongoDB Python driver >= 4.6
        try:
            existing_indexes = [idx.get('name') for idx in collection.list_search_indexes()]
            print(f"  Existing search indexes on '{collection_name}': {existing_indexes}")

            if 'vector_index' in existing_indexes:
                print(f"âœ“ Vector search index already exists for '{collection_name}'")
            else:
                collection.create_search_index(model=search_index_model)
                print(f"âœ“ Created vector search index for '{collection_name}'")

        except AttributeError:
            # Fallback for older driver versions without the search index helpers
            print(f"âš  Vector search index creation requires MongoDB Atlas with Vector Search enabled")
            print(f"  Manually create the index in MongoDB Atlas UI with this definition:")
            print(f"  {json.dumps(index_definition, indent=2)}")

        return True

    except Exception as e:
        print(f"âš  Note: {e}")
        print(f"  Vector indexes should be created in MongoDB Atlas UI")
        return False

# Create one vector index per collection, sized to the embedding model in use.
if not (MONGODB_URI and test_embeddings):
    print("âœ— Skipping index creation - prerequisites not met")
else:
    print("\n" + "=" * 60)
    print("CREATING VECTOR SEARCH INDEXES")
    print("=" * 60)

    # test_embeddings is non-empty in this branch, so its first vector gives the dimension.
    embedding_dim = len(test_embeddings[0])

    for _collection_name in ("manuals", "interviews", "workorders"):
        create_vector_search_index(db, _collection_name, embedding_dim)

## 7: Implement RAG Solution with OpenAI

In [None]:
def _rank_by_cosine_similarity(collection, query_vector: List[float], num_results: int) -> List[Dict]:
    """
    Brute-force fallback for vector_search_mongodb.

    Scores every document that has an ``embeddings`` field by cosine similarity
    in Python and returns the best ``num_results`` as
    ``{'similarityScore', 'document'}`` entries. The raw vector is removed from
    each returned document.
    """
    import numpy as np

    query_vec = np.asarray(query_vector, dtype=float)
    query_norm = np.linalg.norm(query_vec)
    if query_norm == 0:
        return []  # cosine similarity is undefined for a zero vector

    scored = []
    for doc in collection.find({"embeddings": {"$exists": True}}):
        doc_vec = np.asarray(doc.pop("embeddings"), dtype=float)
        doc_norm = np.linalg.norm(doc_vec)
        # Skip vectors that cannot be compared (different dimension or all zeros).
        if doc_vec.shape != query_vec.shape or doc_norm == 0:
            continue
        similarity = float(np.dot(query_vec, doc_vec) / (query_norm * doc_norm))
        scored.append({'similarityScore': similarity, 'document': doc})

    scored.sort(key=lambda item: item['similarityScore'], reverse=True)
    return scored[:num_results]


def vector_search_mongodb(
    db,
    collection_name: str,
    query: str,
    num_results: int = 5,
    num_candidates: Optional[int] = None,
) -> List[Dict]:
    """
    Perform vector similarity search on a MongoDB collection.

    Uses Atlas ``$vectorSearch`` against the ``vector_index`` index on the
    ``embeddings`` field. If that aggregation fails, or returns nothing (for
    example because the index is still building), it falls back to ranking the
    documents in Python by cosine similarity.

    Args:
        db: MongoDB database object
        collection_name: Name of the collection to search
        query: Query in natural language
        num_results: Number of results to return
        num_candidates: Approximate-search candidate pool; defaults to
            10 x num_results, at least 100 and at most 10000

    Returns:
        List of ``{'similarityScore': float, 'document': dict}`` entries, best
        match first, with the ``embeddings`` field removed from each document.
        Empty on error.
    """
    if num_results < 1:
        return []

    try:
        collection = db[collection_name]

        # Embed as a *query* (not a passage) so asymmetric embedding models
        # encode it the way they expect search queries.
        query_embedding = generate_embeddings_batch([query], input_type="query")
        if not query_embedding:
            print(f"  Could not embed the query; skipping vector search on '{collection_name}'")
            return []
        query_vector = query_embedding[0]

        # Atlas requires numCandidates >= limit (and at most 10000); a pool larger
        # than the limit improves recall of the approximate search.
        if num_candidates is None:
            num_candidates = min(max(num_results * 10, 100), 10000)
        num_candidates = max(num_candidates, num_results)

        pipeline = [
            {
                "$vectorSearch": {
                    "index": "vector_index",
                    "path": "embeddings",
                    "queryVector": query_vector,
                    "numCandidates": num_candidates,
                    "limit": num_results,
                }
            },
            {
                "$project": {
                    # Key must match what retrieve_context and the tools read.
                    "similarityScore": {"$meta": "vectorSearchScore"},
                    "document": "$$ROOT",
                }
            },
            # Callers only need text fields; don't ship the raw vectors back.
            {"$unset": "document.embeddings"},
        ]

        try:
            results = list(collection.aggregate(pipeline))
        except Exception as search_error:
            print(f"  $vectorSearch failed on '{collection_name}' ({search_error}); using brute-force fallback")
            results = None

        if results:
            return results
        if results is not None:
            print(f"  $vectorSearch returned no results on '{collection_name}' "
                  f"(index may still be building); using brute-force fallback")
        return _rank_by_cosine_similarity(collection, query_vector, num_results)

    except Exception as e:
        print(f"âœ— Error performing vector search: {e}")
        return []

def retrieve_context(db, query: str, num_results: int = 3, max_chars: int = 500) -> str:
    """
    Build a RAG context string from the manuals, interviews and work-order collections.

    Each collection is searched with ``vector_search_mongodb``. Every hit is
    rendered as ``"<rank>. (Score: <score>) <text>"`` using at most the first
    ``max_chars`` characters of its text; "..." marks text that was cut off.
    Collections without hits are omitted.

    Args:
        db: MongoDB database object
        query: User query
        num_results: Number of results per collection
        max_chars: Maximum characters of text shown per hit

    Returns:
        Formatted context string, or "No context available" for an empty query
    """
    if not query:
        return "No context available"

    sections = (
        ("manuals", "From Manuals"),
        ("interviews", "From Interviews"),
        ("workorders", "From Work Orders"),
    )

    parts = ["Retrieved Context:\n\n"]
    for collection_name, heading in sections:
        results = vector_search_mongodb(db, collection_name, query, num_results)
        if not results:
            continue

        parts.append(f"=== {heading} ===\n")
        for rank, result in enumerate(results, 1):
            doc = result.get('document', result)
            # Treat a missing or null score as 0 so formatting never fails.
            score = result.get('similarityScore') or 0.0
            full_text = extract_text_for_embedding(doc)
            ellipsis = "..." if len(full_text) > max_chars else ""
            parts.append(f"{rank}. (Score: {score:.2f}) {full_text[:max_chars]}{ellipsis}\n\n")

    return "".join(parts)

# RAG system: vector-search retrieval from MongoDB + answer generation with the LLM client
class MongoDBOpenAIRAG:
    """RAG system using MongoDB Atlas retrieval and an OpenAI-compatible chat LLM.

    Attributes:
        db: MongoDB database handle passed to retrieve_context
        model: Chat model name (COMPLETION_MODEL when not given)
        temperature: Sampling temperature for completions
        max_tokens: Maximum number of tokens generated per answer
    """

    def __init__(self, db, model: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 500):
        self.db = db
        self.model = model if model else COMPLETION_MODEL  # Use env var if not specified
        self.temperature = temperature
        self.max_tokens = max_tokens

    def answer_question(self, query: str, num_context_docs: int = 3) -> Dict[str, Any]:
        """
        Answer a question using the RAG approach.

        Args:
            query: User question
            num_context_docs: Number of context documents to retrieve per collection

        Returns:
            Dict with 'success', 'query' and 'context'. It also has 'answer' and
            'model' on success, or 'error' on failure.
        """
        # Retrieve context
        context = retrieve_context(self.db, query, num_context_docs)

        system_prompt = """You are a helpful assistant answering questions about maintenance systems and procedures.
Use the provided context to answer the question accurately. If the context doesn't contain relevant information, say so.
Always cite your sources from the context."""

        user_message = f"""Context Information:
{context}

Question: {query}

Please provide a helpful answer based on the context above."""

        try:
            response = llm_client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )

            answer = response.choices[0].message.content
            # The message content can be null; report that instead of returning None as an answer.
            if answer is None:
                raise ValueError("LLM returned no text content")

            return {
                'success': True,
                'query': query,
                'answer': answer,
                'context': context,
                'model': self.model
            }
        except Exception as e:
            return {
                'success': False,
                'query': query,
                'error': str(e),
                'context': context
            }

# Create the RAG system only when both MongoDB and the LLM are configured;
# otherwise `rag_system` is left undefined (the next cell checks for it).
if not (MONGODB_URI and LLM_API_KEY):
    print("âœ— RAG system initialization failed - missing API keys")
else:
    rag_system = MongoDBOpenAIRAG(db, model=COMPLETION_MODEL)
    print(f"âœ“ RAG system initialized successfully with model: {COMPLETION_MODEL}")

## 8: Query and Retrieve Results

In [None]:
def format_rag_response(response: Dict[str, Any]) -> str:
    """
    Render a RAG response dict as a framed, human-readable text block.

    Successful responses show the answer followed by the retrieved context.
    Failed ones show the error and whatever context was retrieved. Missing keys
    are replaced with placeholder text.

    Args:
        response: Dict produced by MongoDBOpenAIRAG.answer_question

    Returns:
        The formatted multi-line string (starts and ends with a newline)
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    pieces = [
        f"\n{heavy_rule}\n",
        f"QUERY: {response.get('query', 'N/A')}\n",
        f"{heavy_rule}\n\n",
    ]

    if response.get('success'):
        pieces += [
            f"ANSWER:\n{response.get('answer', 'No answer generated')}\n\n",
            f"{light_rule}\n",
            f"CONTEXT SOURCES:\n{response.get('context', 'No context retrieved')}\n",
        ]
    else:
        pieces += [
            f"ERROR: {response.get('error', 'Unknown error')}\n",
            f"RETRIEVED CONTEXT:\n{response.get('context', 'No context')}\n",
        ]

    pieces.append(f"{heavy_rule}\n")
    return "".join(pieces)

import time

# Example queries to test the RAG system
example_queries = [
    "What are the maintenance procedures for critical equipment?",
    "E12 high temperature",
    "What is the recommended maintenance schedule?"
]

print("\n" + "=" * 70)
print("RAG SYSTEM DEMONSTRATION")
print("=" * 70)

# rag_system only exists if the initialization cell could create it.
if 'rag_system' in globals() and MONGODB_URI:
    print("\nRunning example queries...\n")

    for i, query in enumerate(example_queries, 1):
        print(f"\n--- Query {i} ---")
        response = rag_system.answer_question(query, num_context_docs=2)
        formatted = format_rag_response(response)
        print(formatted)

        # Small delay between API calls to avoid rate limiting
        if i < len(example_queries):
            time.sleep(2)
else:
    print("\nâœ— RAG system not available for querying")
    print("  Ensure MONGODB_URI and LLM_API_KEY are configured")

## 9. Import Required Libraries

Import necessary libraries including langchain, langgraph, and other dependencies for building the failure agent.

In [None]:
# Import Required Libraries
import os
import json
from datetime import datetime
from typing import Any, Dict, Optional, List
from dotenv import load_dotenv

# LangChain and LangGraph imports
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.types import StateSnapshot
from langgraph.graph.message import add_messages
from typing import Annotated

# Re-load the .env file so this agent section also works when its cells are run
# without the earlier setup cell (which already called load_dotenv()).
load_dotenv()


## 10. Define the State Schema

The state schema maintains the conversation history and messages throughout the agent's execution.

In [None]:
# Define State Schema
from typing import TypedDict

class FailureAgentState(TypedDict):
    """State schema for the Failure Agent"""
    # Conversation history. add_messages is attached via Annotated as this key's
    # reducer, which LangGraph uses to merge message updates returned by nodes.
    messages: Annotated[List[BaseMessage], add_messages]
    
print("âœ“ State schema defined")

## 11. Create Tool Functions

The failure agent uses four main tools to diagnose failures and generate incident reports:
- **retrieve_manual**: Search technical manuals for relevant information
- **retrieve_work_orders**: Find related maintenance work orders
- **retrieve_interviews**: Access maintenance staff expertise and historical insights
- **generate_incident_report**: Create and store incident reports

In [None]:
# ===== IMPROVED TOOL DEFINITIONS =====

def _normalize_repair_instructions(repair_instructions: List[Any]) -> List[Dict[str, Any]]:
    """Return a new list of step dicts (with 'step' and 'description') without mutating the input."""
    normalized = []
    for position, instruction in enumerate(repair_instructions, 1):
        if isinstance(instruction, dict):
            entry = dict(instruction)  # copy: never modify the caller's objects
            # A dict without a description falls back to its own string form.
            entry.setdefault("description", str(instruction))
            entry.setdefault("step", position)
        elif isinstance(instruction, str):
            entry = {"step": position, "description": instruction}
        else:
            entry = {"step": position, "description": str(instruction)}
        normalized.append(entry)
    return normalized


def insert_incident_report(
    error_code: str,
    error_name: str,
    root_cause: str,
    repair_instructions: List[Dict[str, Any]],
    machine_id: str,
    timestamp: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Insert an incident report into the ``incident_reports`` collection of the
    module-level ``db``.

    Each repair instruction may be a dict, a string or any other value. It is
    normalized to a dict with ``step`` (1-based position unless already given)
    and ``description`` keys. The caller's objects are not modified.

    Args:
        error_code: The error code
        error_name: Human-readable error name
        root_cause: Root cause analysis
        repair_instructions: Non-empty list of repair steps
        machine_id: Machine ID
        timestamp: Optional ISO timestamp; defaults to the current UTC time

    Returns:
        Dict with ``success`` and, on success, ``incident_id``. Errors
        (including validation failures) are returned, never raised.
    """
    from datetime import datetime, timezone

    try:
        if not isinstance(repair_instructions, list) or not repair_instructions:
            raise ValueError("repair_instructions must be a non-empty list")

        validated_instructions = _normalize_repair_instructions(repair_instructions)

        # One clock reading so a defaulted timestamp and created_at agree.
        now = datetime.now(timezone.utc).isoformat()
        report = {
            "error_code": error_code,
            "error_name": error_name,
            "root_cause": root_cause,
            "repair_instructions": validated_instructions,
            "machine_id": machine_id,
            "timestamp": timestamp or now,
            "status": "created",
            "created_at": now,
        }

        result = db["incident_reports"].insert_one(report)

        return {
            "success": True,
            "incident_id": str(result.inserted_id),
            "message": "Incident report created successfully",
            "error_code": error_code,
            "machine_id": machine_id
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"Failed to insert incident report: {e}"
        }


# NOTE: the @tool decorator exposes the docstring below to the LLM as the tool
# description, so it is kept verbatim.
@tool
def retrieve_manual(query: str, n: int = 3) -> str:
    """
    Retrieve relevant technical manuals for the alert via MongoDB vector search.
    
    Args:
        query: The search query for technical documentation
        n: Number of results to return (default 3)
    
    Returns:
        JSON string containing relevant manual excerpts
    """
    try:
        hits = vector_search_mongodb(db, "manuals", query, n)
        if not hits:
            return json.dumps({"found": False, "message": "No manuals found for query"})

        # Trim every hit to a short excerpt so the tool output stays compact.
        excerpts = []
        for hit in hits:
            snippet = extract_text_for_embedding(hit.get('document', hit))[:300]
            excerpts.append({"score": round(hit.get('similarityScore', 0), 3), "content": snippet})

        return json.dumps({"found": True, "count": len(excerpts), "results": excerpts})
    except Exception as err:
        return json.dumps({"error": str(err), "found": False})


@tool
def retrieve_work_orders(query: str, n: int = 3) -> str:
    """
    Retrieve related work orders for the alert via MongoDB vector search.
    
    Args:
        query: The search query for work orders
        n: Number of results to return (default 3)
    
    Returns:
        JSON string containing related work order information
    """
    try:
        results = vector_search_mongodb(db, "workorders", query, n)
        if not results:
            return json.dumps({"found": False, "message": "No work orders found for query"})
        
        formatted_results = []
        for result in results:
            doc = result.get('document', result)
            score = result.get('similarityScore', 0)
            text = extract_text_for_embedding(doc)[:300]
            formatted_results.append({
                "score": round(score, 3),
                "content": text
            })
        
        return json.dumps({"found": True, "count": len(formatted_results), "results": formatted_results})
    except Exception as e:
        return json.dumps({"error": str(e), "found": False})


@tool
def retrieve_interviews(query: str, n: int = 3) -> str:
    """
    Retrieve interviews and expertise related to the alert via MongoDB vector search.
    
    Args:
        query: The search query for maintenance expertise
        n: Number of results to return (default 3)
    
    Returns:
        JSON string containing relevant interview excerpts
    """
    try:
        results = vector_search_mongodb(db, "interviews", query, n)
        if not results:
            return json.dumps({"found": False, "message": "No interviews found for query"})
        
        formatted_results = []
        for result in results:
            doc = result.get('document', result)
            score = result.get('similarityScore', 0)
            text = extract_text_for_embedding(doc)[:300]
            formatted_results.append({
                "score": round(score, 3),
                "content": text
            })
        
        return json.dumps({"found": True, "count": len(formatted_results), "results": formatted_results})
    except Exception as e:
        return json.dumps({"error": str(e), "found": False})


@tool
def generate_incident_report(
    error_code: str,
    error_name: str,
    root_cause: str,
    repair_instructions: List[str],
    machine_id: str,
    timestamp: Optional[str] = None,
) -> str:
    """
    Generate and store an incident report in MongoDB for the failure alert.
    
    Args:
        error_code: The error code for the incident (e.g., "E12")
        error_name: Human-readable name of the error (e.g., "High temperature")
        root_cause: Root cause analysis inferred from context (detailed explanation)
        repair_instructions: List of repair step descriptions as strings (e.g., ["Step 1: Stop the motor", "Step 2: Inspect bearing"])
        machine_id: ID of the affected machine
        timestamp: Optional timestamp of the alert (e.g., the alert's "ts" value). If omitted, the current UTC time is recorded.
    
    Returns:
        JSON string with incident report confirmation
    """
    try:
        # Number the steps 1..N so the stored report keeps the order the LLM produced.
        instructions_with_steps = [
            {"step": step_number, "description": str(instruction)}
            for step_number, instruction in enumerate(repair_instructions, 1)
        ]

        # Forward the alert timestamp only when one was supplied; otherwise
        # insert_incident_report applies its own default.
        extra_fields = {"timestamp": timestamp} if timestamp else {}

        result = insert_incident_report(
            error_code=error_code,
            error_name=error_name,
            root_cause=root_cause,
            repair_instructions=instructions_with_steps,
            machine_id=machine_id,
            **extra_fields,
        )
        return json.dumps(result)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"Error creating incident report: {e}"
        })



# Get all tools
tools = [retrieve_manual, retrieve_work_orders, retrieve_interviews, generate_incident_report]

print("✓ Tool functions defined and registered (using MongoDB + Voyage AI)")

# Initialize the LLM from the settings loaded in the configuration cell, so the
# COMPLETION_MODEL default and a custom LLM_API_ENDPOINT are honored here too.
llm = ChatOpenAI(
    model=COMPLETION_MODEL,
    temperature=0,
    api_key=LLM_API_KEY,
    base_url=LLM_API_ENDPOINT,  # None falls back to ChatOpenAI's default endpoint
)

# Bind tools to the model
llm_with_tools = llm.bind_tools(tools)

print("✓ Language model configured with tools")

# Smoke-test the generate_incident_report tool end to end (tool argument schema
# -> insert_incident_report). NOTE: this writes a real document to incident_reports.
test_report = json.loads(
    generate_incident_report.invoke({
        "error_code": "E12",
        "error_name": "High temperature",
        "root_cause": "Bearing friction due to insufficient lubrication",
        "repair_instructions": [
            "Stop the motor and let it cool",
            "Inspect bearing for damage",
            "Apply fresh lubricant",
            "Restart and monitor temperature",
        ],
        "machine_id": "M1",
    })
)

print("✓ Tool test result:")
print(json.dumps(test_report, indent=2))


## 12. Define Agent Nodes and Routing Logic
Agent Node
The agent node processes incoming messages and calls the LLM to determine the next action:

In [None]:
# Define the Agent Node

# The system prompt is static, so build the template once when this cell runs
# instead of re-creating it on every agent step.
_FAILURE_AGENT_PROMPT = ChatPromptTemplate.from_messages([
    (
        "system",
        """You are the Failure Agent. Your role is to:
1. Receive alert details about machine failures
2. Retrieve additional context from manuals, work orders, and maintenance expertise
3. Analyze the root cause of the failure
4. Generate a comprehensive incident report with repair instructions

IMPORTANT: When calling generate_incident_report, you MUST provide:
- error_code: The error code from the alert
- error_name: The error name from the alert
- root_cause: Your analysis of why this failure occurred based on retrieved context
- repair_instructions: A LIST OF STRINGS describing step-by-step repair procedures (required - never omit!)
- machine_id: The machine ID from the alert

Example repair_instructions format:
["Stop the motor and allow it to cool for 30 minutes", "Inspect the bearing for wear and damage", "Apply fresh lubricant to bearings", "Restart motor and monitor temperature"]

Use your tools strategically to gather all necessary information before generating the incident report.
After the incident report is generated, acknowledge the completion with a brief summary."""
    ),
    MessagesPlaceholder(variable_name="messages"),
])


async def agent_node(state: FailureAgentState) -> FailureAgentState:
    """
    The agent node:
    1. Receives alert details about machine failures
    2. Routes to appropriate tools for information gathering
    3. Analyzes retrieved context
    4. Decides when to generate incident report

    Prepends the Failure Agent system prompt to the conversation in
    state["messages"], calls the tool-bound LLM, and returns a state update
    containing only the model's new response message.
    """
    # Render the system prompt followed by the conversation so far.
    formatted_prompt = await _FAILURE_AGENT_PROMPT.ainvoke({"messages": state["messages"]})

    # The response may carry tool_calls; should_continue inspects it to pick the next node.
    response = await llm_with_tools.ainvoke(formatted_prompt)

    return {
        "messages": [response]
    }

print("✓ Agent node defined")

In [None]:
# Define the Tool Execution Node
async def process_tool_calls(state: FailureAgentState) -> FailureAgentState:
    """
    Execute every tool call requested by the agent's last message.

    Returns a state update whose "messages" list holds exactly one ToolMessage
    per requested tool call, in request order. A ToolMessage (with a JSON error
    payload) is produced even when the requested tool is unknown or raises.
    That way every tool_call_id the model issued gets a response, and the agent
    can react to the error on its next turn instead of the run aborting.
    """
    last_message = state["messages"][-1]
    tools_by_name = {registered.name: registered for registered in tools}

    tool_results = []

    for tool_call in getattr(last_message, "tool_calls", None) or []:
        tool_name = tool_call["name"]
        tool_input = tool_call["args"]

        print(f"\n🔧 Executing tool: {tool_name}")
        print(f"   Input: {tool_input}")

        selected_tool = tools_by_name.get(tool_name)
        if selected_tool is None:
            result = json.dumps({"error": f"Unknown tool: {tool_name}"})
        else:
            try:
                result = await selected_tool.ainvoke(tool_input)
            except Exception as e:
                # Surface failures such as invalid/missing arguments back to the
                # agent rather than aborting the whole graph run.
                result = json.dumps({"error": f"{type(e).__name__}: {e}"})

        print(f"   Result: {result[:100]}...")

        tool_results.append(
            ToolMessage(
                content=result,
                tool_call_id=tool_call["id"]
            )
        )

    return {
        "messages": tool_results
    }

print("✓ Tool execution node defined")

In [None]:
# Define the Routing Logic
def should_continue(state: FailureAgentState) -> str:
    """
    Choose the node to run after the agent.

    Returns "tools" when the agent's latest message requests one or more tool
    calls; otherwise returns END so the graph terminates the agent loop.
    """
    last_message = state["messages"][-1]

    # A message without a tool_calls attribute, or with an empty list, ends the loop.
    if getattr(last_message, "tool_calls", None):
        return "tools"

    return END

print("✓ Routing logic defined")

## 13. Compile the Graph

Create a StateGraph, connect its nodes as shown below, and compile it into an executable agent.

Graph Structure:
START → agent → [decision]
                    ├─→ tools → agent (loop back)
                    └─→ END

Compilation: `workflow.compile()` turns the graph into the runnable `failure_agent`.


In [None]:
# Build the StateGraph
workflow = StateGraph(FailureAgentState)

# Add nodes
workflow.add_node("agent", agent_node)
workflow.add_node("tools", process_tool_calls)

# Add edges: start at the agent, let should_continue pick "tools" or END,
# and always return to the agent after tools run.
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")

# Compile the graph
failure_agent = workflow.compile()

print("✓ Failure Agent graph compiled successfully")

## 14. Test Scenario: Motor Overheating Alert (E12)
Alert Input

{
  "err_code": "E12",
  "err_name": "High temperature",
  "machine_id": "M1",
  "details": {
    "temperature": 114.55,
    "vibration": 0.235
  },
  "ts": "2026-01-26T01:01:04.980Z",
  "status": "new"
}

In [None]:
# Test Scenario 1: Motor Overheating
print("=" * 80)
print("TEST SCENARIO 1: MOTOR OVERHEATING (E12)")
print("=" * 80)

test_input_1 = """
{ "err_code": "E12",  
  "err_name": "High temperature",  
  "machine_id": "M1",  
  "details": {  "temperature": 114.55,    
                "vibration": 0.235  },  
                "ts": {    "$date": "2026-01-26T01:01:04.980Z"  },  
  "status": "new"}
"""

initial_state_1 = {
    "messages": [HumanMessage(content=test_input_1)]
}

print("\n📨 Input Alert:")
print(test_input_1)
print("\n🤖 Agent Processing...")


# Run the agent asynchronously
import asyncio
import nest_asyncio

# Apply nest_asyncio to allow nested event loops in Jupyter
nest_asyncio.apply()

# nest_asyncio lets asyncio.run() work even though Jupyter already runs an event loop.
result = asyncio.run(failure_agent.ainvoke(initial_state_1))


print("\n✅ Agent Completed Processing.")
print("\n📋 Final State Messages:")
for msg in result.get("messages", []):
    print(f"  - {type(msg).__name__}: {str(msg.content)[:100]}...")