In [2]:
# ============================================================================
# COMPLETE SETUP SCRIPT FOR LEGAL QA SYSTEM
# Run this in Google Colab or your Python environment
# ============================================================================

# STEP 1: Install all required packages
print("🔧 Installing required packages...")

# Install command for Google Colab
!pip install streamlit pyngrok pandas scikit-learn joblib opencv-python pytesseract python-docx pdfplumber pillow openpyxl xlrd

# Install tesseract for OCR (Colab only)
!apt install -y tesseract-ocr

print("✅ All packages installed successfully!")

# STEP 2: Import and test all modules
# Smoke test for the packages installed in step 1: every import is attempted
# inside a single try block, so the first ImportError stops the remaining
# imports and only that one failure is reported.
print("\n🧪 Testing imports...")

try:
    import streamlit as st
    import pandas as pd
    import pickle
    import os
    import tempfile
    import pdfplumber
    import io
    import logging
    from typing import Optional, Tuple, Any, Dict, List
    import re
    import json
    from datetime import datetime
    import numpy as np
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.linear_model import LogisticRegression
    from PIL import Image
    print("✅ All imports successful!")
except ImportError as e:
    # The failure is only printed; the rest of the cell still runs afterwards.
    print(f"❌ Import error: {e}")

# STEP 3: Create the complete legal QA app
print("\n📝 Creating legal_qa_app.py...")

