In [None]:
import requests
from bs4 import BeautifulSoup
import mysql.connector
import json
import time
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
import logging
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# Configure logging once at import time: INFO level and above, each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the crawler methods below.
logger = logging.getLogger(__name__)

@dataclass
class Answer:
    """One selectable answer option belonging to a question."""

    # Site-side identifier of the option (read from the input's
    # `data-id_answer` attribute by the crawler; may be an empty string).
    id: str
    # Visible label text of the option.
    text: str
    # Set to True once the crawler has matched this option against the
    # explanation text; stays False when no match was found.
    is_correct: bool = False

@dataclass
class Question:
    """One scraped question together with its answers and explanation."""

    # Site-side identifier (read from the container's `data-id_question`
    # attribute by the crawler; may be an empty string).
    id: str
    # Chapter key such as "chapter_3", or None for tests not tied to a chapter.
    chapter: Optional[str]
    # Last dash-separated segment of the test path, e.g. "7" for "test-3-7".
    test_number: str
    test_type: str  # 'chapter' or 'comprehensive' or 'exam'
    question_text: str
    question_type: str  # 'radio' or 'checkbox'
    # Answer options in page order.
    answers: List[Answer]
    # Plain text of the explanation block; "" when the page has none.
    explanation: str
    correct_answers: List[str]  # List of correct answer IDs

