In [None]:
# Installing required packages
!pip install transformers torch accelerate bitsandbytes
!pip install sentence-transformers faiss-cpu
!pip install gradio PyPDF2 pypdf
!pip install langchain langchain-community
!pip install --upgrade huggingface_hub

Collecting bitsandbytes
  Downloading bitsandbytes-0.46.1-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.me

In [None]:
import os

from huggingface_hub import login

# Never paste a real access token into the notebook: it would be saved with
# the file and leak as soon as the notebook is shared. The token is read from
# the HF_TOKEN environment variable instead; when that variable is unset,
# login() is called without a token so huggingface_hub asks for one
# interactively.
login(token=os.environ.get("HF_TOKEN") or None)


In [None]:
import os
import json
import re
from typing import Dict, List, Tuple, Any
import warnings
warnings.filterwarnings('ignore')

import torch
from transformers import (
    AutoTokenizer, AutoModelForCausalLM, 
    pipeline, BitsAndBytesConfig
)
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import PyPDF2
import gradio as gr
from dataclasses import dataclass
import logging

# Setup logging: configure the root logger at INFO level and create the
# module-level `logger` that the analyzer methods below write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class AnalysisResult:
    """Structured outcome of one privacy-policy analysis.

    Instances are built by PrivacyPolicyAnalyzer.generate_structured_analysis
    and rendered to Markdown by format_analysis_output.
    """
    summary: str  # model-written summary of the policy
    data_collected: List[str]  # one entry per data type parsed from the model's list
    user_rights: List[str]  # one entry per user right/control parsed from the model's list
    third_party_sharing: str  # model's sharing category plus its short explanation
    safety_rating: int  # 1-10, 10 = most privacy-friendly (clamped in _generate_ai_rating)
    recommendation: str  # icon-prefixed recommendation text from _generate_ai_rating

class PrivacyPolicyAnalyzer:
    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        logger.info(f"Using device: {self.device}")
        
        self.setup_models()
        
        #RAG components
        self.embeddings_model = None
        self.vector_store = None
        self.documents = []
        self.current_policy_text = ""
        
    def setup_models(self):
        """Load the tokenizer, the quantised Llama model and the generation pipeline.

        Populates self.tokenizer, self.model and self.text_generator.

        Raises:
            Exception: when any loading step fails; the underlying error is
                logged and its message is embedded in the raised exception.
        """
        logger.info("Setting up Llama 3.1 8B Instruct...")

        checkpoint = "meta-llama/Meta-Llama-3.1-8B-Instruct"

        # 4-bit NF4 weights with double quantisation, bfloat16 compute dtype.
        quantization = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )
        model_options = dict(
            quantization_config=quantization,
            device_map="auto",
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
        )

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(checkpoint)

            # Fall back to the EOS token when the tokenizer defines no pad token.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.model = AutoModelForCausalLM.from_pretrained(checkpoint, **model_options)

            # Pipeline-level generation defaults; _generate_response passes its
            # own values on every call.
            generation_defaults = dict(
                max_new_tokens=1024,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            self.text_generator = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                **generation_defaults
            )

            logger.info("Llama 3.1 8B Instruct loaded successfully!")

        except Exception as e:
            logger.error(f"Error loading Llama model: {e}")
            raise Exception(f"Failed to load Llama 3.1 8B Instruct: {e}")
    
    def setup_embeddings(self):
        if self.embeddings_model is None:
            logger.info("Loading embeddings model...")
            self.embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
            logger.info("Embeddings model loaded!")
    
    def extract_text_from_pdf(self, pdf_file) -> str:
        try:
            reader = PyPDF2.PdfReader(pdf_file)
            text = ""
            for page in reader.pages:
                text += page.extract_text() + "\n"
            return text.strip()
        except Exception as e:
            logger.error(f"Error extracting PDF text: {e}")
            return ""
    
    def preprocess_text(self, text: str) -> str:
        text = re.sub(r'\s+', ' ', text)
        text = text.replace('\n\n', '\n')
        return text.strip()
    
    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
        chunks = []
        start = 0
        
        while start < len(text):
            end = start + chunk_size
            if end > len(text):
                end = len(text)
            
            chunk = text[start:end]
            chunks.append(chunk)
            
            if end == len(text):
                break
                
            start = end - overlap
        
        return chunks
    
    def generate_structured_analysis(self, policy_text: str) -> AnalysisResult:
        """Run the four analysis prompts plus the rating prompt and bundle the results.

        Each prompt is written in the Llama 3 chat format and embeds only the
        first 4000 characters of policy_text; the rating step receives the
        first 2000 characters together with the results gathered so far.

        Args:
            policy_text: the policy text to analyse.

        Returns:
            An AnalysisResult holding the summary, the parsed data-type and
            user-right lists, the third-party-sharing assessment, the 1-10
            rating and the recommendation.
        """
        analysis_results = {}

        # 1) Plain-language summary.
        summary_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert privacy analyst. Provide clear, accurate analysis of privacy policies to help users understand their data rights and privacy implications. Always provide direct responses without prefacing phrases like "Here is" or "Based on".<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Analyze this privacy policy and provide a comprehensive 3-4 sentence summary that covers:
