<a href="https://colab.research.google.com/github/prem-cre/Multirag/blob/main/MultimodalAgents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Install Required Libraries
!pip install -qU langchain langgraph langchain_groq langchain_huggingface
!pip install -qU faiss-cpu pypdf tiktoken tavily-python
!pip install -qU langchain-community langchain-google-community
!pip install -qU newspaper3k beautifulsoup4 requests
!pip install -qU lxml[html_clean]

# @title Core Imports and Configuration
import os
import re
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum

from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.agents import create_react_agent, AgentExecutor, Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain_core.documents import Document
from langchain_core.tools import tool
from tavily import TavilyClient
import requests
from bs4 import BeautifulSoup
from newspaper import Article
from google.colab import userdata

# Configure API keys.
# Both secrets are fetched through google.colab's `userdata` under the names
# 'groq_api_key' and 'tavily' and exported as environment variables.
os.environ["GROQ_API_KEY"] = userdata.get('groq_api_key')  # presumably picked up by ChatGroq from the environment - confirm
os.environ["TAVILY_API_KEY"] = userdata.get('tavily')  # read back below when building the Tavily client

# Module-level singletons shared by every tool and agent defined later in the notebook.
llm = ChatGroq(model_name="llama-3.1-8b-instant", temperature=0.1)  # chat model handed to the agent and the fact-checker
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")  # embedding model; not used by the code visible in this file
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])  # web-search client used by the @tool functions

In [None]:
# @title Define Data Models for Evidence and Sources

class SourceCredibility(Enum):
    """Credibility tiers that can be assigned to a source.

    Each member's value is a lowercase string; ``analyze_source_credibility``
    emits that ``.value`` in its JSON output.
    """
    OFFICIAL = "official"  # Government, court documents
    ACADEMIC = "academic"  # Peer-reviewed, scholarly
    REPUTABLE = "reputable"  # Major news outlets, established organizations
    GENERAL = "general"  # General websites, blogs
    UNVERIFIED = "unverified"  # Unknown or questionable sources

@dataclass
class LegalSource:
    """A retrieved legal source together with the metadata used to cite it.

    ``hash`` is always recomputed from ``content`` in ``__post_init__`` (any
    value passed by the caller is overwritten) so that sources with identical
    text can be de-duplicated.

    ``date_accessed`` is accepted as an optional trailing field because code in
    this file constructs sources with ``date_accessed=...``; without the field
    that call raises ``TypeError``. It defaults to the creation time and is
    excluded from equality so two sources with the same content still compare
    equal regardless of when they were fetched.
    """
    url: str
    title: str
    content: str
    credibility: SourceCredibility
    # date_published: Optional[str] = None
    author: Optional[str] = None
    jurisdiction: Optional[str] = None
    citation: Optional[str] = None
    hash: Optional[str] = None
    date_accessed: datetime = field(default_factory=datetime.now, compare=False)

    def __post_init__(self):
        # Short content fingerprint for deduplication (not a security hash).
        # Search results may carry ``content=None``; treat that as empty text
        # instead of failing on ``None.encode()``.
        text = self.content or ""
        self.hash = hashlib.md5(text.encode("utf-8")).hexdigest()[:8]

@dataclass
class EvidenceItem:
    """One link in the evidence chain: a claim plus the sources offered for it."""
    claim: str  # the statement this evidence item is about
    supporting_sources: List[LegalSource]  # sources offered in support of the claim
    confidence_score: float  # 0-1
    reasoning: str  # free-text explanation of how the sources relate to the claim
    contradictions: List[Dict[str, Any]] = field(default_factory=list)  # fresh empty list per instance
    verification_status: str = "pending"  # initial status; no visible code in this file changes it

