In [1]:
!pip install torch fastapi uvicorn spacy pdfplumber moviepy librosa soundfile matplotlib numpy json tempfile transformers sentence-transformers pyngrok
!pip install openai-whisper
!pip install accelerate
!pip install pydantic==1.10.8  # Downgrade if needed for FastAPI compatibility


Collecting fastapi
  Downloading fastapi-0.115.11-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn
  Downloading uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Collecting pdfplumber
  Downloading pdfplumber-0.11.5-py3-none-any.whl.metadata (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.5/42.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[31mERROR: Could not find a version that satisfies the requirement json (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for json[0m[31m
[0mCollecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.9.0-c

In [2]:
!python -m spacy download en_core_web_sm


Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m65.8 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [3]:
!pip install pyngrok


Collecting pyngrok
  Downloading pyngrok-7.2.3-py3-none-any.whl.metadata (8.7 kB)
Downloading pyngrok-7.2.3-py3-none-any.whl (23 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.3


In [4]:
!ngrok authtoken 2tcXhKrQ46IoUz9QKx82ppnTLT5_27c6sMjdwa2KPAAPGyyMx

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [11]:
!pip install uvicorn


Collecting uvicorn
  Using cached uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Downloading uvicorn-0.34.0-py3-none-any.whl (62 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/62.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.3/62.3 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: uvicorn
Successfully installed uvicorn-0.34.0


In [12]:
!pip install pdfplumber




In [13]:
!pip install fastapi




In [15]:
!pip install python-multipart




In [16]:
%%writefile app.py

import os
import io
import torch
import uvicorn
import spacy
import pdfplumber
import moviepy.editor as mp
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
import numpy as np
import json
import tempfile
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
from sentence_transformers import SentenceTransformer
from pyngrok import ngrok
from threading import Thread
import time
import uuid

# ✅ Ensure compatibility with Google Colab
try:
    from google.colab import drive
    drive.mount('/content/drive')
except:
    pass  # Skip drive mount if not in Google Colab

# ✅ Ensure required directories exist
os.makedirs("static", exist_ok=True)
os.makedirs("temp", exist_ok=True)

# ✅ Ensure GPU usage
device = "cuda" if torch.cuda.is_available() else "cpu"

# ✅ Initialize FastAPI
app = FastAPI(title="Legal Document and Video Analyzer")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ✅ Initialize document storage
document_storage = {}
chat_history = []  # ✅ Added global chat history

# ✅ Function to store document context by task ID
def store_document_context(task_id, text):
    """Store document text for retrieval by chatbot."""
    document_storage[task_id] = text
    return True

# ✅ Function to load document context by task ID
def load_document_context(task_id):
    """Retrieve document text for chatbot context."""
    return document_storage.get(task_id, "")

# ✅ Load NLP Models
try:
    # Download spacy model if not available
    try:
        nlp = spacy.load("en_core_web_sm")
    except:
        spacy.cli.download("en_core_web_sm")
        nlp = spacy.load("en_core_web_sm")

    print("✅ Loading NLP models...")

    # Use device_map="auto" for better GPU utilization
    summarizer = pipeline("summarization", model="nsi319/legal-pegasus",
                      device=0 if torch.cuda.is_available() else -1)

    embedding_model = SentenceTransformer("all-mpnet-base-v2", device=device)

    ner_model = pipeline("ner", model="dslim/bert-base-NER",
                     device=0 if torch.cuda.is_available() else -1)

    # For video analysis - use a more robust speech recognition model
    speech_to_text = pipeline("automatic-speech-recognition",
                             model="openai/whisper-medium",
                             chunk_length_s=30,
                             device_map="auto" if torch.cuda.is_available() else "cpu")

    # ✅ Load CUAD Clause Classification Model
    cuad_tokenizer = AutoTokenizer.from_pretrained("nlpaueb/legal-bert-base-uncased")
    cuad_model = AutoModelForSequenceClassification.from_pretrained("nlpaueb/legal-bert-base-uncased")
    cuad_model.to(device)

    print("✅ All models loaded successfully")

except Exception as e:
    print(f"⚠️ Error loading models: {str(e)}")
    raise RuntimeError(f"Error loading models: {str(e)}")

from transformers import pipeline

# ✅ Load a Question Answering Model
qa_model = pipeline("question-answering", model="deepset/roberta-base-squad2")

def legal_chatbot(user_input, context):
    """Uses a real NLP model for legal Q&A."""
    global chat_history  # ✅ Use global chat history

    # Append user question to chat history
    chat_history.append({"role": "user", "content": user_input})

    # Get the AI-based response
    response = qa_model(question=user_input, context=context)["answer"]

    # Append response to chat history
    chat_history.append({"role": "assistant", "content": response})
    return response

# ✅ PDF Text Extraction
def extract_text_from_pdf(pdf_file):
    """Extracts text from a PDF file using pdfplumber."""
    try:
        with pdfplumber.open(pdf_file) as pdf:
            text = "\n".join([page.extract_text() or "" for page in pdf.pages])
        return text.strip() if text else None
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"PDF extraction failed: {str(e)}")

# ✅ Video Processing - Extract Audio and Transcribe
def process_video_to_text(video_file_path):
    """Extract audio from video and convert to text."""
    try:
        print(f"Processing video file at {video_file_path}")

        # Create temp directory for audio extraction
        temp_audio_path = os.path.join("temp", "extracted_audio.wav")

        # Extract audio from video
        video = mp.VideoFileClip(video_file_path)
        video.audio.write_audiofile(temp_audio_path, codec='pcm_s16le')

        print(f"Audio extracted to {temp_audio_path}")

        # Load audio for transcription
        # Process in chunks to handle large files
        full_transcript = ""

        # Process using whisper pipeline
        result = speech_to_text(temp_audio_path)
        transcript = result["text"]

        print(f"Transcription completed: {len(transcript)} characters")

        # Clean up
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

        return transcript
    except Exception as e:
        print(f"Error in video processing: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Video processing failed: {str(e)}")

# ✅ Audio Processing - Direct Audio Transcription
def process_audio_to_text(audio_file_path):
    """Process audio file and convert to text."""
    try:
        print(f"Processing audio file at {audio_file_path}")

        # Process using whisper pipeline
        result = speech_to_text(audio_file_path)
        transcript = result["text"]

        print(f"Transcription completed: {len(transcript)} characters")

        return transcript
    except Exception as e:
        print(f"Error in audio processing: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Audio processing failed: {str(e)}")

# ✅ Named Entity Recognition (NER)
def extract_named_entities(text):
    """Extracts named entities from legal text."""
    # Process text in chunks to avoid memory issues
    max_length = 10000  # Process 10K characters at a time
    entities = []

    for i in range(0, len(text), max_length):
        chunk = text[i:i+max_length]
        doc = nlp(chunk)
        entities.extend([{"entity": ent.text, "label": ent.label_} for ent in doc.ents])

    return entities

# ✅ Legal Risk Assessment (Original Function)
def analyze_risk(text):
    """Analyzes legal risk in the document using keyword-based analysis."""
    risk_keywords = {
        "Liability": ["liability", "responsible", "responsibility", "legal obligation"],
        "Termination": ["termination", "breach", "contract end", "default"],
        "Indemnification": ["indemnification", "indemnify", "hold harmless", "compensate", "compensation"],
        "Payment Risk": ["payment", "terms", "reimbursement", "fee", "schedule", "invoice", "money"],
        "Insurance": ["insurance", "coverage", "policy", "claims"],
    }
    risk_scores = {category: 0 for category in risk_keywords}
    lower_text = text.lower()
    for category, keywords in risk_keywords.items():
        for keyword in keywords:
            risk_scores[category] += lower_text.count(keyword.lower())
    return risk_scores

# ✅ New: Contextual Extraction for Risk Terms
def extract_context_for_risk_terms(text, risk_keywords, window=1):
    """
    Extracts and summarizes the context around risk terms.

    Parameters:
        text (str): The full text of the document.
        risk_keywords (dict): A dictionary with risk categories and their keyword lists.
        window (int): Number of sentences before and after the risk occurrence to include.

    Returns:
        dict: Mapping risk categories to their summarized contextual details.
    """
    # Process the text with spaCy for sentence segmentation
    doc = nlp(text)
    sentences = list(doc.sents)

    # Dictionary to collect contexts for each risk category
    risk_contexts = {category: [] for category in risk_keywords}

    # Loop over sentences and find risk term occurrences
    for i, sent in enumerate(sentences):
        sent_text_lower = sent.text.lower()
        for category, details in risk_keywords.items():
            for keyword in details["keywords"]:
                if keyword.lower() in sent_text_lower:
                    # Define a window around the current sentence
                    start_idx = max(0, i - window)
                    end_idx = min(len(sentences), i + window + 1)
                    context_chunk = " ".join([s.text for s in sentences[start_idx:end_idx]])
                    risk_contexts[category].append(context_chunk)

    # Summarize the collected contexts for each risk category using the summarizer model
    summarized_contexts = {}
    for category, contexts in risk_contexts.items():
        if contexts:
            combined_context = " ".join(contexts)
            try:
                summary_result = summarizer(combined_context, max_length=100, min_length=30, do_sample=False)
                summary = summary_result[0]['summary_text']
            except Exception as e:
                summary = "Context summarization failed."
            summarized_contexts[category] = summary
        else:
            summarized_contexts[category] = "No contextual details found."

    return summarized_contexts

def get_detailed_risk_info(text):
    """
    Returns detailed risk information by merging risk scores with descriptive details
    and contextual summaries from the document.
    """
    # Detailed risk information dictionary
    risk_details = {
        "Liability": {
            "description": "Liability refers to the legal responsibility for losses or damages.",
            "common_concerns": "Broad liability clauses may expose parties to unforeseen risks.",
            "recommendations": "Review and negotiate clear limits on liability.",
            "example": "E.g., 'The party shall be liable for direct damages due to negligence.'"
        },
        "Termination": {
            "description": "Termination involves conditions under which a contract can be ended.",
            "common_concerns": "Unilateral termination rights or ambiguous conditions can be risky.",
            "recommendations": "Ensure termination clauses are balanced and include notice periods.",
            "example": "E.g., 'Either party may terminate the agreement with 30 days notice.'"
        },
        "Indemnification": {
            "description": "Indemnification requires one party to compensate for losses incurred by the other.",
            "common_concerns": "Overly broad indemnification can shift significant risk.",
            "recommendations": "Negotiate clear limits and carve-outs where necessary.",
            "example": "E.g., 'The seller shall indemnify the buyer against claims from product defects.'"
        },
        "Payment Risk": {
            "description": "Payment risk pertains to terms regarding fees, schedules, and reimbursements.",
            "common_concerns": "Vague payment terms or hidden charges increase risk.",
            "recommendations": "Clarify payment conditions and include penalties for delays.",
            "example": "E.g., 'Payments must be made within 30 days, with a 2% late fee thereafter.'"
        },
        "Insurance": {
            "description": "Insurance risk covers the adequacy and scope of required coverage.",
            "common_concerns": "Insufficient insurance can leave parties exposed in unexpected events.",
            "recommendations": "Review insurance requirements to ensure they meet the risk profile.",
            "example": "E.g., 'The contractor must maintain liability insurance with at least $1M coverage.'"
        }
    }

    # Get basic risk scores from the original function
    risk_scores = analyze_risk(text)

    # Define risk keywords for context extraction
    risk_keywords_context = {
        "Liability": {"keywords": ["liability", "responsible", "responsibility", "legal obligation"]},
        "Termination": {"keywords": ["termination", "breach", "contract end", "default"]},
        "Indemnification": {"keywords": ["indemnification", "indemnify", "hold harmless", "compensate", "compensation"]},
        "Payment Risk": {"keywords": ["payment", "terms", "reimbursement", "fee", "schedule", "invoice", "money"]},
        "Insurance": {"keywords": ["insurance", "coverage", "policy", "claims"]}
    }

    # Extract summarized contextual details for each risk term
    risk_contexts = extract_context_for_risk_terms(text, risk_keywords_context, window=1)

    detailed_info = {}
    for risk_term, score in risk_scores.items():
        if score > 0:
            info = risk_details.get(risk_term, {"description": "No details available."})
            detailed_info[risk_term] = {
                "score": score,
                "description": info.get("description", ""),
                "common_concerns": info.get("common_concerns", ""),
                "recommendations": info.get("recommendations", ""),
                "example": info.get("example", ""),
                "context_summary": risk_contexts.get(risk_term, "No context available.")
            }
    return detailed_info

# ✅ Clause Classification (CUAD)
def analyze_contract_clauses(text):
    """Analyzes contract clauses using CUAD (Contract Understanding Atticus Dataset)."""
    # Process text in chunks to handle large documents
    max_length = 512
    step = 256  # Overlap to catch clauses that might span chunk boundaries
    clauses_detected = []

    clause_types = [
        "Obligations of Seller", "Governing Law", "Termination", "Indemnification",
        "Confidentiality", "Insurance", "Non-Compete", "Change of Control",
        "Assignment", "Warranty", "Limitation of Liability", "Arbitration",
        "IP Rights", "Force Majeure", "Revenue/Profit Sharing", "Audit Rights"
    ]

    # Split text into manageable chunks
    chunks = [text[i:i+max_length] for i in range(0, len(text), step) if i+step < len(text)]

    # Process each chunk
    for chunk in chunks:
        inputs = cuad_tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512).to(device)
        with torch.no_grad():
            outputs = cuad_model(**inputs)

        predictions = torch.sigmoid(outputs.logits).cpu().numpy()[0]

        for idx, confidence in enumerate(predictions):
            if confidence > 0.5 and idx < len(clause_types):
                clauses_detected.append({"type": clause_types[idx], "confidence": float(confidence)})

    # Aggregate duplicate clauses by taking highest confidence
    aggregated_clauses = {}
    for clause in clauses_detected:
        clause_type = clause["type"]
        if clause_type not in aggregated_clauses or clause["confidence"] > aggregated_clauses[clause_type]["confidence"]:
            aggregated_clauses[clause_type] = clause

    return list(aggregated_clauses.values())

# ✅ Legal Document Analysis API
@app.post("/analyze_legal_document")
async def analyze_legal_document(file: UploadFile = File(...)):
    """Analyzes a legal document for clause detection and compliance risks."""
    try:
        print(f"Processing file: {file.filename}")
        content = await file.read()
        text = extract_text_from_pdf(io.BytesIO(content))

        if not text:
            return {"status": "error", "message": "No valid text found in the document."}

        # Truncate text for summarization if too long
        summary_text = text[:4096] if len(text) > 4096 else text
        summary = summarizer(summary_text, max_length=200, min_length=50, do_sample=False)[0]['summary_text'] if len(text) > 100 else "Document too short for meaningful summarization."

        print("Extracting named entities...")
        entities = extract_named_entities(text)

        print("Analyzing risk...")
        risk_scores = analyze_risk(text)
        # Get detailed risk info including contextual summaries
        detailed_risk = get_detailed_risk_info(text)

        print("Analyzing contract clauses...")
        clauses = analyze_contract_clauses(text)

        # ✅ Generate a unique Task ID (Properly Indented)
        generated_task_id = str(uuid.uuid4())

        # ✅ Store document text for chatbot context
        store_document_context(generated_task_id, text)

        return {  # ✅ Ensure proper indentation
            "status": "success",
            "task_id": generated_task_id,  # ✅ Ensure Task ID is included
            "summary": summary,
            "named_entities": entities,
            "risk_scores": risk_scores,
            "detailed_risk": detailed_risk,
            "clauses_detected": clauses
        }

    except Exception as e:
        print(f"Error processing document: {str(e)}")
        return {"status": "error", "message": str(e)}

# ✅ NEW: Legal Video Analysis API
@app.post("/analyze_legal_video")
async def analyze_legal_video(file: UploadFile = File(...)):
    """Analyzes a legal video by transcribing audio and analyzing the transcript."""
    try:
        # Save the uploaded file to a temporary location
        print(f"Processing video file: {file.filename}")
        content = await file.read()

        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name

        print(f"Temporary file saved at: {temp_file_path}")

        # Extract text from video via audio transcription
        text = process_video_to_text(temp_file_path)

        # Clean up temporary file
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        if not text:
            return {"status": "error", "message": "No speech could be transcribed from the video."}

        # Store the transcript for reference
        transcript_path = os.path.join("static", f"transcript_{int(time.time())}.txt")
        with open(transcript_path, "w") as f:
            f.write(text)

        # Analyze the transcribed text (same as document analysis)
        summary_text = text[:4096] if len(text) > 4096 else text
        summary = summarizer(summary_text, max_length=200, min_length=50, do_sample=False)[0]['summary_text'] if len(text) > 100 else "Transcript too short for meaningful summarization."

        print("Extracting named entities from transcript...")
        entities = extract_named_entities(text)

        print("Analyzing risk from transcript...")
        risk_scores = analyze_risk(text)
        detailed_risk = get_detailed_risk_info(text)

        print("Analyzing legal clauses from transcript...")
        clauses = analyze_contract_clauses(text)

        # ✅ Generate a unique Task ID for video transcripts
        generated_task_id = str(uuid.uuid4())

        # ✅ Store transcript text for chatbot context
        store_document_context(generated_task_id, text)

        return {
            "status": "success",
            "task_id": generated_task_id,
            "transcript": text,
            "transcript_path": transcript_path,
            "summary": summary,
            "named_entities": entities,
            "risk_scores": risk_scores,
            "detailed_risk": detailed_risk,
            "clauses_detected": clauses
        }
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        return {"status": "error", "message": str(e)}

# ✅ NEW: Legal Audio Analysis API
@app.post("/analyze_legal_audio")
async def analyze_legal_audio(file: UploadFile = File(...)):
    """Analyzes legal audio by transcribing and analyzing the transcript."""
    try:
        # Save the uploaded file to a temporary location
        print(f"Processing audio file: {file.filename}")
        content = await file.read()

        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name

        print(f"Temporary file saved at: {temp_file_path}")

        # Extract text from audio via transcription
        text = process_audio_to_text(temp_file_path)

        # Clean up temporary file
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        if not text:
            return {"status": "error", "message": "No speech could be transcribed from the audio."}

        # Store the transcript for reference
        transcript_path = os.path.join("static", f"transcript_{int(time.time())}.txt")
        with open(transcript_path, "w") as f:
            f.write(text)

        # Analyze the transcribed text (same as document analysis)
        summary_text = text[:4096] if len(text) > 4096 else text
        summary = summarizer(summary_text, max_length=200, min_length=50, do_sample=False)[0]['summary_text'] if len(text) > 100 else "Transcript too short for meaningful summarization."

        print("Extracting named entities from transcript...")
        entities = extract_named_entities(text)

        print("Analyzing risk from transcript...")
        risk_scores = analyze_risk(text)
        detailed_risk = get_detailed_risk_info(text)

        print("Analyzing legal clauses from transcript...")
        clauses = analyze_contract_clauses(text)

        # ✅ Generate a unique Task ID for audio transcripts
        generated_task_id = str(uuid.uuid4())

        # ✅ Store transcript text for chatbot context
        store_document_context(generated_task_id, text)

        return {
            "status": "success",
            "task_id": generated_task_id,
            "transcript": text,
            "transcript_path": transcript_path,
            "summary": summary,
            "named_entities": entities,
            "risk_scores": risk_scores,
            "detailed_risk": detailed_risk,
            "clauses_detected": clauses
        }
    except Exception as e:
        print(f"Error processing audio: {str(e)}")
        return {"status": "error", "message": str(e)}

# ✅ Get Transcript API
@app.get("/transcript/{transcript_id}")
async def get_transcript(transcript_id: str):
    """Retrieves a previously generated transcript."""
    transcript_path = os.path.join("static", f"transcript_{transcript_id}.txt")
    if os.path.exists(transcript_path):
        return FileResponse(transcript_path)
    else:
        raise HTTPException(status_code=404, detail="Transcript not found")

# ✅ Fixed legal chatbot API
@app.post("/legal_chatbot")
async def legal_chatbot_api(query: str = Form(...), task_id: str = Form(...)):
    """Handles legal Q&A using chat history and document context."""

    # Retrieve the document text from storage
    document_context = load_document_context(task_id)

    if not document_context:
        return {"response": "⚠️ No relevant document found for this task ID."}

    response = legal_chatbot(query, document_context)

    return {"response": response, "chat_history": chat_history[-5:]}

# ✅ Health Check Endpoint
@app.get("/health")
async def health_check():
    return {
        "status": "ok",
        "models_loaded": True,
        "device": device,
        "gpu_available": torch.cuda.is_available(),
        "timestamp": time.time()
    }

# ✅ Ngrok Setup for Google Colab
def setup_ngrok():
    """Sets up ngrok tunnel for Google Colab."""
    try:
        # Get auth token from environment or set it manually if you have one
        auth_token = os.environ.get("NGROK_AUTH_TOKEN")
        if auth_token:
            ngrok.set_auth_token(auth_token)

        ngrok.kill()  # Kill any existing tunnels
        time.sleep(1)  # Wait before reconnecting

        # Connect to ngrok
        ngrok_tunnel = ngrok.connect(8500, "http")
        public_url = ngrok_tunnel.public_url
        print(f"✅ Ngrok Public URL: {public_url}")

        # Keep ngrok connection alive
        def keep_alive():
            while True:
                time.sleep(60)
                try:
                    # Check tunnel status
                    tunnels = ngrok.get_tunnels()
                    if not tunnels:
                        print("⚠️ Ngrok tunnel closed. Reconnecting...")
                        ngrok_tunnel = ngrok.connect(8500, "http")
                        print(f"✅ Reconnected. New URL: {ngrok_tunnel.public_url}")
                except Exception as e:
                    print(f"⚠️ Ngrok error: {e}")

        Thread(target=keep_alive, daemon=True).start()
        return public_url
    except Exception as e:
        print(f"⚠️ Ngrok setup error: {e}")
        return None

from fastapi.responses import FileResponse

# ✅ Existing Risk Chart (Bar Chart) Endpoint
@app.get("/download_risk_chart")
async def download_risk_chart():
    """Generate and return a risk assessment chart as an image file."""
    try:
        # Ensure the static directory exists
        os.makedirs("static", exist_ok=True)

        # Sample risk assessment data (should be dynamically fetched from API results)
        risk_scores = {
            "Liability": 11,
            "Termination": 12,
            "Indemnification": 10,
            "Payment Risk": 41,
            "Insurance": 71
        }

        # Generate a bar chart for legal risk assessment
        plt.figure(figsize=(8, 5))
        plt.bar(risk_scores.keys(), risk_scores.values(), color='red')
        plt.xlabel("Risk Categories")
        plt.ylabel("Risk Score")
        plt.title("Legal Risk Assessment")
        plt.xticks(rotation=30)

        # Save the chart as an image file
        risk_chart_path = "static/risk_chart.png"
        plt.savefig(risk_chart_path)
        plt.close()

        return FileResponse(risk_chart_path, media_type="image/png", filename="risk_chart.png")

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating risk chart: {str(e)}")

# ✅ Additional Visualization Endpoints

@app.get("/download_risk_pie_chart")
async def download_risk_pie_chart():
    try:
        # Sample risk assessment data
        risk_scores = {
            "Liability": 11,
            "Termination": 12,
            "Indemnification": 10,
            "Payment Risk": 41,
            "Insurance": 71
        }

        # Generate a pie chart
        plt.figure(figsize=(6, 6))
        plt.pie(risk_scores.values(), labels=risk_scores.keys(), autopct='%1.1f%%', startangle=90)
        plt.title("Legal Risk Distribution")
        pie_chart_path = "static/risk_pie_chart.png"
        plt.savefig(pie_chart_path)
        plt.close()
        return FileResponse(pie_chart_path, media_type="image/png", filename="risk_pie_chart.png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating pie chart: {str(e)}")

@app.get("/download_risk_radar_chart")
async def download_risk_radar_chart():
    try:
        risk_scores = {
            "Liability": 11,
            "Termination": 12,
            "Indemnification": 10,
            "Payment Risk": 41,
            "Insurance": 71
        }
        categories = list(risk_scores.keys())
        values = list(risk_scores.values())

        # Radar chart requires the data to wrap around
        categories += categories[:1]
        values += values[:1]

        angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
        angles += angles[:1]

        fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
        ax.plot(angles, values, 'o-', linewidth=2)
        ax.fill(angles, values, alpha=0.25)
        ax.set_thetagrids(np.degrees(angles[:-1]), categories)
        ax.set_title("Legal Risk Radar Chart", y=1.1)
        radar_chart_path = "static/risk_radar_chart.png"
        plt.savefig(radar_chart_path)
        plt.close()
        return FileResponse(radar_chart_path, media_type="image/png", filename="risk_radar_chart.png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating radar chart: {str(e)}")

@app.get("/download_risk_trend_chart")
async def download_risk_trend_chart():
    try:
        # Sample historical risk scores for demonstration
        dates = ["2025-01-01", "2025-02-01", "2025-03-01", "2025-04-01"]
        risk_history = {
            "Liability": [10, 12, 11, 13],
            "Termination": [12, 15, 14, 13],
            "Indemnification": [9, 10, 11, 10],
            "Payment Risk": [40, 42, 41, 43],
            "Insurance": [70, 69, 71, 72]
        }

        plt.figure(figsize=(10, 6))
        for category, scores in risk_history.items():
            plt.plot(dates, scores, marker='o', label=category)
        plt.xlabel("Date")
        plt.ylabel("Risk Score")
        plt.title("Historical Legal Risk Trends")
        plt.xticks(rotation=45)
        plt.legend()
        trend_chart_path = "static/risk_trend_chart.png"
        plt.savefig(trend_chart_path, bbox_inches="tight")
        plt.close()
        return FileResponse(trend_chart_path, media_type="image/png", filename="risk_trend_chart.png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating trend chart: {str(e)}")

import pandas as pd
import plotly.express as px
from fastapi.responses import HTMLResponse

@app.get("/interactive_risk_chart", response_class=HTMLResponse)
async def interactive_risk_chart():
    try:
        risk_scores = {
            "Liability": 11,
            "Termination": 12,
            "Indemnification": 10,
            "Payment Risk": 41,
            "Insurance": 71
        }
        df = pd.DataFrame({
            "Risk Category": list(risk_scores.keys()),
            "Risk Score": list(risk_scores.values())
        })
        fig = px.bar(df, x="Risk Category", y="Risk Score", title="Interactive Legal Risk Assessment")
        return fig.to_html()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating interactive chart: {str(e)}")

# ✅ Run FastAPI with proper Colab configuration
def run():
    """Starts the FastAPI server."""
    print("Starting FastAPI server...")
    uvicorn.run(app, host="0.0.0.0", port=8500, timeout_keep_alive=600)

if __name__ == "__main__":
    public_url = setup_ngrok()
    if public_url:
        print(f"\n✅ Your API is publicly available at: {public_url}/docs\n")
    else:
        print("\n⚠️ Ngrok setup failed. API will only be available locally.\n")

    run()


Overwriting app.py


In [17]:
!fuser -k 8500/tcp

In [None]:
!python app.py

error: XDG_RUNTIME_DIR not set in the environment.
ALSA lib confmisc.c:855:(parse_card) cannot find card '0'
ALSA lib conf.c:5178:(_snd_config_evaluate) function snd_func_card_inum returned error: No such file or directory
ALSA lib confmisc.c:422:(snd_func_concat) error evaluating strings
ALSA lib conf.c:5178:(_snd_config_evaluate) function snd_func_concat returned error: No such file or directory
ALSA lib confmisc.c:1334:(snd_func_refer) error evaluating name
ALSA lib conf.c:5178:(_snd_config_evaluate) function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5701:(snd_config_expand) Evaluate error: No such file or directory
ALSA lib pcm.c:2664:(snd_pcm_open_noupdate) Unknown PCM default
ALSA lib confmisc.c:855:(parse_card) cannot find card '0'
ALSA lib conf.c:5178:(_snd_config_evaluate) function snd_func_card_inum returned error: No such file or directory
ALSA lib confmisc.c:422:(snd_func_concat) error evaluating strings
ALSA lib conf.c:5178:(_snd_config_evalu