app_code = '''
import streamlit as st
import pandas as pd
import pickle
import os
import tempfile
import pdfplumber
import io
import logging
from typing import Optional, Tuple, Any, Dict, List
import re
import json
from datetime import datetime
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# Configure logging
# INFO-level root logging; `logger` is what LegalDocumentProcessor uses to
# record extraction and analysis errors.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Page config
# Sets the page title, the page icon and a wide layout for the whole app.
st.set_page_config(
    page_title="⚖️ CUAD Legal Document QA System",
    page_icon="⚖️",
    layout="wide"
)

class LegalDocumentProcessor:
    """Turn an uploaded document into text and run a lightweight legal analysis.

    Supported inputs are PDF, TXT, JSON, XLSX and CSV. When the processor is
    created, a small TF-IDF + logistic-regression classifier is trained on a
    built-in list of example clauses; it is then used to label clauses of an
    uploaded document as High / Medium / Low risk.
    """

    def __init__(self):
        """Register the supported formats and train the built-in risk model."""
        self.supported_formats = ['pdf', 'txt', 'json', 'xlsx', 'csv']
        # Both stay None if setup_model() fails; analyze_content() then skips
        # the risk analysis instead of crashing.
        self.model = None
        self.vectorizer = None
        self.setup_model()

    def setup_model(self):
        """Train the risk model and report the outcome in the Streamlit UI.

        Any failure is logged and shown with st.error; it is not re-raised.
        """
        try:
            # Create a simple model on the fly
            self.create_simple_model()
            st.success("✅ Risk assessment model initialized")

        except Exception as e:
            logger.error(f"Model setup error: {e}")
            st.error(f"❌ Model setup failed: {e}")

    def create_simple_model(self):
        """Fit self.vectorizer and self.model on a hard-coded clause sample.

        The training set is the list of (clause text, risk label) pairs below,
        with labels "High", "Medium" and "Low".
        """
        # Sample training data for legal risk assessment
        training_data = [
            ("The agreement may be terminated at any time without notice", "High"),
            ("Party shall indemnify and hold harmless against all claims", "High"),
            ("Limitation of liability shall not exceed the contract value", "Medium"),
            ("This agreement shall be governed by the laws of", "Low"),
            ("Confidential information must be protected at all times", "Medium"),
            ("Either party may terminate with 30 days written notice", "Medium"),
            ("The effective date of this agreement is", "Low"),
            ("Penalty for breach shall include liquidated damages", "High"),
            ("No warranty is provided for the services rendered", "High"),
            ("Standard business hours are defined as 9 AM to 5 PM", "Low"),
            ("Exclusive jurisdiction lies with the courts of", "Medium"),
            ("Force majeure events include natural disasters", "Low"),
            ("Intellectual property rights remain with the creator", "Medium"),
            ("Unlimited liability for gross negligence or willful misconduct", "High"),
            ("Contract automatically renews unless terminated", "Medium"),
            ("Termination without cause", "High"),
            ("License agreement expires", "Medium"),
            ("Data processing agreement", "Medium"),
            ("Warranty disclaimer", "High"),
            ("Governing law provisions", "Low")
        ]

        texts, labels = zip(*training_data)

        # Create and train model
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        X = self.vectorizer.fit_transform(texts)

        self.model = LogisticRegression(random_state=42, max_iter=1000)
        self.model.fit(X, labels)

    def process_file(self, uploaded_file) -> Dict[str, Any]:
        """Extract and analyse one uploaded file.

        Args:
            uploaded_file: file-like object with a ``name`` attribute and
                ``read``/``seek`` methods (e.g. the value returned by
                st.file_uploader).

        Returns:
            A dict with ``success``. On success it also carries ``filename``,
            ``file_type``, ``content_preview`` (first 500 characters),
            ``analysis`` and ``processed_at``. On failure it carries ``error``
            and, for an unsupported extension, ``supported_formats``.
            Exceptions are caught and reported through ``error``.
        """
        try:
            filename = uploaded_file.name
            file_extension = filename.split('.')[-1].lower()

            if file_extension not in self.supported_formats:
                return {
                    'success': False,
                    'error': f"Unsupported file format: {file_extension}",
                    'supported_formats': self.supported_formats
                }

            # Extract content based on file type
            content = self.extract_content(uploaded_file, file_extension)

            if not content:
                return {
                    'success': False,
                    'error': "Could not extract content from file"
                }

            # Perform analysis
            analysis_results = self.analyze_content(content, filename)

            return {
                'success': True,
                'filename': filename,
                'file_type': file_extension,
                'content_preview': content[:500] + "..." if len(content) > 500 else content,
                'analysis': analysis_results,
                'processed_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            logger.error(f"File processing error: {e}")
            return {
                'success': False,
                'error': f"Processing failed: {str(e)}"
            }

    def extract_content(self, uploaded_file, file_extension: str) -> str:
        """Dispatch to the extractor matching ``file_extension``.

        Returns the extracted text, or "" for an extension with no extractor.

        Raises:
            Exception: wrapping any error raised by the extractor.
        """
        try:
            # An earlier read may have left the buffer positioned at its end
            # (for example when the same upload is analysed a second time);
            # without rewinding, every extractor would see an empty file.
            rewind = getattr(uploaded_file, 'seek', None)
            if callable(rewind):
                rewind(0)

            if file_extension == 'pdf':
                return self.extract_pdf_content(uploaded_file)
            elif file_extension == 'txt':
                return self.extract_txt_content(uploaded_file)
            elif file_extension == 'json':
                return self.extract_json_content(uploaded_file)
            elif file_extension in ['xlsx', 'csv']:
                return self.extract_spreadsheet_content(uploaded_file, file_extension)
            else:
                return ""
        except Exception as e:
            logger.error(f"Content extraction error: {e}")
            raise Exception(f"Content extraction failed: {e}") from e

    def extract_pdf_content(self, uploaded_file) -> str:
        """Return the text of every PDF page that has text, separated by blank lines."""
        try:
            with pdfplumber.open(io.BytesIO(uploaded_file.read())) as pdf:
                text_parts = []
                for page in pdf.pages:
                    page_text = page.extract_text()
                    # Pages without extractable text are skipped.
                    if page_text:
                        text_parts.append(page_text.strip())
                return "\\n\\n".join(text_parts)
        except Exception as e:
            raise Exception(f"PDF extraction failed: {e}") from e

    def extract_txt_content(self, uploaded_file) -> str:
        """Decode a text file as UTF-8, falling back to Latin-1 on decode errors."""
        try:
            return uploaded_file.read().decode('utf-8')
        except UnicodeDecodeError:
            try:
                # The first read consumed the buffer; rewind before retrying.
                uploaded_file.seek(0)
                return uploaded_file.read().decode('latin-1')
            except Exception as e:
                raise Exception(f"Text extraction failed: {e}") from e

    def extract_json_content(self, uploaded_file) -> str:
        """Parse a JSON file and return it re-serialised with 2-space indentation."""
        try:
            json_data = json.load(uploaded_file)
            # Convert JSON to readable text
            return json.dumps(json_data, indent=2)
        except Exception as e:
            raise Exception(f"JSON extraction failed: {e}") from e

    def extract_spreadsheet_content(self, uploaded_file, file_extension: str) -> str:
        """Summarise a spreadsheet as text.

        The summary holds the row/column counts, the column headers, the first
        three rows, and up to ten non-null values of every object-dtype column.
        ``file_extension`` 'xlsx' is read with pandas.read_excel, anything else
        with pandas.read_csv.
        """
        try:
            if file_extension == 'xlsx':
                df = pd.read_excel(uploaded_file)
            else:  # csv
                df = pd.read_csv(uploaded_file)

            # Convert dataframe to text representation
            text_parts = []
            text_parts.append(f"Spreadsheet Analysis - {len(df)} rows and {len(df.columns)} columns")
            # Column labels are not guaranteed to be strings (e.g. numeric
            # headers), so they are converted before joining.
            text_parts.append("\\nColumn Headers: " + ", ".join(str(col) for col in df.columns))

            # Add sample data for analysis
            if len(df) > 0:
                text_parts.append("\\nSample Data:")
                text_parts.append(df.head(3).to_string())

                # Create text representation for legal analysis
                for col in df.columns:
                    if df[col].dtype == 'object':  # Text columns
                        sample_values = df[col].dropna().head(10).tolist()
                        text_parts.append(f"\\n{col}: " + " | ".join(str(v) for v in sample_values))

            return "\\n".join(text_parts)
        except Exception as e:
            raise Exception(f"Spreadsheet extraction failed: {e}") from e

    def analyze_content(self, content: str, filename: str) -> Dict[str, Any]:
        """Run every analysis step on the extracted text.

        Returns a dict with ``basic_stats``, ``risk_analysis`` (None when no
        model is available), ``key_terms``, ``structure`` and
        ``clauses_sample`` (first five clauses). If any step raises, the
        result is ``{'error': ...}`` instead. ``filename`` is currently unused.
        """
        try:
            # Basic content analysis
            word_count = len(content.split())
            char_count = len(content)

            # Extract potential clauses (sentences); fragments of 20 characters
            # or fewer are dropped.
            sentences = re.split(r'[.!?]+', content)
            clauses = [s.strip() for s in sentences if len(s.strip()) > 20]

            # Risk analysis using ML model
            risk_analysis = self.perform_risk_analysis(clauses) if self.model and self.vectorizer else None

            # Key term extraction
            key_terms = self.extract_key_terms(content)

            # Document structure analysis
            structure_analysis = self.analyze_structure(content)

            return {
                'basic_stats': {
                    'word_count': word_count,
                    'character_count': char_count,
                    'clause_count': len(clauses)
                },
                'risk_analysis': risk_analysis,
                'key_terms': key_terms,
                'structure': structure_analysis,
                'clauses_sample': clauses[:5]  # First 5 clauses for preview
            }

        except Exception as e:
            logger.error(f"Content analysis error: {e}")
            return {'error': f"Analysis failed: {str(e)}"}

    def perform_risk_analysis(self, clauses: List[str]) -> Dict[str, Any]:
        """Classify clauses with the trained model.

        Returns ``summary`` (count per risk label), ``high_risk_clauses`` (at
        most ten entries, each truncated to 200 characters, with the highest
        class probability as ``confidence``) and ``total_analyzed``. Returns
        ``{'error': ...}`` when there is nothing to analyse or prediction fails.
        """
        try:
            if not clauses:
                return {'error': 'No clauses found for analysis'}

            # Filter out very short clauses
            valid_clauses = [c for c in clauses if len(c.strip()) > 10]
            if not valid_clauses:
                return {'error': 'No valid clauses found for analysis'}

            # Vectorize clauses
            X = self.vectorizer.transform(valid_clauses)

            # Predict risk levels
            risk_predictions = self.model.predict(X)
            risk_probabilities = self.model.predict_proba(X)

            # Analyze results
            risk_summary = {
                'High': sum(1 for r in risk_predictions if r == 'High'),
                'Medium': sum(1 for r in risk_predictions if r == 'Medium'),
                'Low': sum(1 for r in risk_predictions if r == 'Low')
            }

            # Get high-risk clauses
            high_risk_clauses = []
            for clause, risk, probs in zip(valid_clauses, risk_predictions, risk_probabilities):
                if risk == 'High':
                    high_risk_clauses.append({
                        'clause': clause[:200] + "..." if len(clause) > 200 else clause,
                        'risk_level': risk,
                        # Plain float rather than a numpy scalar, so the result
                        # stays a simple, serialisable structure.
                        'confidence': float(max(probs))
                    })

            return {
                'summary': risk_summary,
                'high_risk_clauses': high_risk_clauses[:10],  # First 10 high-risk clauses, in document order
                'total_analyzed': len(valid_clauses)
            }

        except Exception as e:
            logger.error(f"Risk analysis error: {e}")
            return {'error': f"Risk analysis failed: {str(e)}"}

    def extract_key_terms(self, content: str) -> List[str]:
        """Return the built-in legal keywords that occur in ``content``.

        Matching is a case-insensitive substring test, and the result keeps
        the order of the keyword list below.
        """
        legal_keywords = [
            'agreement', 'contract', 'party', 'clause', 'termination', 'breach',
            'liability', 'damages', 'indemnify', 'warranty', 'confidential',
            'intellectual property', 'jurisdiction', 'governing law', 'force majeure',
            'arbitration', 'penalty', 'liquidated damages', 'material adverse',
            'license', 'copyright', 'trademark', 'patent', 'compliance',
            'audit', 'report', 'renewal', 'expiration', 'fee', 'payment'
        ]

        content_lower = content.lower()
        found_terms = [term for term in legal_keywords if term in content_lower]
        return found_terms

    def analyze_structure(self, content: str) -> Dict[str, Any]:
        """Count simple layout markers (numbered lines, bullets, all-caps lines).

        ``has_structure`` is True when at least one numbered section or bullet
        point was found.
        """
        lines = content.split('\\n')

        # Count different types of content
        numbered_sections = len([line for line in lines if re.match(r'^\\d+\\.', line.strip())])
        bullet_points = len([line for line in lines if re.match(r'^\\s*[•\\-\\*]', line)])
        all_caps_lines = len([line for line in lines if line.isupper() and len(line.strip()) > 5])

        return {
            'total_lines': len(lines),
            'numbered_sections': numbered_sections,
            'bullet_points': bullet_points,
            'all_caps_lines': all_caps_lines,
            'has_structure': numbered_sections > 0 or bullet_points > 0
        }

def main():
    """Render the upload page and analyse the document on request."""

    # Page heading
    st.title("📄 CUAD Legal Document QA System")
    st.markdown("*AI-powered legal document analysis and risk assessment*")

    # The processor is built once and kept in the session state afterwards
    if 'processor' not in st.session_state:
        with st.spinner("🔧 Initializing legal document processor..."):
            st.session_state.processor = LegalDocumentProcessor()

    # Upload widget
    st.header("📤 Upload Document")
    uploaded_file = st.file_uploader(
        "Choose a legal document",
        type=['pdf', 'txt', 'json', 'xlsx', 'csv'],
        help="Upload PDF, TXT, JSON, XLSX, or CSV files for analysis"
    )

    # Nothing else to render until a file has been chosen
    if uploaded_file is None:
        return

    st.success(f"✅ File '{uploaded_file.name}' uploaded successfully!")

    # Size of the upload, shown in KB
    size_in_bytes = len(uploaded_file.getvalue())
    st.info(f"📊 File size: {size_in_bytes / 1024:.1f} KB")

    # The analysis only runs on the script run in which the button was clicked
    if not st.button("🔍 Analyze Document", type="primary"):
        return

    with st.spinner("🔄 Processing document..."):
        outcome = st.session_state.processor.process_file(uploaded_file)

    if not outcome['success']:
        st.error(f"❌ {outcome['error']}")
        if 'supported_formats' in outcome:
            st.info(f"💡 Supported formats: {', '.join(outcome['supported_formats'])}")
        return

    display_analysis_results(outcome)

def display_analysis_results(results: Dict[str, Any]):
    """Render a successful process_file() result in the Streamlit page.

    Args:
        results: dict with ``file_type``, ``processed_at``,
            ``content_preview`` and optionally ``analysis`` (the dict built
            by LegalDocumentProcessor.analyze_content).

    Sections whose data is missing from ``analysis`` are skipped. When the
    analysis itself failed (``analysis`` holds an ``error`` entry), the error
    is shown instead of silently rendering no analysis sections.
    """

    st.success("✅ Document processed successfully!")

    # Basic information
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("File Type", results['file_type'].upper())
    with col2:
        # processed_at is "YYYY-MM-DD HH:MM:SS"; only the time part is shown
        st.metric("Processed At", results['processed_at'].split()[1])
    with col3:
        st.metric("Status", "✅ Complete")

    # Content preview
    with st.expander("📖 Document Preview", expanded=False):
        st.text(results['content_preview'])

    # Analysis results
    analysis = results.get('analysis', {})

    # A failed analysis is reported as {'error': ...}; surface it so the user
    # is not left with a page that has no analysis sections and no explanation.
    if 'error' in analysis:
        st.warning(f"⚠️ Analysis issue: {analysis['error']}")

    if 'basic_stats' in analysis:
        st.subheader("📊 Document Statistics")
        stats = analysis['basic_stats']

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Word Count", f"{stats['word_count']:,}")
        with col2:
            st.metric("Characters", f"{stats['character_count']:,}")
        with col3:
            st.metric("Clauses Found", stats['clause_count'])

    # Risk analysis (absent or None when no model was available)
    if 'risk_analysis' in analysis and analysis['risk_analysis']:
        risk_data = analysis['risk_analysis']

        if 'error' not in risk_data:
            st.subheader("⚠️ Risk Assessment")

            # Risk summary
            if 'summary' in risk_data:
                summary = risk_data['summary']
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("🔴 High Risk", summary.get('High', 0))
                with col2:
                    st.metric("🟡 Medium Risk", summary.get('Medium', 0))
                with col3:
                    st.metric("🟢 Low Risk", summary.get('Low', 0))

            # High-risk clauses
            if 'high_risk_clauses' in risk_data and risk_data['high_risk_clauses']:
                st.subheader("🚨 High-Risk Clauses")
                for i, clause_data in enumerate(risk_data['high_risk_clauses']):
                    with st.expander(f"Risk Item #{i+1} (Confidence: {clause_data['confidence']:.2f})"):
                        st.write(clause_data['clause'])
            else:
                st.success("✅ No high-risk clauses detected!")
        else:
            st.warning(f"⚠️ Risk analysis issue: {risk_data['error']}")

    # Key terms
    if 'key_terms' in analysis and analysis['key_terms']:
        st.subheader("🔑 Key Legal Terms Found")
        terms_text = ", ".join(analysis['key_terms'])
        st.write(terms_text)

    # Structure analysis
    if 'structure' in analysis:
        structure = analysis['structure']
        st.subheader("📋 Document Structure")

        col1, col2 = st.columns(2)
        with col1:
            st.metric("Total Lines", structure['total_lines'])
            st.metric("Numbered Sections", structure['numbered_sections'])
        with col2:
            st.metric("Bullet Points", structure['bullet_points'])
            structured = "✅ Well Structured" if structure['has_structure'] else "⚠️ Unstructured"
            st.metric("Structure Quality", structured)

    # Sample clauses
    if 'clauses_sample' in analysis and analysis['clauses_sample']:
        with st.expander("📝 Sample Clauses", expanded=False):
            for i, clause in enumerate(analysis['clauses_sample'], 1):
                st.write(f"**{i}.** {clause}")

# Start the app when this file is executed as a script.
if __name__ == "__main__":
    main()
'''