@dataclass
class LegalResearchResult:
    """Complete legal research result with chain of evidence.

    This is the value returned by ``LegalResearchAgent.research``.
    """
    query: str  # the research question as given by the caller
    summary: str  # the agent's final answer text (or an error message on failure)
    evidence_chain: List[EvidenceItem]  # evidence items built from the agent's tool calls
    legal_precedents: List[Dict[str, Any]]  # one dict per extracted citation
    jurisdictional_notes: Dict[str, str]  # free-form notes keyed by topic
    confidence_assessment: Dict[str, float]  # named confidence scores, e.g. "overall"
    timestamp: datetime = field(default_factory=datetime.now)  # creation time (naive local time)

In [None]:
# @title Implement Advanced Research Tools

@tool
def legal_document_search(query: str, jurisdiction: str = "INDIAN union and state Federal") -> str:
    """
    Search legal documents, cases, and statutes.
    Returns relevant legal information with citations.

    Args:
        query: The legal question or keywords to search for.
        jurisdiction: Jurisdiction phrase added to the search query.

    Returns:
        A JSON array of objects with title, url, content and score, or a
        plain "Error searching legal documents: ..." string on failure.
    """
    try:
        # Search using Tavily with a legal-focused query. The space after the
        # jurisdiction matters: without it the last word of the jurisdiction is
        # fused with "jurisdiction" into a single meaningless search token.
        legal_query = f"legal {jurisdiction} jurisdiction {query}"
        results = tavily_client.search(
            query=legal_query,
            search_depth="advanced",
            max_results=5,
            # Domain filter takes a bare domain name rather than a full URL.
            include_domains=["indiankanoon.org"],
        )

        formatted_results = [
            {
                'title': r.get('title'),
                'url': r.get('url'),
                'content': r.get('content'),
                'score': r.get('score', 0),
            }
            for r in results.get('results', [])
        ]

        return json.dumps(formatted_results, indent=2)
    except Exception as e:
        # Best-effort tool: report the failure as text so the calling agent can continue.
        return f"Error searching legal documents: {str(e)}"

@tool
def verify_legal_citation(citation: str) -> str:
    """
    Verify if a legal citation is valid and retrieve its details.
    Supports indian case law citations.

    Args:
        citation: Citation text, e.g. "(1973) 4 SCC 225" or "AIR 1973 SC 1461".

    Returns:
        A JSON object with citation, verified and either sources/summary or an
        error message; or a plain "Error verifying citation: ..." string.
    """
    try:
        # Loose "<number> <reporter> <number>" shape, e.g. "4 SCC 225" or "1973 SC 1461".
        case_pattern = r'(\d+)\s+(\w+\.?\s*\d*[a-z]?)\s+(\d+)'
        if not re.search(case_pattern, citation):
            return json.dumps({'citation': citation, 'verified': False, 'error': 'Citation format not recognized'})

        # Search for the actual case
        search_results = tavily_client.search(
            query=f'"{citation}" legal case',
            max_results=5,
            # Domain filter takes a bare domain name rather than a full URL.
            include_domains=["indiankanoon.org"],
        )

        hits = search_results.get('results') or []
        if not hits:
            # The format was fine; the search simply found nothing. Say so instead
            # of blaming the citation format.
            return json.dumps({'citation': citation, 'verified': False, 'error': 'No matching case found for citation'})

        case_info = {
            'citation': citation,
            'verified': True,
            'sources': [r.get('url') for r in hits],
            # Content can be missing/None for a hit; fall back to an empty summary.
            'summary': (hits[0].get('content') or '')[:200]
        }
        return json.dumps(case_info, indent=2)
    except Exception as e:
        # Best-effort tool: report the failure as text so the calling agent can continue.
        return f"Error verifying citation: {str(e)}"