1. What the service does and why it collects data
2. The main types of data collected
3. Key privacy practices (good or concerning)
4. Overall user impact

Privacy Policy Text:
{policy_text[:4000]}

Provide the summary directly in clear, accessible language that a non-technical user can understand. Do not start with "Here is" or similar phrases.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        analysis_results['summary'] = self._generate_response(summary_prompt)
        
        # 2) Collected data types; the bullet-list reply is parsed into a Python list.
        data_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert privacy analyst. Extract specific data types mentioned in privacy policies with accuracy and completeness. Provide responses as clean bullet points without introductory phrases.<|eot_id|>

<|start_header_id|>user<|end_header_id|>
From this privacy policy, identify and list ALL specific types of personal data that are collected. Be comprehensive and specific.

Look for categories like:
- Identity information (name, email, phone, etc.)
- Technical data (IP address, device info, cookies, etc.)
- Usage data (app usage, website visits, etc.)
- Location data, Biometric data, Financial information
- Communication content, Social media data
- Any other personal information mentioned

Privacy Policy Text:
{policy_text[:4000]}

List each data type as a separate bullet point. Be specific (e.g., "Email addresses" not just "contact info"). Start directly with the list, no introductory text.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        data_response = self._generate_response(data_prompt)
        analysis_results['data_collected'] = self._parse_list_response(data_response)
        
        # 3) User rights and controls; parsed the same way as the data-type list.
        rights_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert privacy analyst. Identify user rights and controls mentioned in privacy policies accurately. Provide responses as clean bullet points without introductory phrases.<|eot_id|>

<|start_header_id|>user<|end_header_id|>
From this privacy policy, identify ALL user rights and controls mentioned. Look for:

- Data access rights (view, download data)
- Data deletion/erasure rights
- Data correction/modification rights
- Opt-out rights (marketing, tracking, etc.)
- Data portability rights, Consent withdrawal rights
- Privacy settings/controls, Communication preferences
- Account deactivation/deletion
- Any other user controls or rights

Privacy Policy Text:
{policy_text[:4000]}

List each right or control as a separate bullet point. Be specific about what users can actually do. Start directly with the list, no introductory text.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        rights_response = self._generate_response(rights_prompt)
        analysis_results['user_rights'] = self._parse_list_response(rights_response)
        
        # 4) Third-party sharing; the model's reply is stored as-is (category + explanation).
        sharing_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert privacy analyst. Assess third-party data sharing practices accurately and fairly. Provide direct responses without introductory phrases.<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Analyze this privacy policy's third-party data sharing practices. Consider:

1. Types of third parties (advertisers, partners, service providers, etc.)
2. What data is shared and purposes for sharing
3. User control over sharing
4. Sale vs. sharing vs. processing

Based on your analysis, categorize the sharing level as ONE of these options:
- "Extensive sharing" - Data widely shared with many third parties for various purposes including advertising/marketing
- "Moderate sharing" - Some data shared with select partners, mainly for service provision
- "Limited sharing" - Minimal sharing, only with essential service providers
- "No third-party sharing" - No data shared with third parties
- "Unclear" - Policy is vague or contradictory about sharing

Privacy Policy Text:
{policy_text[:4000]}