class UKVisaTestCrawler:
    def __init__(self, db_config: Optional[Dict] = None):
        """Set up the crawler state and the catalogue of test paths to visit.

        Args:
            db_config: Keyword arguments for ``mysql.connector.connect``.
                May be None, in which case the database methods log an
                error and do nothing.
        """
        self.base_url = "https://lifeintheuktestweb.co.uk"
        self.session = self._create_session()
        self.db_config = db_config
        self.questions_data = []

        # Chapters 1 and 2 share a single combined test page; it is listed
        # under both chapters so its questions get recorded for each of them.
        chapter_tests = {
            "chapter_1": ["test-1-2"],
            "chapter_2": ["test-1-2"],
        }
        # Remaining chapters: (chapter number, number of test pages).
        for chapter_no, page_count in ((3, 10), (4, 12), (5, 10)):
            chapter_tests[f"chapter_{chapter_no}"] = [
                f"test-{chapter_no}-{i}" for i in range(1, page_count + 1)
            ]

        # Test paths grouped by type. The insertion order of the chapter
        # entries determines the crawl order.
        self.test_configs = {
            "chapter_tests": chapter_tests,
            # Comprehensive tests (no specific chapter): test-1 .. test-40
            "comprehensive_tests": [f"test-{i}" for i in range(1, 41)],
            # Exam tests: british-citizenship-test-1 .. -15
            "exam_tests": [f"british-citizenship-test-{i}" for i in range(1, 16)],
        }

    def _create_session(self):
        """Build the HTTP session used for every request.

        The session retries GET requests up to three times on the listed
        status codes and sends browser-like default headers.

        Returns:
            The configured ``requests.Session``.
        """
        # Headers to appear more like a regular browser
        browser_headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        # One adapter instance, shared by both URL schemes.
        retrying_adapter = HTTPAdapter(
            max_retries=Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET"],
            )
        )

        http = requests.Session()
        for scheme in ("http://", "https://"):
            http.mount(scheme, retrying_adapter)
        http.headers.update(browser_headers)
        return http

    def extract_question_data(self, html_content: str, chapter: Optional[str], test_number: str, test_type: str) -> List[Question]:
        """Parse one test page and return the questions found on it.

        Expected markup (anything else is skipped):
          * ``div.container_question`` with a ``data-id_question`` attribute
          * inside it ``div.question`` holding the question text
          * inside it ``ul.container_answer`` with one ``li`` per option, each
            containing an ``input`` (``data-id_answer``, ``type``)
          * optionally ``div.container_explication`` with the explanation

        Correct answers are inferred from the explanation: first by comparing
        the text of its ``<strong>`` elements with each answer, and if that
        yields nothing, by ``_parse_correct_answers_from_explanation``.

        Args:
            html_content: Raw HTML of the test page.
            chapter: Chapter key to stamp on every question, or None.
            test_number: Test number to stamp on every question.
            test_type: Test type to stamp on every question.

        Returns:
            The extracted questions in page order. A container that raises
            while being parsed is logged and skipped.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        questions = []

        for container in soup.find_all('div', class_='container_question'):
            try:
                question_id = container.get('data-id_question', '')

                question_element = container.find('div', class_='question')
                if question_element is None:
                    continue
                question_text = question_element.get_text(strip=True)

                answer_container = container.find('ul', class_='container_answer')
                if answer_container is None:
                    continue

                answers = []
                question_type = 'radio'  # default; any checkbox input overrides it

                for item in answer_container.find_all('li'):
                    input_element = item.find('input')
                    if input_element is None:
                        continue

                    answer_id = input_element.get('data-id_answer', '')
                    if input_element.get('type', 'radio') == 'checkbox':
                        question_type = 'checkbox'

                    label = item.find('label')
                    if label is not None:
                        # Work on a re-parsed copy so removing the <input>
                        # does not mutate the tree we are still iterating.
                        label_copy = BeautifulSoup(str(label), 'html.parser').find('label')
                        input_in_label = label_copy.find('input')
                        if input_in_label is not None:
                            input_in_label.decompose()
                        answer_text = label_copy.get_text(strip=True)
                    else:
                        answer_text = item.get_text(strip=True)

                    answers.append(Answer(id=answer_id, text=answer_text))

                explanation = ""
                correct_answers = []
                explanation_container = container.find('div', class_='container_explication')

                if explanation_container is not None:
                    explanation = explanation_container.get_text(strip=True)

                    for strong in explanation_container.find_all('strong'):
                        strong_text = strong.get_text(strip=True).lower()
                        # An empty string is a substring of every string, so
                        # an empty <strong> would flag every answer as correct.
                        if not strong_text:
                            continue
                        for answer in answers:
                            answer_text = answer.text.lower()
                            # Same reasoning for an answer without any text.
                            if not answer_text:
                                continue
                            if strong_text in answer_text or answer_text in strong_text:
                                answer.is_correct = True
                                if answer.id not in correct_answers:
                                    correct_answers.append(answer.id)

                    # Nothing matched via <strong>: fall back to phrase patterns.
                    if not correct_answers:
                        correct_answers = self._parse_correct_answers_from_explanation(explanation, answers)

                questions.append(Question(
                    id=question_id,
                    chapter=chapter,
                    test_number=test_number,
                    test_type=test_type,
                    question_text=question_text,
                    question_type=question_type,
                    answers=answers,
                    explanation=explanation,
                    correct_answers=correct_answers
                ))

            except Exception as e:
                # Best effort: one malformed container must not abort the page.
                logger.error(f"Error extracting question from container: {e}")

        return questions

    def _parse_correct_answers_from_explanation(self, explanation: str, answers: List[Answer]) -> List[str]:
        """Try to identify correct answers from explanation text"""
        correct_ids = []
        
        # Common patterns in explanations
        patterns = [
            r"correct answer[s]?[:\s]*([^.]+)",
            r"answer[s]?[:\s]*([^.]+)\s+is correct",
            r"([^.]+)\s+is the correct answer",
            r"([^.]+)\s+are the correct answers",
            r"The correct answers? (?:are?|is) ([^.]+)"
        ]
        
        explanation_lower = explanation.lower()
        
        for pattern in patterns:
            matches = re.findall(pattern, explanation_lower, re.IGNORECASE)
            for match in matches:
                for answer in answers:
                    # More flexible matching
                    answer_lower = answer.text.lower()
                    match_lower = match.lower()
                    
                    # Check if answer text is substantially contained in match or vice versa
                    if (len(answer_lower) > 10 and answer_lower in match_lower) or \
                       (len(match_lower) > 10 and match_lower in answer_lower) or \
                       (answer_lower == match_lower):
                        answer.is_correct = True
                        if answer.id not in correct_ids:
                            correct_ids.append(answer.id)
        
        return correct_ids

    def crawl_test(self, test_path: str, chapter: Optional[str], test_number: str, test_type: str, retry_count: int = 3) -> List[Question]:
        """Crawl a single test and return questions with retry mechanism"""
        url = f"{self.base_url}/{test_path}"
        logger.info(f"Crawling: {url} (Chapter: {chapter}, Type: {test_type})")
        
        for attempt in range(retry_count):
            try:
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                
                questions = self.extract_question_data(response.text, chapter, test_number, test_type)
                logger.info(f"Extracted {len(questions)} questions from {test_path}")
                
                return questions
                
            except requests.exceptions.RequestException as e:
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt < retry_count - 1:
                    wait_time = (attempt + 1) * 2  # Exponential backoff
                    logger.info(f"Waiting {wait_time} seconds before retry...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Failed to crawl {url} after {retry_count} attempts")
            except Exception as e:
                logger.error(f"Unexpected error crawling {url}: {e}")
                break
        
        return []

    def crawl_all_tests(self):
        """Crawl all tests and collect data"""
        logger.info("Starting to crawl all tests...")
        
        # Crawl chapter-based tests
        logger.info("=== Crawling Chapter-based Tests ===")
        for chapter, test_paths in self.test_configs["chapter_tests"].items():
            logger.info(f"Processing {chapter}")
            
            for test_path in test_paths:
                # Extract test number from path
                test_number = test_path.split('-')[-1]
                
                questions = self.crawl_test(test_path, chapter, test_number, "chapter")
                self.questions_data.extend(questions)
                
                # Be respectful to the server
                time.sleep(2)
        
        # Crawl comprehensive tests
        logger.info("=== Crawling Comprehensive Tests ===")
        for test_path in self.test_configs["comprehensive_tests"]:
            # Extract test number from path
            test_number = test_path.split('-')[-1]
            
            questions = self.crawl_test(test_path, None, test_number, "comprehensive")
            self.questions_data.extend(questions)
            
            # Be respectful to the server
            time.sleep(2)

        # Crawl exam tests (if implemented in the future)
        logger.info("=== Crawling Exam Tests (Placeholder) ===")
        for test_path in self.test_configs["exam_tests"]:
            # Extract test number from path
            test_number = test_path.split('-')[-1]
            
            questions = self.crawl_test(test_path, None, test_number, "exam")
            self.questions_data.extend(questions)
            
            # Be respectful to the server
            time.sleep(2)
        
        logger.info(f"Crawling completed. Total questions: {len(self.questions_data)}")

    def save_to_json(self, filename: str = "uk_visa_all_questions.json"):
        """Save collected data to JSON file"""
        data = {
            "metadata": {
                "total_questions": len(self.questions_data),
                "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                "source": "lifeintheuktestweb.co.uk",
                "test_types": {
                    "chapter": len([q for q in self.questions_data if q.test_type == "chapter"]),
                    "comprehensive": len([q for q in self.questions_data if q.test_type == "comprehensive"]),
                    "exam": len([q for q in self.questions_data if q.test_type == "exam"])
                }
            },
            "questions": []
        }
        
        for question in self.questions_data:
            question_dict = {
                "id": question.id,
                "chapter": question.chapter,
                "test_number": question.test_number,
                "test_type": question.test_type,
                "question_text": question.question_text,
                "question_type": question.question_type,
                "answers": [
                    {
                        "id": answer.id,
                        "text": answer.text,
                        "is_correct": answer.is_correct
                    }
                    for answer in question.answers
                ],
                "explanation": question.explanation,
                "correct_answers": question.correct_answers
            }
            data["questions"].append(question_dict)
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        
        logger.info(f"Data saved to {filename}")

    def create_database_schema(self):
        """Create MySQL database schema"""
        if not self.db_config:
            logger.error("Database configuration not provided")
            return
        
        connection = mysql.connector.connect(**self.db_config)
        cursor = connection.cursor()
        
        # Create tables
        schema_sql = """
        CREATE DATABASE IF NOT EXISTS uk_visa_test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
        USE uk_visa_test;
        
        CREATE TABLE IF NOT EXISTS chapters (
            id INT AUTO_INCREMENT PRIMARY KEY,
            chapter_number INT NOT NULL,
            name VARCHAR(100) NOT NULL,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY unique_chapter_number (chapter_number)
        );
        
        CREATE TABLE IF NOT EXISTS tests (
            id INT AUTO_INCREMENT PRIMARY KEY,
            chapter_id INT NULL,
            test_number VARCHAR(10) NOT NULL,
            test_type ENUM('chapter', 'comprehensive', 'exam') NOT NULL,
            title VARCHAR(255),
            url VARCHAR(255),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (chapter_id) REFERENCES chapters(id) ON DELETE SET NULL,
            INDEX idx_test_type (test_type),
            INDEX idx_test_number (test_number)
        );
        
        CREATE TABLE IF NOT EXISTS questions (
            id INT AUTO_INCREMENT PRIMARY KEY,
            test_id INT NOT NULL,
            question_id VARCHAR(50) NOT NULL,
            question_text TEXT NOT NULL,
            question_type ENUM('radio', 'checkbox') NOT NULL,
            explanation TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (test_id) REFERENCES tests(id) ON DELETE CASCADE,
            INDEX idx_question_id (question_id),
            INDEX idx_question_type (question_type)
        );
        
        CREATE TABLE IF NOT EXISTS answers (
            id INT AUTO_INCREMENT PRIMARY KEY,
            question_id INT NOT NULL,
            answer_id VARCHAR(50) NOT NULL,
            answer_text TEXT NOT NULL,
            is_correct BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (question_id) REFERENCES questions(id) ON DELETE CASCADE,
            INDEX idx_answer_id (answer_id),
            INDEX idx_is_correct (is_correct)
        );
        """
        
        # Execute schema creation
        for statement in schema_sql.split(';'):
            if statement.strip():
                cursor.execute(statement)
        
        connection.commit()
        cursor.close()
        connection.close()
        
        logger.info("Database schema created successfully")

    def _insert_chapters(self, cursor):
        """Make sure the five chapter rows exist and return their primary keys.

        Args:
            cursor: Open database cursor positioned on the target schema.

        Returns:
            Mapping from chapter key (``"chapter_1"`` .. ``"chapter_5"``) to
            the ``id`` of the corresponding ``chapters`` row.
        """
        # Position in this tuple (1-based) is the chapter number.
        chapter_names = (
            "Chapter 1: The Values and Principles of the UK",
            "Chapter 2: What is the UK?",
            "Chapter 3: A Long and Illustrious History",
            "Chapter 4: A Modern, Thriving Society",
            "Chapter 5: The UK Government, the Law and Your Role",
        )
        insert_sql = "INSERT IGNORE INTO chapters (chapter_number, name) VALUES (%s, %s)"
        select_sql = "SELECT id FROM chapters WHERE chapter_number = %s"

        ids_by_chapter = {}
        for number, name in enumerate(chapter_names, start=1):
            # INSERT IGNORE leaves an existing row untouched, so the id is
            # always read back instead of relying on the insert.
            cursor.execute(insert_sql, (number, name))
            cursor.execute(select_sql, (number,))
            row = cursor.fetchone()
            if row:
                ids_by_chapter[f"chapter_{number}"] = row[0]

        return ids_by_chapter

    def save_to_database(self):
        """Save collected data to MySQL database"""
        if not self.db_config:
            logger.error("Database configuration not provided")
            return
        
        connection = mysql.connector.connect(**self.db_config)
        cursor = connection.cursor()
        
        try:
            # Use the database
            cursor.execute("USE uk_visa_test")
            
            # Insert chapters
            chapter_mapping = self._insert_chapters(cursor)
            
            # Insert tests and questions
            test_mapping = {}
            
            for question in self.questions_data:
                # Determine chapter_id
                chapter_id = None
                if question.chapter:
                    chapter_id = chapter_mapping.get(question.chapter)
                
                # Create test key
                test_key = f"{question.test_type}_{question.test_number}_{question.chapter or 'none'}"
                
                if test_key not in test_mapping:
                    # Insert test
                    cursor.execute(
                        "INSERT IGNORE INTO tests (chapter_id, test_number, test_type, url) VALUES (%s, %s, %s, %s)",
                        (chapter_id, question.test_number, question.test_type, f"test-{question.test_number}")
                    )
                    
                    # Get test ID - handle NULL chapter_id properly
                    if chapter_id is None:
                        cursor.execute(
                            "SELECT id FROM tests WHERE chapter_id IS NULL AND test_number = %s AND test_type = %s",
                            (question.test_number, question.test_type)
                        )
                    else:
                        cursor.execute(
                            "SELECT id FROM tests WHERE chapter_id = %s AND test_number = %s AND test_type = %s",
                            (chapter_id, question.test_number, question.test_type)
                        )
                    
                    result = cursor.fetchone()
                    if result:
                        test_mapping[test_key] = result[0]
                    else:
                        # Fallback: get by test_number and type only
                        cursor.execute(
                            "SELECT id FROM tests WHERE test_number = %s AND test_type = %s LIMIT 1",
                            (question.test_number, question.test_type)
                        )
                        result = cursor.fetchone()
                        if result:
                            test_mapping[test_key] = result[0]
                
                if test_key not in test_mapping:
                    logger.warning(f"Could not find or create test for {test_key}")
                    continue
                
                test_id = test_mapping[test_key]
                
                # Insert question
                cursor.execute(
                    "INSERT INTO questions (test_id, question_id, question_text, question_type, explanation) VALUES (%s, %s, %s, %s, %s)",
                    (test_id, question.id, question.question_text, question.question_type, question.explanation)
                )
                
                question_db_id = cursor.lastrowid
                
                # Insert answers
                for answer in question.answers:
                    cursor.execute(
                        "INSERT INTO answers (question_id, answer_id, answer_text, is_correct) VALUES (%s, %s, %s, %s)",
                        (question_db_id, answer.id, answer.text, answer.is_correct)
                    )
            
            connection.commit()
            logger.info("Data saved to database successfully")
            
        except Exception as e:
            logger.error(f"Error saving to database: {e}")
            connection.rollback()
            raise
        finally:
            cursor.close()
            connection.close()

def main():
    """Run the full pipeline: create schema, crawl, export JSON, store in MySQL.

    Prints the total number of questions and a breakdown per test type.
    """
    # Database configuration (update with your MySQL credentials)
    db_config = {
        'host': 'localhost',
        'port': 3307,  # NOTE: not MySQL's default port (3306); adjust to your server
        'user': 'root',
        'password': '',
        'database': 'uk_visa_test',
        'charset': 'utf8mb4'
    }

    crawler = UKVisaTestCrawler(db_config)

    # The schema is created before crawling so a database problem shows up
    # before time is spent on the crawl.
    crawler.create_database_schema()
    crawler.crawl_all_tests()

    # JSON is written first so the crawl result survives a database failure.
    crawler.save_to_json()
    crawler.save_to_database()

    print(f"Crawling completed! Found {len(crawler.questions_data)} questions.")

    # Print summary for every crawled test type (exam tests included).
    counts = {
        test_type: sum(1 for q in crawler.questions_data if q.test_type == test_type)
        for test_type in ("chapter", "comprehensive", "exam")
    }
    print(f"Chapter-based questions: {counts['chapter']}")
    print(f"Comprehensive test questions: {counts['comprehensive']}")
    print(f"Exam test questions: {counts['exam']}")

# Run the pipeline only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()

2025-08-02 15:14:10,761 - INFO - Database schema created successfully
2025-08-02 15:14:10,762 - INFO - Starting to crawl all tests...
2025-08-02 15:14:10,763 - INFO - === Crawling Chapter-based Tests ===
2025-08-02 15:14:10,764 - INFO - Processing chapter_1
2025-08-02 15:14:10,764 - INFO - Crawling: https://lifeintheuktestweb.co.uk/test-1-2 (Chapter: chapter_1, Type: chapter)
2025-08-02 15:14:13,268 - INFO - Extracted 24 questions from test-1-2
2025-08-02 15:14:15,273 - INFO - Processing chapter_2
2025-08-02 15:14:15,274 - INFO - Crawling: https://lifeintheuktestweb.co.uk/test-1-2 (Chapter: chapter_2, Type: chapter)
2025-08-02 15:14:16,255 - INFO - Extracted 24 questions from test-1-2
2025-08-02 15:14:18,258 - INFO - Processing chapter_3
2025-08-02 15:14:18,259 - INFO - Crawling: https://lifeintheuktestweb.co.uk/test-3-1 (Chapter: chapter_3, Type: chapter)
2025-08-02 15:14:19,548 - INFO - Extracted 24 questions from test-3-1
2025-08-02 15:14:21,553 - INFO - Crawling: https://lifeintheu

Crawling completed! Found 1776 questions.
Chapter-based questions: 816
Comprehensive test questions: 960