@tool
def fact_check_legal_claim(claim: str, sources: Optional[List[str]] = None) -> str:
    """
    Fact-check a legal claim against multiple sources.
    Returns verification status with supporting/contradicting evidence.

    Args:
        claim: The legal claim to check.
        sources: Optional list of sources; accepted for compatibility but not
            currently consulted.

    Returns:
        A JSON object with claim, supporting_evidence, contradicting_evidence
        and confidence ('high' or 'low'); or a plain
        "Error fact-checking claim: ..." string on failure.
    """
    def _as_evidence(search_response):
        # A hit may have no content (None); an unguarded slice would abort the
        # whole fact-check, so fall back to an empty excerpt.
        return [
            {'source': r.get('url'), 'excerpt': (r.get('content') or '')[:150]}
            for r in search_response.get('results', [])
        ]

    try:
        # Search for supporting and contradicting evidence
        supporting_query = f'"{claim}" legal true accurate'
        contradicting_query = f'"{claim}" legal false myth incorrect'

        supporting = tavily_client.search(supporting_query, max_results=5)
        contradicting = tavily_client.search(contradicting_query, max_results=3)

        supporting_evidence = _as_evidence(supporting)
        contradicting_evidence = _as_evidence(contradicting)

        result = {
            'claim': claim,
            'supporting_evidence': supporting_evidence,
            'contradicting_evidence': contradicting_evidence,
            # NOTE(review): crude heuristic - it compares hit counts from two
            # searches with different result caps (5 vs 3), so it leans 'high'.
            'confidence': 'high' if len(supporting_evidence) > len(contradicting_evidence) else 'low'
        }

        return json.dumps(result, indent=2)
    except Exception as e:
        # Best-effort tool: report the failure as text so the caller can continue.
        return f"Error fact-checking claim: {str(e)}"

@tool
def extract_legal_precedents(case_text: str) -> str:
    """
    Extract legal precedents and cited cases from text.

    Args:
        case_text: Text of a judgment or legal document.

    Returns:
        A JSON object with precedents_found (number of distinct citations) and
        citations (the first ten distinct citations found); or a plain
        "Error extracting precedents: ..." string on failure.
    """
    try:
        # Common patterns for case citations. Only non-capturing groups are used
        # so that re.findall returns the full citation text.
        citation_patterns = [
            r'\d+\s+U\.S\.\s+\d+',  # US Reports
            r'\d+\s+S\.\s?Ct\.\s+\d+',  # Supreme Court Reporter
            # Federal Reporter. The series suffix is "2d"/"3d"/"4th", so the
            # letters after the digit must be allowed or "F.3d" never matches.
            r'\d+\s+F\.(?:\d+[a-z]{0,2})?\s+\d+',
            r'\d+\s+F\.\s?Supp\.(?:\s?[23]d)?\s+\d+',  # Federal Supplement
            r'AIR\s+\d{4}\s+[A-Za-z]+\s+\d+',  # All India Reporter, e.g. AIR 1973 SC 1461
            r'\(\d{4}\)\s+\d+\s+SCC\s+\d+',  # Supreme Court Cases, e.g. (1973) 4 SCC 225
        ]

        precedents = []
        for pattern in citation_patterns:
            precedents.extend(re.findall(pattern, case_text))

        # Remove duplicates while keeping discovery order, so the truncated
        # list below is deterministic (a set would return an arbitrary ten).
        unique_precedents = list(dict.fromkeys(precedents))
        return json.dumps({
            'precedents_found': len(unique_precedents),
            'citations': unique_precedents[:10]  # Limit to the first 10 found
        }, indent=2)
    except Exception as e:
        # Best-effort tool: report the failure as text so the calling agent can continue.
        return f"Error extracting precedents: {str(e)}"