Provide the category that best fits, followed by a brief 1-sentence explanation. Do not start with introductory phrases.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        analysis_results['third_party_sharing'] = self._generate_response(sharing_prompt)
        
        # 5) Overall rating, derived from the results above plus a shorter excerpt.
        safety_rating, recommendation = self._generate_ai_rating(analysis_results, policy_text[:2000])
        
        return AnalysisResult(
            summary=analysis_results['summary'],
            data_collected=analysis_results['data_collected'],
            user_rights=analysis_results['user_rights'],
            third_party_sharing=analysis_results['third_party_sharing'],
            safety_rating=safety_rating,
            recommendation=recommendation
        )
    
    def _generate_response(self, prompt: str) -> str:
   
        try:
            response = self.text_generator(
                prompt,
                max_new_tokens=800,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
            )
            
            generated_text = response[0]['generated_text']
            
            
            if "<|start_header_id|>assistant<|end_header_id|>" in generated_text:
                result = generated_text.split("<|start_header_id|>assistant<|end_header_id|>")[-1]
                result = result.split("<|eot_id|>")[0].strip()
            else:
                result = generated_text.replace(prompt, "").strip()
            
            
            result = self._clean_ai_response(result)
            
            return result
            
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return "Analysis failed due to model error"
    
    def _clean_ai_response(self, response: str) -> str:

        prefixes_to_remove = [
            r"Here is a?\s*(comprehensive\s*)?(\d+-?\d*\s*)?(sentence\s*)?summary[:\s]*",
            r"Here are the specific types of data[:\s]*",
            r"Here are the user rights[:\s]*",
            r"Here are all user rights[:\s]*",
            r"Based on the policy[,:\s]*",
            r"From the privacy policy[,:\s]*",
            r"The following data types are collected[:\s]*",
            r"The policy mentions the following rights[:\s]*",
            r"Here's what I found[:\s]*",
            r"Based on my analysis[,:\s]*",
            r"According to the policy[,:\s]*"
        ]
        
        for prefix in prefixes_to_remove:
            response = re.sub(prefix, '', response, flags=re.IGNORECASE)

        response = response.strip()

        return response
    
    def _parse_list_response(self, response: str) -> List[str]:

        response = self._clean_ai_response(response)
        
        items = []
        lines = response.split('\n')
        
        for line in lines:
            line = line.strip()
            if line and len(line) > 3:
                
                line = re.sub(r'^[-*•·]\s*', '', line)
                line = re.sub(r'^\d+\.\s*', '', line)

                skip_phrases = [
                    'here are', 'the following', 'based on', 'from the policy', 
                    'include:', 'the policy mentions', 'according to', 'as stated'
                ]
                
                if not any(skip_phrase in line.lower() for skip_phrase in skip_phrases):
                    if len(line) > 5 and not line.endswith(':'):  
                        items.append(line)
        
        return items[:15] 
    
    def _generate_ai_rating(self, analysis: Dict, policy_excerpt: str) -> Tuple[int, str]:
    
        
        rating_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert privacy analyst. Provide fair, balanced privacy ratings based on actual policy content, not assumptions.<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Based on this privacy policy analysis, provide a safety rating from 1-10 (10 being most privacy-friendly) and a recommendation.

Analysis Summary:
- Data collected: {len(analysis.get('data_collected', []))} types identified
- User rights: {len(analysis.get('user_rights', []))} rights/controls identified  
- Third-party sharing: {analysis.get('third_party_sharing', 'Unknown')}

Key Policy Excerpt:
{policy_excerpt}

Consider these factors fairly:
- Data minimization (collecting only necessary data)
- User control and rights
- Transparency of practices
- Third-party sharing extent
- Data security mentions
- Compliance with privacy laws

Respond with:
RATING: [number 1-10]
RECOMMENDATION: [Choose ONE: SAFE, MODERATE, CAUTION, or RISKY] - [Brief explanation in 1-2 sentences]