# Write the app to file.
# The generated source contains emoji and other non-ASCII characters, so the
# encoding is fixed to UTF-8 instead of being left to the platform default.
with open('legal_qa_app.py', 'w', encoding='utf-8') as f:
    f.write(app_code)

print("✅ legal_qa_app.py created successfully!")

# STEP 4: Setup ngrok (for Colab)
print("\n🌐 Setting up ngrok for public access...")

# Mount Google Drive (if in Colab).
# Only a missing google.colab package means "not in Colab"; a failure of the
# mount itself is reported separately instead of being mislabelled.
try:
    from google.colab import drive
except ImportError:
    print("ℹ️ Not in Colab environment - skipping Drive mount")
else:
    try:
        drive.mount('/content/drive')
        print("✅ Google Drive mounted")
    except Exception as mount_error:
        # Best effort: the rest of the setup does not depend on Drive.
        print(f"⚠️ Google Drive mount failed: {mount_error}")

print("\n🚀 Setup complete! Now run:")
print("1. !streamlit run legal_qa_app.py &>/content/app.log &")
print("2. Then setup ngrok tunnel")
print("3. Upload your documents and get real analysis results!")

🔧 Installing required packages...
Collecting streamlit
  Downloading streamlit-1.47.1-py3-none-any.whl.metadata (9.0 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.12-py3-none-any.whl.metadata (9.4 kB)
Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting pdfplumber
  Downloading pdfplumber-0.11.7-py3-none-any.whl.metadata (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Collecting pdfminer.six==20250506 (from pdfplumb

In [3]:
# Start the generated Streamlit app as a background shell job; its stdout and
# stderr are both redirected to /content/app.log.
!streamlit run legal_qa_app.py &>/content/app.log &

In [11]:
from getpass import getpass
import os
import time

from pyngrok import ngrok

# Kill any existing tunnels
ngrok.kill()

# The ngrok auth token is a credential and must not be stored in the notebook.
# It is read from the NGROK_AUTHTOKEN environment variable, with an interactive
# prompt (input hidden) as fallback.
# NOTE(review): a token was previously hard-coded in this cell; treat it as
# leaked and revoke it in the ngrok dashboard.
ngrok_token = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok auth token: ")
if not ngrok_token:
    raise RuntimeError("An ngrok auth token is required to open the tunnel")

# Register the token through pyngrok rather than shelling out to an `ngrok`
# binary that may not be on PATH.
ngrok.set_auth_token(ngrok_token)

# Wait a moment (not always necessary, but safe)
time.sleep(2)

# Start a new tunnel on port 8501 (default Streamlit port)
public_url = ngrok.connect(8501)
print("🔗 Your Streamlit app is live at:", public_url)


🔗 Your Streamlit app is live at: NgrokTunnel: "https://be1fa69211a6.ngrok-free.app" -> "http://localhost:8501"