@tool
def analyze_source_credibility(url: str) -> str:
    """
    Analyze the credibility of a source based on its domain.

    Args:
        url: URL of the source to assess.

    Returns:
        A JSON object with url, domain, credibility and trusted (plus a note
        for unknown domains); or a plain "Error analyzing source: ..." string.
    """
    try:
        domain_credibility = {
            'law.cornell.edu': SourceCredibility.OFFICIAL,
            'justia.com': SourceCredibility.OFFICIAL,
            'courtlistener.com': SourceCredibility.OFFICIAL,
            'indiankanoon.org': SourceCredibility.OFFICIAL,
            # 'supremecourt.gov': SourceCredibility.OFFICIAL,
            # 'scholar.google.com': SourceCredibility.ACADEMIC,
            # 'harvard.edu': SourceCredibility.ACADEMIC,
            # 'yale.edu': SourceCredibility.ACADEMIC,
            # 'reuters.com': SourceCredibility.REPUTABLE,
            'apnews.com': SourceCredibility.REPUTABLE,
        }

        # Extract the host name. `hostname` (unlike `netloc`) drops any port and
        # user-info. Scheme-less input such as "example.org/page" is parsed as a
        # network path so its host is still recognised.
        from urllib.parse import urlparse
        parsed = urlparse(url if '//' in url else f'//{url}')
        domain = (parsed.hostname or '').lower().rstrip('.')

        # Check known domains. The host must BE the known domain or a sub-domain
        # of it; a plain substring test would also trust look-alikes such as
        # "justia.com.evil.org" or "fakejustia.com".
        for known_domain, cred in domain_credibility.items():
            if domain == known_domain or domain.endswith('.' + known_domain):
                return json.dumps({
                    'url': url,
                    'domain': domain,
                    'credibility': cred.value,
                    'trusted': True
                })

        # Default assessment
        return json.dumps({
            'url': url,
            'domain': domain,
            'credibility': SourceCredibility.GENERAL.value,
            'trusted': False,
            'note': 'Unknown source - verify independently'
        })
    except Exception as e:
        # Best-effort tool: report the failure as text so the caller can continue.
        return f"Error analyzing source: {str(e)}"

In [None]:
# @title Create the Advanced Legal Research Agent

class LegalResearchAgent:
    """Advanced agent for legal research with multi-step reasoning.

    Wraps a ReAct agent (``create_react_agent`` + ``AgentExecutor``) with a
    windowed conversation memory and converts the executor's raw output into a
    ``LegalResearchResult``.
    """

    def __init__(self, llm, tools):
        """Build the prompt, the ReAct agent and its executor.

        Args:
            llm: Chat model used by the agent.
            tools: Tools the agent may call.
        """
        self.llm = llm
        self.tools = tools
        # The executor returns two keys ("output" and "intermediate_steps");
        # the memory must be told which one to store, otherwise saving the
        # turn fails because it cannot pick a single output key.
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            input_key="input",
            output_key="output",
            return_messages=True,
            k=10
        )

        # Create specialized prompt for legal research.
        # create_react_agent requires the variables {tools}, {tool_names} and
        # {agent_scratchpad}; the scratchpad is supplied as plain text, so it is
        # embedded in the human message rather than a MessagesPlaceholder. The
        # Thought/Action/Action Input format is what the ReAct output parser reads.
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert legal research assistant with deep knowledge of law and legal procedures.

Your approach to legal research:
1. **Understand the Query**: Identify the legal question, jurisdiction, and relevant areas of law
2. **Gather Evidence**: Search for relevant cases, statutes, and legal documents
3. **Verify Sources**: Check credibility and authenticity of all sources
4. **Extract Precedents**: Identify relevant legal precedents and citations
5. **Fact-Check Claims**: Verify all legal claims against multiple sources
6. **Build Evidence Chain**: Create a logical chain of evidence supporting your conclusions
7. **Assess Confidence**: Evaluate the strength of your findings

For each research task:
- Always cite specific cases and statutes
- Verify all citations are real and accurate
- Note jurisdictional limitations
- Identify potential contradictions or conflicts
- Provide confidence assessments for your conclusions

Available tools:
{tools}