Be balanced - don't penalize standard business practices, but highlight genuine privacy concerns.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        rating_response = self._generate_response(rating_prompt)
        
       
        try:
            lines = rating_response.split('\n')
            rating = 5  
            recommendation = "⚠️ MODERATE: Unable to determine privacy safety level from analysis."
            
            for line in lines:
                if line.strip().startswith('RATING:'):
                    rating_text = line.replace('RATING:', '').strip()
                    
                    import re
                    numbers = re.findall(r'\d+', rating_text)
                    if numbers:
                        rating = max(1, min(10, int(numbers[0])))
                
                elif line.strip().startswith('RECOMMENDATION:'):
                    rec_text = line.replace('RECOMMENDATION:', '').strip()
                    
            
                    if 'SAFE' in rec_text.upper():
                        recommendation = f"✅ {rec_text}"
                    elif 'MODERATE' in rec_text.upper():
                        recommendation = f"⚠️ {rec_text}"
                    elif 'CAUTION' in rec_text.upper():
                        recommendation = f"🔸 {rec_text}"
                    elif 'RISKY' in rec_text.upper():
                        recommendation = f"❌ {rec_text}"
                    else:
                        recommendation = f"📋 {rec_text}"
            
            return rating, recommendation
            
        except Exception as e:
            logger.error(f"Error parsing AI rating: {e}")
            return 5, "⚠️ MODERATE: Unable to generate automated privacy assessment. Please review the detailed analysis above."
    
    def setup_rag(self, policy_text: str):
        """Setup RAG system for the current policy"""
        self.setup_embeddings()
        self.current_policy_text = policy_text
        
        # Chunk the document
        chunks = self.chunk_text(policy_text, chunk_size=800, overlap=100)
        self.documents = chunks
        
        # Create embeddings
        logger.info("Creating embeddings for RAG...")
        embeddings = self.embeddings_model.encode(chunks)
        
        # Create FAISS index
        dimension = embeddings.shape[1]
        self.vector_store = faiss.IndexFlatIP(dimension)  
        
        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings.astype(np.float32))
        self.vector_store.add(embeddings.astype(np.float32))
        
        logger.info(f"RAG setup complete with {len(chunks)} chunks")
    
    def answer_question(self, question: str) -> str:
        """Answer questions about the policy using RAG"""
        if not self.vector_store or not self.current_policy_text:
            return "Please upload and analyze a privacy policy first."
        
        # Get query embedding
        query_embedding = self.embeddings_model.encode([question])
        faiss.normalize_L2(query_embedding.astype(np.float32))
        
        
        k = 3  # Top 3 relevant chunks
        scores, indices = self.vector_store.search(query_embedding.astype(np.float32), k)
        
        
        context = ""
        for idx in indices[0]:
            if idx < len(self.documents):
                context += self.documents[idx] + "\n\n"
        
    
        prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful privacy expert. Answer user questions about privacy policies based on the provided context. Be accurate, clear, and cite specific policy language when possible.<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Based on the following privacy policy context, please answer this question accurately:

Context from Privacy Policy:
{context[:3000]}

Question: {question}

Provide a direct, helpful answer based on the policy context. If the specific information isn't available in the context, say so clearly. Quote relevant parts of the policy when helpful.<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>"""
        
        answer = self._generate_response(prompt)
        return answer
    
    def analyze_policy(self, file_input, text_input) -> Tuple[str, AnalysisResult]:
        
        try:
            
            if file_input is not None:
                if file_input.name.lower().endswith('.pdf'):
                    policy_text = self.extract_text_from_pdf(file_input)
                else:
                    
                    policy_text = file_input.read().decode('utf-8')
            elif text_input:
                policy_text = text_input
            else:
                return "Please provide either a file or paste text.", None
            
            if not policy_text or len(policy_text) < 100:
                return "The provided text is too short or empty. Please provide a complete privacy policy.", None
            
            
            policy_text = self.preprocess_text(policy_text)
            
            
            self.setup_rag(policy_text)
            
            
            logger.info("Starting policy analysis with Llama 3.1 8B...")
            result = self.generate_structured_analysis(policy_text)
            
            return "Analysis completed successfully!", result
            
        except Exception as e:
            logger.error(f"Analysis error: {e}")
            return f"Analysis failed: {str(e)}", None

def format_analysis_output(result: "AnalysisResult") -> str:
    """Render an AnalysisResult as the Markdown report shown in the UI.

    List entries are written as Markdown list items ("- item"). The previous
    "• item" lines are not list syntax, so a Markdown renderer merges them
    into one run-on paragraph.

    Args:
        result: object providing summary, data_collected, user_rights,
            third_party_sharing, safety_rating and recommendation.

    Returns:
        The report as a Markdown string.
    """
    def _markdown_list(items, placeholder):
        # One "- entry" line per item; a single placeholder line when empty.
        entries = items if items else [placeholder]
        return "".join(f"- {entry}\n" for entry in entries)

    sections = [
        f"""
# 🔍 Privacy Policy Analysis Report

## 📋 Summary
{result.summary}

## 📊 Data Collected
""",
        _markdown_list(result.data_collected, "No specific data types identified"),
        """
## 🛡️ Your Rights
""",
        _markdown_list(result.user_rights, "No specific user rights identified"),
        f"""
## 🤝 Third-Party Data Sharing
{result.third_party_sharing}

## 🎯 Privacy Safety Rating: {result.safety_rating}/10

## 💡 Recommendation
{result.recommendation}
""",
    ]

    return "".join(sections)

# Single shared analyzer used by the Gradio callbacks below. Constructing it
# runs PrivacyPolicyAnalyzer.__init__, which loads the language model via
# setup_models(), so this line is slow and raises if the model cannot be loaded.
analyzer = PrivacyPolicyAnalyzer()