Use these tools to conduct thorough legal research before providing your final analysis.

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question"""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "Question: {input}\nThought:{agent_scratchpad}"),
        ])

        # Create the agent
        self.agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )

        self.executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
            return_intermediate_steps=True
        )

    def research(self, query: str) -> LegalResearchResult:
        """Conduct comprehensive legal research.

        Args:
            query: The legal research question.

        Returns:
            The structured research result. If the agent run fails, the error
            is printed and a result with an error summary, empty evidence and
            an overall confidence of 0.0 is returned instead of raising.
        """
        try:
            # Execute the research
            result = self.executor.invoke({"input": query})

            # Parse the results into structured format
            return self._parse_research_results(query, result)
        except Exception as e:
            print(f"Error in legal research: {str(e)}")
            return LegalResearchResult(
                query=query,
                summary=f"Error conducting research: {str(e)}",
                evidence_chain=[],
                legal_precedents=[],
                jurisdictional_notes={},
                confidence_assessment={"overall": 0.0}
            )

    @staticmethod
    def _load_json(observation: Any) -> Any:
        """Return the decoded JSON of a tool observation, or None if it is not JSON."""
        try:
            return json.loads(observation)
        except (TypeError, ValueError):
            # Tools report failures as plain "Error ..." strings, which are not JSON.
            return None

    def _parse_research_results(self, query: str, raw_result: Dict) -> LegalResearchResult:
        """Parse agent results into structured legal research result.

        Args:
            query: The original research question.
            raw_result: Executor output; its 'output' and 'intermediate_steps'
                entries are read.

        Returns:
            A ``LegalResearchResult`` whose evidence chain comes from
            'legal_document_search' observations and whose precedents come
            from 'extract_legal_precedents' observations.
        """
        # Extract key information from the agent's response
        output = raw_result.get('output', '')
        intermediate_steps = raw_result.get('intermediate_steps', [])

        # Build evidence chain from intermediate steps
        evidence_chain = []
        legal_precedents = []

        for action, observation in intermediate_steps:
            tool_name = getattr(action, 'tool', None)
            payload = self._load_json(observation)

            if tool_name == 'legal_document_search' and isinstance(payload, list):
                for r in payload:
                    if not isinstance(r, dict):
                        continue
                    # Only fields LegalSource declares are passed; an unknown
                    # keyword would raise TypeError and lose the evidence.
                    evidence_chain.append(EvidenceItem(
                        claim=f"Found relevant document: {r.get('title', 'Unknown')}",
                        supporting_sources=[LegalSource(
                            url=r.get('url') or '',
                            title=r.get('title') or '',
                            content=r.get('content') or '',
                            credibility=SourceCredibility.GENERAL,
                        )],
                        confidence_score=r.get('score', 0.5),
                        reasoning="Direct search result"
                    ))

            elif tool_name == 'extract_legal_precedents' and isinstance(payload, dict):
                legal_precedents.extend(payload.get('citations', []))

        # Create final result
        return LegalResearchResult(
            query=query,
            summary=output,
            evidence_chain=evidence_chain,
            # These citations were only extracted by pattern matching, never
            # run through verify_legal_citation, so they are not verified.
            legal_precedents=[{'citation': p, 'verified': False} for p in legal_precedents],
            # NOTE(review): the notes and scores below are fixed placeholders,
            # not derived from the research; confirm before relying on them.
            jurisdictional_notes={'primary': 'US Federal', 'limitations': 'Results may vary by state'},
            confidence_assessment={'overall': 0.8, 'source_quality': 0.9}
        )

In [None]:
# @title Implement Multi-Step Fact-Checking Process

class LegalFactChecker:
    """Multi-step fact-checking system for legal claims.

    ``verify_claim`` runs the data-gathering steps in ``verification_steps`` in
    order, sharing their payloads through one state dict, and then scores the
    collected step results once with ``_final_verification``.
    """

    # Per-step bookkeeping keys that must not be copied into the shared state:
    # a leaked 'error' or 'step' from one step would be misread by later ones.
    _META_KEYS = ("step", "timestamp", "error")

    def __init__(self, llm, tools):
        """Store the model and tools and set up the step pipeline.

        Args:
            llm: Chat model used for the initial claim analysis.
            tools: Tools available to the checker (stored, not called directly).
        """
        self.llm = llm
        self.tools = tools
        # Only steps that take the shared state dict belong here.
        # _final_verification takes the list of step results instead and is
        # invoked once by verify_claim after these have run.
        self.verification_steps = [
            self._initial_claim_analysis,
            self._source_verification,
            self._cross_reference_check,
            self._contradiction_analysis,
        ]

    def _initial_claim_analysis(self, claim: Any) -> Dict[str, Any]:
        """Step 1: Analyze the claim structure and identify key elements.

        Args:
            claim: The claim text, or the pipeline state dict holding it under
                the 'claim' key (which is how ``verify_claim`` calls each step).

        Returns:
            A step dict with 'step', 'claim' (always the claim text),
            'analysis' (the model's reply) and 'timestamp'.
        """
        # Unwrap the state dict so the prompt contains the claim text, not a dict repr,
        # and so the returned 'claim' never points back at the state dict itself.
        if isinstance(claim, dict):
            claim = claim.get('claim')

        prompt = f"""Analyze this legal claim and identify:
        1. Main legal assertion
        2. Jurisdiction mentioned
        3. Legal concepts involved
        4. Specific facts claimed

        Claim: {claim}
        """

        # Use LLM to analyze the claim
        response = self.llm.invoke(prompt)
        return {
            "step": "initial_analysis",
            "claim": claim,
            "analysis": response.content,
            "timestamp": datetime.now()
        }

    def _source_verification(self, current_data: Dict) -> Dict[str, Any]:
        """Step 2: Verify sources for the claim.

        Reads 'claim' from the state and runs the ``fact_check_legal_claim``
        tool on it.

        Returns:
            A step dict with 'sources_found' (the tool's decoded JSON), or an
            'error' entry when the claim is missing or the output is not JSON.
        """
        claim = current_data.get('claim')
        if not claim:
            return {"step": "source_verification", "error": "Claim not found in data", "timestamp": datetime.now()}

        # Name the argument explicitly: the tool takes more than one parameter.
        result_json_str = fact_check_legal_claim.invoke({"claim": claim})
        try:
            result = json.loads(result_json_str)
        except json.JSONDecodeError as e:
            return {"step": "source_verification", "error": f"Failed to parse fact_check_legal_claim output: {e}", "raw_output": result_json_str, "timestamp": datetime.now()}

        return {
            "step": "source_verification",
            "sources_found": result,
            "timestamp": datetime.now()
        }

    def _cross_reference_check(self, current_data: Dict) -> Dict[str, Any]:
        """Step 3: Cross-reference multiple sources.

        Runs ``analyze_source_credibility`` on the URL of every supporting
        evidence item found in step 2.

        Returns:
            A step dict with 'credible_sources' (one entry per URL; failures
            are recorded as 'parse_error' / 'tool_error' entries), or an
            'error' entry when no sources data is available.
        """
        sources_data = current_data.get('sources_found')
        if not sources_data:
            return {"step": "cross_reference", "error": "Sources data not found", "timestamp": datetime.now()}

        credible_sources_list = []
        for source_item in sources_data.get('supporting_evidence', []):
            source_url = source_item.get('source')
            if not source_url:
                continue
            try:
                cred_check = json.loads(analyze_source_credibility.invoke(source_url))
                credible_sources_list.append(cred_check)
            except json.JSONDecodeError as e:
                credible_sources_list.append({"url": source_url, "credibility": "parse_error", "error": str(e)})
            except Exception as e:
                # One failing URL must not abort the remaining checks.
                credible_sources_list.append({"url": source_url, "credibility": "tool_error", "error": str(e)})

        return {
            "step": "cross_reference",
            "credible_sources": credible_sources_list,
            "timestamp": datetime.now()
        }

    def _contradiction_analysis(self, current_data: Dict) -> Dict[str, Any]:
        """Step 4: Analyze contradictions.

        Returns:
            A step dict with 'contradictions_found' (count) and 'details' (the
            contradicting evidence from step 2), or an 'error' entry when no
            sources data is available.
        """
        sources_data = current_data.get('sources_found')
        if not sources_data:
            return {"step": "contradiction_analysis", "error": "Sources data not found for contradiction analysis", "timestamp": datetime.now()}

        contradictions = sources_data.get('contradicting_evidence', [])

        return {
            "step": "contradiction_analysis",
            "contradictions_found": len(contradictions),
            "details": contradictions,
            "timestamp": datetime.now()
        }

    def _final_verification(self, all_steps: List[Dict]) -> Dict[str, Any]:
        """Step 5: Final verification and confidence scoring.

        Args:
            all_steps: The step dicts produced by the earlier steps.

        Returns:
            A step dict with 'confidence_score' in [0, 1] and
            'verification_complete'. With no evidence at all the score is 0.5
            when no step failed and 0.0 otherwise.
        """
        trusted_levels = ('official', 'academic', 'reputable')
        supporting_count = 0
        contradiction_count = 0
        errors_in_steps = 0
        credible_sources_count = 0

        for step_data in all_steps:
            if step_data.get('error'):
                errors_in_steps += 1
            elif step_data.get('step') == 'source_verification':
                sources_data = step_data.get('sources_found', {})
                supporting_count += len(sources_data.get('supporting_evidence', []))
                contradiction_count += len(sources_data.get('contradicting_evidence', []))
            elif step_data.get('step') == 'cross_reference':
                credible_sources_count += sum(
                    1 for cs in step_data.get('credible_sources', [])
                    if cs.get('trusted', False) or cs.get('credibility') in trusted_levels
                )

        # Calculate confidence
        total_evidence = supporting_count + contradiction_count
        if total_evidence == 0:
            # Neutral if no evidence but no errors, low if errors
            confidence = 0.5 if errors_in_steps == 0 else 0.0
        else:
            # Credible sources count as extra support in both numerator and denominator.
            confidence = max(0, min(1, (supporting_count + credible_sources_count - contradiction_count) / (total_evidence + credible_sources_count)))

        return {
            "step": "final_verification",
            "confidence_score": confidence,
            "verification_complete": True,
            "timestamp": datetime.now()
        }

    def verify_claim(self, claim: str) -> Dict[str, Any]:
        """Execute multi-step fact-checking process.

        Args:
            claim: The legal claim to verify.

        Returns:
            A dict with 'claim', 'verification_steps' (one dict per step, the
            final scoring step last), 'final_confidence' and 'timestamp'.
            A step that raises is recorded as an error entry and the pipeline
            continues with whatever state is available.
        """
        results = []
        current_data: Dict[str, Any] = {"claim": claim}

        for step_func in self.verification_steps:
            try:
                step_result = step_func(current_data)
            except Exception as e:
                results.append({
                    "step": step_func.__name__,
                    "error": str(e),
                    "timestamp": datetime.now()
                })
                continue

            results.append(step_result)
            # Hand only payload keys to later steps (see _META_KEYS).
            current_data.update({
                key: value for key, value in step_result.items()
                if key not in self._META_KEYS
            })

        # Score once, over the full list of step results.
        final_result = self._final_verification(results)
        results.append(final_result)

        return {
            "claim": claim,
            "verification_steps": results,
            "final_confidence": final_result.get('confidence_score', 0),
            "timestamp": datetime.now()
        }

In [None]:
# @title Collect all defined tools
# These names must already exist in the global scope (run the earlier cells first).
# Each one was defined with the @tool decorator; elsewhere in this notebook they
# are called through `.invoke(...)` rather than as plain functions.
all_tools = [
    legal_document_search,
    verify_legal_citation,
    fact_check_legal_claim,
    extract_legal_precedents,
    analyze_source_credibility
]

# @title Instantiate and Run the Legal Fact-Checker
# Demo cell: builds a LegalFactChecker, verifies one sample claim and prints a
# step-by-step report followed by the de-duplicated list of URLs encountered.

print("Initializing Legal Fact-Checker...")
# Re-using the 'llm' and 'all_tools' defined in previous sections
fact_checker = LegalFactChecker(llm=llm, tools=all_tools)
print("Legal Fact-Checker initialized.")

# --- Define a sample legal claim to fact-check ---
# Example 1: A claim that is likely true (or easily verifiable)
claim_1 = "In the United States, a contract requires an offer, acceptance, and consideration to be legally binding."
# Example 2: A more nuanced or potentially disputable claim
claim_2 = "A non-compete clause in an employment contract is generally unenforceable in California if it restricts an employee's ability to practice a lawful profession, trade, or business."
# Example 3: A claim that might have contradictions or require deeper analysis
claim_3 = "All verbal agreements for real estate sales are unenforceable in all U.S. states."

print(f"\n--- Fact-Checking Claim: {claim_2} ---")
verification_result = fact_checker.verify_claim(claim_2)

print("\n\n" + "="*80)
print("✅ FACT-CHECKING REPORT COMPLETE ✅")
print("="*80)
print(f"\n**Claim:** {verification_result['claim']}\n")
print(f"**Final Confidence:** {verification_result['final_confidence']:.2f}\n")
print("-" * 30 + " Detailed Steps " + "-" * 30)

# Store cross-references and citations found during verification
cross_references = []
# NOTE(review): nothing below appends to `citations`, so the citations section
# always reports that none were found.
citations = []

for step_data in verification_result['verification_steps']:
    step_name = step_data.get('step', 'Unknown Step').replace('_', ' ').title()
    timestamp = step_data.get('timestamp', datetime.now()).strftime('%Y-%m-%d %H:%M:%S')

    print(f"\n### {step_name} (at {timestamp})")

    # Branch on which payload key the step carries. Presence (not truthiness)
    # is tested for list payloads so an empty list still prints its header.
    if step_data.get('error'):
        print(f"  ❌ Error: {step_data['error']}")
    elif step_data.get('analysis'):
        print(f"  **Analysis:**\n{step_data['analysis']}")
    elif step_data.get('sources_found'):
        sources = step_data['sources_found']
        print(f"  **Verification Status:** {sources.get('confidence', 'N/A')}")
        print("  **Supporting Evidence:**")
        for s in sources.get('supporting_evidence', []):
            print(f"    - URL: {s.get('source')} (Excerpt: {s.get('excerpt')})")
            cross_references.append(s.get('source'))
        print("  **Contradicting Evidence:**")
        for c in sources.get('contradicting_evidence', []):
            print(f"    - URL: {c.get('source')} (Excerpt: {c.get('excerpt')})")
            cross_references.append(c.get('source'))
    elif step_data.get('credible_sources') is not None:
        print("  **Cross-Referenced Credible Sources:**")
        for cs in step_data['credible_sources']:
            print(f"    - URL: {cs.get('url')} (Credibility: {cs.get('credibility')})")
            cross_references.append(cs.get('url'))
    elif step_data.get('contradictions_found') is not None:
        print(f"  **Contradictions Found:** {step_data['contradictions_found']}")
        for d in step_data.get('details', []):
            print(f"    - URL: {d.get('source')} (Excerpt: {d.get('excerpt')})")
            cross_references.append(d.get('source'))
    elif step_data.get('verification_complete'):
        print(f"  **Confidence Score:** {step_data['confidence_score']:.2f}")
        print("  Verification process concluded.")

print("\n" + "="*80)

# Display Cross-References and Citations at the end
print("\n" + "="*80)
print("📚 Cross-References and Citations 📚")
print("="*80)

# A search hit may lack a URL (None). Drop those before sorting: sorting a mix
# of None and str raises TypeError.
unique_cross_references = sorted({url for url in cross_references if url})

if unique_cross_references:
    print("\n**Cross-References (URLs):**")
    for url in unique_cross_references:
        print(f"- {url}")
else:
    print("\nNo significant cross-references found during verification.")

if citations:
    print("\n**Citations:**")
    # Remove duplicates and print
    for citation in sorted(set(citations)):
        print(f"- {citation}")
else:
    print("\nNo specific legal citations found during verification steps.")

print("\n" + "="*80)

# Optional: You can run another claim
# print(f"\n--- Fact-Checking Claim: {claim_3} ---")
# verification_result_2 = fact_checker.verify_claim(claim_3)
# # ... (display verification_result_2 in a similar format)