def analyze_privacy_policy(file_input, text_input):
    """Gradio callback for the Analyze button.

    Returns a 3-tuple (status text, Markdown report, RAG status text). When
    the analysis yields no result, the report and RAG status are empty
    strings.
    """
    status, result = analyzer.analyze_policy(file_input, text_input)

    if result is not None:
        report = format_analysis_output(result)
        rag_message = "Analysis complete! You can now ask questions about this privacy policy below."
        return status, report, rag_message

    return status, "", ""

def ask_question(question):
    """Gradio callback for the Ask button.

    Args:
        question: text from the question box; None, empty or
            whitespace-only input is rejected with a prompt to enter a
            question instead of raising.

    Returns:
        The analyzer's answer, or the prompt to enter a question.
    """
    if not question or not question.strip():
        return "Please enter a question."

    return analyzer.answer_question(question)

# Gradio interface: two tabs (policy analysis, question answering) wired to
# the analyze_privacy_policy and ask_question callbacks defined above.
with gr.Blocks(title="Privacy Policy Analyzer", theme=gr.themes.Default()) as demo:
    # Page header describing what the app produces.
    gr.Markdown("""
    # 🔒 Privacy Policy Analyzer
    ### Powered by Llama 3.1 8B Instruct
    
    Upload a privacy policy (PDF or text) or paste it directly to get a comprehensive AI analysis including:
    - **Intelligent Summary** of key privacy practices
    - **Comprehensive Data Collection** analysis  
    - **User Rights** identification and assessment
    - **Third-party Sharing** evaluation
    - **Privacy Safety Rating** with recommendation
    - **Interactive Q&A** for deeper policy understanding
    """)
    
    # Tab 1: inputs (file upload or pasted text) on the left, status and
    # generated report on the right.
    with gr.Tab("📊 Policy Analysis"):
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload Privacy Policy (PDF or TXT)",
                    file_types=[".pdf", ".txt"],
                    height=200
                )
                
                text_input = gr.Textbox(
                    label="Or Paste Privacy Policy Text",
                    placeholder="Paste the privacy policy text here...",
                    lines=10,
                    max_lines=15
                )
                
                analyze_btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
            
            with gr.Column():
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    max_lines=2
                )
                
                analysis_output = gr.Markdown(
                    label="Analysis Report",
                    value="Upload a privacy policy to see the analysis here."
                )
    
    # Tab 2: free-form questions answered from the most recently analysed policy.
    with gr.Tab("❓ Ask Questions"):
        gr.Markdown("Ask specific questions about the privacy policy you've analyzed. The AI will search through the policy and provide accurate answers.")
        
        question_input = gr.Textbox(
            label="Your Question",
            placeholder="e.g., 'Can I delete my data?' or 'Is my data sold to advertisers?'",
            lines=2
        )
        
        ask_btn = gr.Button("💬 Ask", variant="secondary")
        
        answer_output = gr.Textbox(
            label="Answer",
            lines=5,
            interactive=False
        )
        
        # Updated by the Analyze callback (its third return value).
        rag_status = gr.Textbox(
            label="RAG Status",
            value="Please analyze a privacy policy first in the Analysis tab.",
            interactive=False,
            max_lines=2
        )
    
    # Connect the interface: the Analyze callback returns three values, one
    # per output component listed here, in the same order.
    analyze_btn.click(
        fn=analyze_privacy_policy,
        inputs=[file_input, text_input],
        outputs=[status_output, analysis_output, rag_status]
    )
    
    ask_btn.click(
        fn=ask_question,
        inputs=[question_input],
        outputs=[answer_output]
    )
    
    # Example questions that fill the question box when clicked
    gr.Examples(
        examples=[
            ["What personal data do they collect?"],
            ["Can I delete my account and data?"],
            ["Do they share my data with third parties?"],
            ["How long do they keep my data?"],
            ["Can I opt out of data collection?"],
            ["Do they use cookies and tracking?"],
            ["What happens if I don't agree to the policy?"],
            ["Do they comply with GDPR or CCPA?"]
        ],
        inputs=[question_input]
    )

# Launch the interface.
# NOTE(review): server_name="0.0.0.0" together with share=True makes the app
# reachable from outside this machine (the captured output shows a public
# gradio.live URL). Anyone holding that link can submit documents to the
# model while this cell is running - confirm that this exposure is intended.
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0", 
        server_port=7860,
        share=True, 
        debug=True
    )

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cuda:0


* Running on local URL:  http://0.0.0.0:7860
* Running on public URL: https://59ce32d1c89293c8f7.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Batches:   0%|          | 0/2 [00:00<?, ?it/s]