# **SMU Course Scraping Using Selenium**

<div style="background-color:#FFD700; padding:15px; border-radius:5px; border: 2px solid #FF4500;">
    
  <h1 style="color:#8B0000;">⚠️🚨 SCRAPE THIS DATA AT YOUR OWN RISK 🚨⚠️</h1>
  
  <p><strong>📌 If you need the data, please contact me directly.</strong> Only available for <strong>existing students</strong>.</p>

  <h3>🔗 📩 How to Get the Data?</h3>
  <p>📨 <strong>Reach out to me for access</strong> instead of scraping manually.</p>

</div>

<br>

<div style="background-color:#FFF8DC; padding:12px; border-radius:5px; border: 1px solid #DAA520;">
    
  <h2 style="color:#8B8000;">✨ Looking for the Latest Model? Consider V4! ✨</h2>
  <p>👉 <a href="V4_example_prediction.ipynb"><strong>Check out V4 Here</strong></a></p>

</div>

### **Objective**
This script is designed to scrape SMU course details from the BOSS system using Selenium. The process involves:
1. Logging into the system manually to bypass authentication.
2. Iteratively scraping class details for specified academic years and terms.
3. Writing the scraped data to structured CSV files.

### **Script Structure**
1. **Setup**: Import libraries and initialize Selenium WebDriver.
2. **Login**: Wait for manual login and authentication.
3. **Scraping Logic**:
    - `scrape_class_details`: Scrapes course details for a specific class number, academic year, and term.
    - `main`: Manages the scraping process for multiple academic years and terms.
4. **Execution**: Log in and start scraping.


---

## **1. Setup**

In [45]:
import os
import re
import csv
import time
import pickle
import logging
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
import psycopg2
from urllib.parse import parse_qs, urlparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple


---

## **2. Login Handling**

In [8]:
def wait_for_manual_login(driver):
    """
    Block until the user has finished logging in to BOSS by hand.

    Waits up to two minutes for the post-login page markers (the username
    label and the "Sign out" link) to appear.

    Args:
        driver: WebDriver instance already pointed at the BOSS login page.

    Returns:
        True once the login markers have been detected.

    Raises:
        Exception: with message "Login failed" when the markers do not appear
            within the timeout.
    """
    print("Please log in manually and complete the Microsoft Authenticator process.")
    print("Waiting for BOSS dashboard to load...")

    # 120 seconds gives the user time to complete the authenticator prompt
    login_wait = WebDriverWait(driver, 120)
    username_locator = (By.ID, "Label_UserName")
    sign_out_locator = (By.XPATH, "//a[contains(text(),'Sign out')]")

    try:
        # The username label shows up first; the sign-out link confirms the
        # page is really the logged-in view
        for locator in (username_locator, sign_out_locator):
            login_wait.until(EC.presence_of_element_located(locator))

        username = driver.find_element(*username_locator).text
        print(f"Login successful! Logged in as {username}")
    except TimeoutException:
        print("Login failed or timed out. Could not detect login elements.")
        raise Exception("Login failed")

    # Short grace period so the page can finish loading
    time.sleep(1)

    return True


---

## **3. Scrape data**

### **3.1 Scrape all data from BOSS**
1. Read every existing AddedInfo file and record the minimum and maximum `SelectedClassNumber` for each AY and term.
2. Scrape each class page on BOSS and save the pages separated by AY and term.
3. Keep the scraping logic general so that it also works for future terms (those after AY2024T3B).

In [9]:
def read_class_number_ranges(directory='classTimings'):
    """
    Determine the smallest and largest class number recorded for each AY_TERM.

    Every file in ``directory`` named ``[AY]_[Term]AddedInfo.csv`` (for example
    ``2021-22_T1AddedInfo.csv``) is scanned and the minimum and maximum of its
    ``SelectedClassNumber`` column are collected. Rows whose value is missing,
    blank or not an integer are skipped.

    Args:
        directory: Directory containing the CSV files. It is created when it
            does not exist yet.

    Returns:
        Dictionary mapping AY_TERM (e.g. ``'2021-22_T1'``) to
        ``{'min': int, 'max': int}``. A matching file without any usable class
        number maps to the default ``{'min': 1000, 'max': 5000}``. The
        dictionary is empty when no matching file exists.
    """
    # Create directory if it doesn't exist, so listing it below cannot fail
    os.makedirs(directory, exist_ok=True)

    # Regex pattern to match files like 2021-22_T1AddedInfo.csv
    pattern = re.compile(r'(\d{4}-\d{2}_T\d[AB]?)AddedInfo\.csv')
    class_number_ranges = {}

    for filename in os.listdir(directory):
        # fullmatch (not match) so that e.g. "...AddedInfo.csv.bak" is ignored
        match = pattern.fullmatch(filename)
        if not match:
            continue

        ay_term = match.group(1)
        filepath = os.path.join(directory, filename)

        class_numbers = []
        with open(filepath, 'r', encoding='utf-8', newline='') as file:
            for row in csv.DictReader(file):
                # DictReader fills short rows with None, so fall back to ''
                # before calling strip()
                raw_value = row.get('SelectedClassNumber') or ''
                try:
                    class_numbers.append(int(raw_value.strip()))
                except ValueError:
                    continue

        if class_numbers:
            class_number_ranges[ay_term] = {
                'min': min(class_numbers),
                'max': max(class_numbers),
            }
        else:
            # Default values if no valid class numbers found
            class_number_ranges[ay_term] = {'min': 1000, 'max': 5000}

    # Every matching file adds an entry, so empty means nothing matched
    if not class_number_ranges:
        print("No class timing files found. Will use default ranges.")

    return class_number_ranges

In [10]:
def scrape_and_save_html(driver, start_ay_term='2021-22_T1', end_ay_term='2024-25_T3B', base_dir='classTimingsFull'):
    """
    Visit BOSS class-detail pages term by term and store every populated page as HTML.

    For each AY_TERM between the start and end term (inclusive) the class
    numbers are walked in ascending order. Pages reporting "No record found"
    are skipped, and a term is abandoned after 100 consecutive pages without a
    record. The driver is quit once all terms have been processed.

    Args:
        driver: WebDriver instance that is already logged in
        start_ay_term: First academic year and term to scrape (e.g., '2021-22_T1')
        end_ay_term: Last academic year and term to scrape (e.g., '2024-25_T3B')
        base_dir: Directory under which one sub-folder per AY_TERM is created
    """
    # Two-digit term suffix used in the SelectedAcadTerm URL parameter
    term_code_map = {'T1': '10', 'T2': '20', 'T3A': '31', 'T3B': '32'}

    # Chronological list of every AY_TERM this function knows about
    term_order = ['T1', 'T2', 'T3A', 'T3B']
    academic_years = ['2021-22', '2022-23', '2023-24', '2024-25', '2025-26', '2026-27']
    timeline = [f"{year}_{term_name}" for year in academic_years for term_name in term_order]

    # Translate the requested boundaries into positions on the timeline
    try:
        first_pos = timeline.index(start_ay_term)
        last_pos = timeline.index(end_ay_term)
    except ValueError:
        print("Invalid start or end term provided. Using full range.")
        first_pos, last_pos = 0, len(timeline) - 1

    selected_terms = timeline[first_pos:last_pos + 1]

    # Known class-number bounds narrow the search for terms scraped before
    class_number_ranges = read_class_number_ranges('classTimings')
    print(f"Found class number ranges: {class_number_ranges}")

    os.makedirs(base_dir, exist_ok=True)

    for ay_term in selected_terms:
        print(f"Processing {ay_term}...")

        # e.g. '2025-26_T1' -> ay_short '25', term_code '10'
        ay, term = ay_term.split('_')
        ay_short = ay[2:4]
        term_code = term_code_map.get(term, '10')

        # Fall back to the default 1000-5000 window for unseen terms
        bounds = class_number_ranges.get(ay_term, {'min': 1000, 'max': 5000})
        lowest = bounds.get('min', 1000)
        highest = bounds.get('max', 5000)

        folder_path = os.path.join(base_dir, ay_term)
        os.makedirs(folder_path, exist_ok=True)

        consecutive_empty = 0

        for class_num in range(lowest, highest + 1):
            url = f"https://boss.intranet.smu.edu.sg/ClassDetails.aspx?SelectedClassNumber={class_num:04}&SelectedAcadTerm={ay_short}{term_code}&SelectedAcadCareer=UGRD"

            try:
                driver.get(url)

                page_wait = WebDriverWait(driver, 15)
                try:
                    # The page is ready once either the class header or the
                    # error label has been rendered
                    page_wait.until(EC.any_of(
                        EC.presence_of_element_located((By.ID, "lblClassInfoHeader")),
                        EC.presence_of_element_located((By.ID, "lblErrorDetails"))
                    ))

                    # A page counts as empty when any error label says so
                    has_data = not any(
                        "No record found" in error.text
                        for error in driver.find_elements(By.ID, "lblErrorDetails")
                    )
                except Exception as e:
                    print(f"Wait error: {e}")
                    has_data = False

                if has_data:
                    # A hit resets the run of empty pages
                    consecutive_empty = 0

                    filename = f"SelectedAcadTerm={ay_short}{term_code}&SelectedClassNumber={class_num:04}.html"
                    filepath = os.path.join(folder_path, filename)

                    with open(filepath, 'w', encoding='utf-8') as f:
                        f.write(driver.page_source)

                    print(f"Saved {filepath}")

                    # Small pause between requests
                    time.sleep(2)
                    continue

                consecutive_empty += 1
                print(f"No record found for {ay_term}, class {class_num:04}. Consecutive empty: {consecutive_empty}")

                if consecutive_empty >= 100:
                    print(f"100 consecutive empty records reached for {ay_term}, moving on.")
                    break

                # Small pause before next request
                time.sleep(2)

            except Exception as e:
                print(f"Error processing {url}: {str(e)}")
                time.sleep(5)  # Wait a bit longer after an error

    print("Scraping completed.")
    driver.quit()

In [11]:
def generate_scraped_filepaths_csv(base_dir='classTimingsFull', output_csv='scraped_filepaths.csv'):
    """
    Write a one-column CSV listing every scraped HTML page that holds a record.

    All ``.html`` files below ``base_dir`` are read; those whose content
    contains "No record found" are left out. Files that cannot be read are
    reported on stdout and skipped.

    Args:
        base_dir: Base directory where HTML files are stored
        output_csv: Name of the output CSV file (header row: ``Filepath``)

    Returns:
        Path to the generated CSV file, or None when ``base_dir`` does not exist
    """
    if not os.path.exists(base_dir):
        print(f"Directory '{base_dir}' does not exist.")
        return None

    filepaths = []
    for folder, _subfolders, names in os.walk(base_dir):
        for name in names:
            if not name.endswith('.html'):
                continue

            filepath = os.path.join(folder, name)
            try:
                with open(filepath, 'r', encoding='utf-8') as page:
                    if 'No record found' not in page.read():
                        filepaths.append(filepath)
            except Exception as e:
                print(f"Error reading file {filepath}: {str(e)}")

    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        out = csv.writer(csvfile)
        out.writerow(['Filepath'])
        out.writerows([path] for path in filepaths)

    print(f"Generated CSV file with {len(filepaths)} valid file paths at {output_csv}")
    return output_csv

In [12]:
if __name__ == "__main__":
    # Set up WebDriver - headless mode is deliberately NOT enabled so that the
    # user can complete the manual login in the visible browser window
    options = webdriver.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')

    service = Service(ChromeDriverManager().install())

    # Create the browser exactly once, with the options applied. Starting a
    # second Chrome instance would leave the first one running unattended.
    driver = None

    try:
        driver = webdriver.Chrome(service=service, options=options)

        # Step 1: Navigate to login page and wait for manual login
        driver.get("https://boss.intranet.smu.edu.sg/")
        wait_for_manual_login(driver)

        # Step 2: Now that we're logged in, proceed with scraping
        # You can optionally run a test scrape first
        # test_scrape_class_details(driver)

        # Step 3: Run the main scraping function with the authenticated driver
        # (scrape_and_save_html quits the driver itself when it finishes)
        scrape_and_save_html(driver, '2025-26_T1', '2025-26_T1', 'classTimingsFull')

        # Step 4: Generate CSV with valid file paths
        generate_scraped_filepaths_csv('classTimingsFull', 'scraped_filepaths.csv')

    finally:
        # Ensure driver is closed properly, also when login or scraping failed
        if driver:
            driver.quit()
        print("Process completed!")

Please log in manually and complete the Microsoft Authenticator process.
Waiting for BOSS dashboard to load...
Login successful! Logged in as Welcome, TAN ZHONG YAN
Found class number ranges: {'2021-22_T1': {'min': 1002, 'max': 2889}, '2021-22_T2': {'min': 1002, 'max': 2957}, '2021-22_T3A': {'min': 1002, 'max': 1038}, '2021-22_T3B': {'min': 1002, 'max': 1033}, '2022-23_T1': {'min': 1002, 'max': 2954}, '2022-23_T2': {'min': 1002, 'max': 2920}, '2022-23_T3A': {'min': 1002, 'max': 1031}, '2022-23_T3B': {'min': 1002, 'max': 1027}, '2023-24_T1': {'min': 1002, 'max': 2982}, '2023-24_T2': {'min': 1002, 'max': 2964}, '2023-24_T3A': {'min': 1002, 'max': 1028}, '2023-24_T3B': {'min': 1003, 'max': 1033}, '2024-25_T1': {'min': 1002, 'max': 2945}, '2024-25_T2': {'min': 1002, 'max': 2786}}
Processing 2025-26_T1...
No record found for 2025-26_T1, class 1000. Consecutive empty: 1
No record found for 2025-26_T1, class 1001. Consecutive empty: 2
Saved classTimingsFull\2025-26_T1\SelectedAcadTerm=2510&Se

### **3.2 Extract needed data from all scraped websites**

### TEST SCRIPT

In [65]:
import csv
import os
import re
import pandas as pd
import psycopg2
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import logging
import random
import importlib.util

# Configure logging: INFO and above, prefixed with timestamp and level
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the code below
logger = logging.getLogger(__name__)

class SchemaConformanceTestRunner:
    def __init__(self):
        self.test_results = {
            'database_connection': False,
            'table_access': {},
            'scraping_success_count': 0,
            'scraping_error_count': 0,
            'total_files_tested': 0,
            'missing_files_count': 0,
            'csv_generation': {},
            'schema_validation': {},
            'errors': []
        }
        
        # Test database configuration for localhost Supabase
        self.test_db_config = {
            'host': 'localhost',
            'database': 'postgres', 
            'user': 'postgres',
            'password': 'changeme',
            'port': 5433
        }
        
        # Expected schema structure based on Prisma schema
        self.expected_schemas = {
            'courses_updates.csv': {
                'required_columns': ['code', 'course_area', 'enrolment_requirements'],
                'table_name': 'courses'
            },
            'classes_updates.csv': {
                'required_columns': ['course_code', 'section', 'acad_term_id', 'grading_basis', 'course_outline_url'],
                'table_name': 'classes'
            },
            'acad_term.csv': {
                'required_columns': ['id', 'acad_year_start', 'acad_year_end', 'term', 'boss_id', 'start_dt', 'end_dt'],
                'table_name': 'acad_term'
            },
            'class_timing.csv': {
                'required_columns': ['class_id', 'start_date', 'end_date', 'day_of_week', 'start_time', 'end_time', 'venue'],
                'table_name': 'class_timing'
            },
            'class_exam_timing.csv': {
                'required_columns': ['class_id', 'date', 'day_of_week', 'start_time', 'end_time', 'venue'],
                'table_name': 'class_exam_timing'
            }
        }

    def test_database_connection(self):
        """Test connection to localhost Supabase database and table access.

        Opens a connection with ``self.test_db_config``, runs ``SELECT 1`` and,
        when that returns the expected row, probes the required tables via
        ``test_table_access``. The outcome is stored in
        ``self.test_results['database_connection']``; failures are appended to
        ``self.test_results['errors']`` instead of being raised.
        """
        logger.info("Testing database connection...")
        connection = None
        cursor = None
        try:
            connection = psycopg2.connect(
                host=self.test_db_config['host'],
                database=self.test_db_config['database'],
                user=self.test_db_config['user'],
                password=self.test_db_config['password'],
                port=self.test_db_config['port']
            )

            cursor = connection.cursor()
            cursor.execute('SELECT 1')
            result = cursor.fetchone()

            if result == (1,):
                logger.info("✅ Database connection successful")
                self.test_results['database_connection'] = True
                self.test_table_access(cursor)

        except Exception as e:
            logger.error(f"❌ Database connection failed: {e}")
            self.test_results['errors'].append(f"Database connection error: {e}")
            self.test_results['database_connection'] = False

        finally:
            # Release the cursor and connection even when a query above
            # raised, so a failed run does not leave a connection open
            for resource in (cursor, connection):
                if resource is None:
                    continue
                try:
                    resource.close()
                except Exception as close_error:
                    logger.warning(f"⚠️ Error while closing database resource: {close_error}")

    def test_table_access(self, cursor):
        """Run a COUNT(*) against every required table and record the result.

        For each table, ``self.test_results['table_access'][table]`` receives
        either ``{'accessible': True, 'record_count': n}`` or
        ``{'accessible': False, 'error': message}``.
        """
        for table in ('courses', 'classes', 'acad_term', 'class_timing', 'class_exam_timing'):
            try:
                # Roll back first - presumably so a failed earlier statement
                # cannot affect this query
                cursor.connection.rollback()
                cursor.execute(f"SELECT COUNT(*) FROM {table}")
                count = cursor.fetchone()[0]

                outcome = {'accessible': True, 'record_count': count}
                self.test_results['table_access'][table] = outcome
                logger.info(f"✅ Table '{table}': {count} records accessible")

            except Exception as e:
                cursor.connection.rollback()
                logger.error(f"❌ Table '{table}' access failed: {e}")
                outcome = {'accessible': False, 'error': str(e)}
                self.test_results['table_access'][table] = outcome

    def load_actual_filepaths(self, sample_size=20):
        """Load actual filepaths from scraped_filepaths.csv"""
        try:
            if not os.path.exists('scraped_filepaths.csv'):
                logger.error("❌ scraped_filepaths.csv not found")
                return []
            
            filepaths_df = pd.read_csv('scraped_filepaths.csv')
            filepath_column = 'Filepath' if 'Filepath' in filepaths_df.columns else 'filepath'
            
            existing_files = []
            for filepath in filepaths_df[filepath_column]:
                if pd.notna(filepath) and os.path.exists(str(filepath).strip()):
                    existing_files.append(str(filepath).strip())
            
            logger.info(f"Found {len(existing_files)} existing files")
            
            if len(existing_files) > sample_size:
                selected_files = random.sample(existing_files, sample_size)
            else:
                selected_files = existing_files
                
            logger.info(f"Selected {len(selected_files)} files for testing")
            return selected_files
            
        except Exception as e:
            logger.error(f"Error loading filepaths: {e}")
            return []

    def import_extractor_class(self):
        """Import AfterClassDataExtractor class"""
        try:
            # In Jupyter notebook, try to access the globally defined class
            if 'AfterClassDataExtractor' in globals():
                return globals()['AfterClassDataExtractor']
            else:
                raise ImportError("AfterClassDataExtractor not found in global scope")
        except Exception as e:
            raise ImportError(f"Could not access AfterClassDataExtractor: {e}")

    def test_extractor_functionality(self, test_filepaths):
        """Test the AfterClassDataExtractor with schema validation.

        Creates an extractor for ``self.test_db_config``, starts its Selenium
        driver and database connection, then runs the caching, file-processing
        and CSV/schema stages. Failures are logged and appended to
        ``self.test_results['errors']`` rather than raised.

        Args:
            test_filepaths: HTML file paths handed to the file-processing stage.
        """
        logger.info("Testing AfterClassDataExtractor functionality...")

        extractor = None
        try:
            AfterClassDataExtractor = self.import_extractor_class()
            logger.info("✅ Successfully imported AfterClassDataExtractor")

            extractor = AfterClassDataExtractor(self.test_db_config)
            extractor.setup_selenium_driver()
            logger.info("✅ Selenium WebDriver initialized")

            extractor.connect_database()
            logger.info("✅ Extractor database connection successful")

            # Test caching functionality
            self.test_caching_functionality(extractor)

            # Test file processing with schema validation
            self.test_file_processing_with_validation(extractor, test_filepaths)

            # Test CSV generation and schema conformance
            self.test_csv_generation_and_schema(extractor)

        except Exception as e:
            error_msg = f"Extractor functionality test failed: {e}"
            logger.error(f"❌ {error_msg}")
            self.test_results['errors'].append(error_msg)

        finally:
            # Always release the extractor's resources once it was created,
            # also when one of the stages above raised
            if extractor is not None:
                try:
                    extractor.cleanup()
                    logger.info("✅ Extractor cleanup completed")
                except Exception as e:
                    error_msg = f"Extractor cleanup failed: {e}"
                    logger.error(f"❌ {error_msg}")
                    self.test_results['errors'].append(error_msg)

    def test_caching_functionality(self, extractor):
        """Exercise the extractor's table cache: load it, or build it when absent.

        Errors are logged and appended to ``self.test_results['errors']``.
        """
        logger.info("Testing database caching...")
        try:
            cache_loaded = extractor.load_cached_tables()
            if cache_loaded:
                logger.info("✅ Cached tables loaded successfully")
            else:
                logger.info("Cache not found, downloading from database...")
                extractor.download_and_cache_tables()
                logger.info("✅ Database tables downloaded and cached")

            # Report cache sizes for whichever frames the extractor exposes
            if getattr(extractor, 'courses_df', None) is not None:
                logger.info(f"✅ Courses cache: {len(extractor.courses_df)} records")
            if getattr(extractor, 'classes_df', None) is not None:
                logger.info(f"✅ Classes cache: {len(extractor.classes_df)} records")

        except Exception as e:
            logger.error(f"❌ Caching functionality failed: {e}")
            self.test_results['errors'].append(f"Caching error: {e}")

    def test_file_processing_with_validation(self, extractor, test_filepaths):
        """Run the extractor over every given file and tally successes and failures.

        Counters in ``self.test_results`` are updated per file. Per-file log
        lines and data validation are limited to the first ten files; a
        progress summary is logged after every hundredth file.
        """
        logger.info(f"Testing file processing with {len(test_filepaths)} files...")

        for i, filepath in enumerate(test_filepaths, 1):
            detailed = i <= 10           # verbose handling for the first ten files only
            at_checkpoint = i % 100 == 0  # every hundredth file

            if at_checkpoint or detailed:
                logger.info(f"Processing file {i}/{len(test_filepaths)}: {os.path.basename(filepath)}")

            try:
                if extractor.process_html_file(filepath):
                    self.test_results['scraping_success_count'] += 1
                    if detailed:
                        logger.info(f"✅ Successfully processed: {os.path.basename(filepath)}")
                        # Validate what was just extracted (kept to the first
                        # ten files to avoid log spam)
                        self.validate_extracted_data(extractor, filepath)
                else:
                    self.test_results['scraping_error_count'] += 1
                    if detailed:
                        logger.warning(f"⚠️ Failed to process: {os.path.basename(filepath)}")

                self.test_results['total_files_tested'] += 1

            except Exception as e:
                self.test_results['scraping_error_count'] += 1
                self.test_results['total_files_tested'] += 1
                error_msg = f"Error processing {os.path.basename(filepath)}: {e}"
                if detailed:
                    logger.error(f"❌ {error_msg}")
                self.test_results['errors'].append(error_msg)

            if at_checkpoint:
                success_rate = (self.test_results['scraping_success_count'] / self.test_results['total_files_tested']) * 100
                logger.info(f"📊 Progress: {i}/{len(test_filepaths)} files processed ({success_rate:.1f}% success rate)")

    def validate_extracted_data(self, extractor, filepath):
        """Check the most recent record of each extracted collection.

        The last entry of ``acad_term``, ``classes_updates`` and
        ``class_timing`` (when present and non-empty) is passed to its
        validator. The first failure is logged as a warning and ends the
        checks; nothing is raised.
        """
        try:
            checks = (
                ('acad_term', self.validate_acad_term_structure),
                ('classes_updates', self.validate_grading_basis),
                ('class_timing', self.validate_timing_structure),
            )
            for attribute, validator in checks:
                records = getattr(extractor, attribute, None)
                if records:
                    validator(records[-1], filepath)

        except Exception as e:
            logger.warning(f"⚠️ Data validation failed for {os.path.basename(filepath)}: {e}")

    def validate_acad_term_structure(self, term_data, filepath):
        """Validate academic term data structure"""
        required_fields = ['id', 'acad_year_start', 'acad_year_end', 'term', 'boss_id', 'start_dt', 'end_dt']
        
        for field in required_fields:
            if field not in term_data:
                raise ValueError(f"Missing required field '{field}' in acad_term")
        
        # Validate ID format (should be like AY202122T1)
        if not re.match(r'^AY\d{6}T[12]$|^AY\d{6}T3[AB]$', term_data['id']):
            raise ValueError(f"Invalid acad_term ID format: {term_data['id']}")
        
        # Validate term values
        if term_data['term'] not in ['1', '2', '3A', '3B']:
            raise ValueError(f"Invalid term value: {term_data['term']}")
        
        logger.debug(f"✅ Academic term validation passed for {os.path.basename(filepath)}")

    def validate_grading_basis(self, class_data, filepath):
        """Validate grading basis conforms to enum"""
        if 'grading_basis' in class_data and class_data['grading_basis'] is not None:
            valid_values = ['GRADED', 'PASS_FAIL', 'NA']
            if class_data['grading_basis'] not in valid_values:
                raise ValueError(f"Invalid grading_basis: {class_data['grading_basis']}")
        
        logger.debug(f"✅ Grading basis validation passed for {os.path.basename(filepath)}")

    def validate_timing_structure(self, timing_data, filepath):
        """Validate timing data structure"""
        required_fields = ['class_id', 'start_date', 'end_date', 'day_of_week', 'start_time', 'end_time', 'venue']
        
        for field in required_fields:
            if field not in timing_data:
                raise ValueError(f"Missing required field '{field}' in class_timing")
        
        # Validate day_of_week format (should be 3 characters)
        if len(str(timing_data['day_of_week'])) > 3:
            raise ValueError(f"day_of_week too long: {timing_data['day_of_week']}")
        
        logger.debug(f"✅ Timing structure validation passed for {os.path.basename(filepath)}")

    def test_csv_generation_and_schema(self, extractor):
        """Have the extractor write its CSVs to 'test_output' and validate each expected file.

        Files listed in ``self.expected_schemas`` that were not written are
        recorded as not generated; the others are passed to
        ``validate_csv_schema``. Error logging is checked afterwards. Any
        exception is logged and appended to ``self.test_results['errors']``.
        """
        logger.info("Testing CSV generation and schema conformance...")
        try:
            output_dir = 'test_output'
            extractor.save_csv_files(output_dir)

            for csv_file, schema_info in self.expected_schemas.items():
                filepath = os.path.join(output_dir, csv_file)

                if not os.path.exists(filepath):
                    logger.warning(f"⚠️ Expected file not generated: {csv_file}")
                    self.test_results['csv_generation'][csv_file] = {'generated': False}
                    continue

                self.validate_csv_schema(filepath, csv_file, schema_info)

            # Validate error logging
            self.validate_error_logging(extractor, output_dir)

        except Exception as e:
            error_msg = f"CSV generation test failed: {e}"
            logger.error(f"❌ {error_msg}")
            self.test_results['errors'].append(error_msg)

    def validate_csv_schema(self, filepath, csv_file, schema_info):
        """Validate CSV file against expected schema"""
        try:
            df = pd.read_csv(filepath)
            
            # Check required columns
            missing_columns = []
            for col in schema_info['required_columns']:
                if col not in df.columns:
                    missing_columns.append(col)
            
            if missing_columns:
                error_msg = f"{csv_file}: Missing columns {missing_columns}"
                logger.error(f"❌ {error_msg}")
                self.test_results['schema_validation'][csv_file] = {
                    'valid': False,
                    'error': error_msg
                }
                return
            
            # Validate data types and formats
            validation_result = self.validate_data_types(df, csv_file)
            
            self.test_results['csv_generation'][csv_file] = {
                'generated': True,
                'record_count': len(df),
                'columns': list(df.columns)
            }
            
            self.test_results['schema_validation'][csv_file] = validation_result
            
            if validation_result['valid']:
                logger.info(f"✅ {csv_file}: {len(df)} records, schema valid")
            else:
                logger.warning(f"⚠️ {csv_file}: Schema validation issues - {validation_result['warnings']}")
                
        except Exception as e:
            logger.error(f"❌ Error validating {csv_file}: {e}")
            self.test_results['schema_validation'][csv_file] = {
                'valid': False,
                'error': str(e)
            }

    def validate_data_types(self, df, csv_file):
        """Spot-check value formats in a generated CSV.

        Only the first three rows of ``df`` are inspected; this is a sample
        check, not a full validation.

        Args:
            df: DataFrame loaded from the CSV under test.
            csv_file: File name; selects which rule set is applied. Unknown
                names are accepted without any checks.

        Returns:
            dict with ``'valid'`` (True when no warning was raised) and
            ``'warnings'`` (list of human-readable messages).
        """
        def as_text(value):
            # pd.read_csv infers a numeric dtype for a column holding only
            # 1/2, so the term may arrive as 1 or 1.0 instead of '1'.
            if isinstance(value, float) and value.is_integer():
                return str(int(value))
            return str(value)

        warnings = []
        sample = df.head(3)

        if csv_file == 'acad_term.csv':
            # IDs look like AY202122T1 / AY202122T3A
            id_pattern = re.compile(r'^AY\d{6}T[12]$|^AY\d{6}T3[AB]$')
            allowed_terms = ('1', '2', '3A', '3B')
            for idx, row in sample.iterrows():
                if not id_pattern.match(str(row.get('id', ''))):
                    warnings.append(f"Invalid ID format in row {idx}: {row.get('id')}")

                if as_text(row.get('term')) not in allowed_terms:
                    warnings.append(f"Invalid term value in row {idx}: {row.get('term')}")

        elif csv_file == 'classes_updates.csv':
            allowed_grading = ('GRADED', 'PASS_FAIL', 'NA')
            for idx, row in sample.iterrows():
                grading = row.get('grading_basis')
                # Missing grading basis is allowed; only present values are checked
                if pd.notna(grading) and grading not in allowed_grading:
                    warnings.append(f"Invalid grading_basis in row {idx}: {grading}")

        elif csv_file == 'class_timing.csv':
            for idx, row in sample.iterrows():
                day_of_week = str(row.get('day_of_week', ''))
                if len(day_of_week) > 3:
                    warnings.append(f"day_of_week too long in row {idx}: {day_of_week}")

        return {
            'valid': not warnings,
            'warnings': warnings
        }

    def validate_error_logging(self, extractor, output_dir):
        """Report whether the extractor recorded errors and wrote them out.

        Nothing is logged when the extractor has no ``errors`` attribute or
        the list is empty. When errors exist and ``processing_errors.csv`` is
        present in ``output_dir``, its columns are compared with the expected
        ``filepath`` / ``error`` / ``type`` layout. Failures inside this check
        are logged as warnings and never propagate.

        Args:
            extractor: Object that may expose an ``errors`` collection.
            output_dir: Directory expected to hold ``processing_errors.csv``.
        """
        try:
            error_file = os.path.join(output_dir, 'processing_errors.csv')

            recorded = getattr(extractor, 'errors', None)
            if not recorded:
                return

            logger.info(f"✅ Error logging working: {len(recorded)} errors logged")

            if not os.path.exists(error_file):
                return

            error_df = pd.read_csv(error_file)
            absent = [col for col in ['filepath', 'error', 'type'] if col not in error_df.columns]
            if not absent:
                logger.info(f"✅ Error CSV structure valid: {len(error_df)} error records")
            else:
                logger.warning(f"⚠️ Error CSV missing columns: {absent}")

        except Exception as exc:
            logger.warning(f"⚠️ Error validation failed: {exc}")

    def run_all_tests(self):
        """Run the schema conformance suite from start to finish.

        Stages: database connectivity check, loading up to 1000 sample HTML
        file paths, extractor/schema testing on those files, then the final
        report. The report is produced even when no sample files are found.
        """
        logger.info("🚀 Starting Schema Conformance Tests")
        logger.info("=" * 60)

        # Stage 1: database connectivity
        logger.info("📊 Testing Database Connectivity...")
        self.test_database_connection()

        # Stage 2: pick the HTML files to test against
        logger.info("📁 Loading test HTML files...")
        sample_paths = self.load_actual_filepaths(1000)

        # Stage 3: schema conformance, skipped when there is nothing to test
        if sample_paths:
            logger.info("🔧 Testing Schema Conformance...")
            self.test_extractor_functionality(sample_paths)
        else:
            logger.error("❌ No valid HTML files found for testing")

        # The report is always the last step
        self.generate_test_report()

    def generate_test_report(self):
        """Log a summary of everything collected in ``self.test_results``.

        Sections, in order: database connectivity, file processing counts,
        per-CSV generation/schema status (for every file named in
        ``self.expected_schemas``), the first three recorded errors, and
        follow-up recommendations.
        """
        results = self.test_results
        rule = "=" * 60

        logger.info(rule)
        logger.info("📊 SCHEMA CONFORMANCE TEST RESULTS")
        logger.info(rule)

        # --- Database connectivity ---
        logger.info("🗄️ DATABASE CONNECTIVITY:")
        if results['database_connection']:
            db_status = "✅ PASSED"
        else:
            db_status = "❌ FAILED"
        logger.info(f"  Connection Status: {db_status}")

        for table, access in results['table_access'].items():
            if not access.get('accessible'):
                logger.info(f"  {table}: ❌ {access.get('error', 'Access failed')}")
                continue
            logger.info(f"  {table}: ✅ {access['record_count']} records")

        # --- File processing ---
        logger.info("\n🔧 FILE PROCESSING:")
        total_tested = results['total_files_tested']
        success_count = results['scraping_success_count']
        error_count = results['scraping_error_count']

        if total_tested <= 0:
            logger.info("  No files were processed")
        else:
            success_rate = (success_count / total_tested) * 100
            logger.info(f"  Processed: {success_count}/{total_tested} files ({success_rate:.1f}% success)")

        # --- CSV generation and schema validation ---
        logger.info("\n📄 CSV GENERATION & SCHEMA VALIDATION:")
        for csv_file in self.expected_schemas.keys():
            generation = results['csv_generation'].get(csv_file, {})
            schema = results['schema_validation'].get(csv_file, {})

            if not generation.get('generated'):
                logger.info(f"  {csv_file}: ❌ Not generated")
                continue

            record_count = generation.get('record_count', 0)
            if schema.get('valid'):
                logger.info(f"  {csv_file}: ✅ {record_count} records, schema valid")
            else:
                issue_list = schema.get('warnings', [])
                logger.warning(f"  {csv_file}: ⚠️ {record_count} records, schema issues: {len(issue_list)} warnings")

        # --- Error summary (only the first three are spelled out) ---
        failures = results['errors']
        if failures:
            logger.info(f"\n⚠️ ERRORS ENCOUNTERED ({len(failures)}):")
            for position, failure in enumerate(failures[:3], 1):
                logger.info(f"  {position}. {failure}")
            hidden = len(failures) - 3
            if hidden > 0:
                logger.info(f"  ... and {hidden} more errors")
        else:
            logger.info("\n✅ No critical errors encountered!")

        # --- Recommendations ---
        logger.info("\n📋 RECOMMENDATIONS:")

        schema_issues = [name for name, verdict in results['schema_validation'].items() if not verdict.get('valid')]
        if schema_issues:
            logger.info(f"  • Fix schema issues in: {', '.join(schema_issues)}")
            logger.info("  • Check column names match Prisma schema exactly")
            logger.info("  • Validate data type conversions and enum mappings")

        if error_count > 0:
            logger.info("  • Review processing_errors.csv for parsing issues")

        if success_count > 0:
            logger.info("  • ✅ Schema conformance testing complete!")
            logger.info("  • Check 'test_output' folder for generated CSV files")

        logger.info(rule)

if __name__ == "__main__":
    # Run the schema conformance test suite.
    # The runner is bound to a module-level name, so it stays available for
    # inspection (e.g. test_runner.test_results) after the run has finished.
    test_runner = SchemaConformanceTestRunner()
    test_runner.run_all_tests()

2025-05-23 20:36:57,966 - INFO - 🚀 Starting Schema Conformance Tests
2025-05-23 20:36:57,967 - INFO - 📊 Testing Database Connectivity...
2025-05-23 20:36:57,967 - INFO - Testing database connection...
2025-05-23 20:36:58,008 - INFO - ✅ Database connection successful
2025-05-23 20:36:58,012 - INFO - ✅ Table 'courses': 0 records accessible
2025-05-23 20:36:58,016 - INFO - ✅ Table 'classes': 0 records accessible
2025-05-23 20:36:58,019 - INFO - ✅ Table 'acad_term': 0 records accessible
2025-05-23 20:36:58,023 - INFO - ✅ Table 'class_timing': 0 records accessible
2025-05-23 20:36:58,026 - INFO - ✅ Table 'class_exam_timing': 0 records accessible
2025-05-23 20:36:58,027 - INFO - 📁 Loading test HTML files...
2025-05-23 20:36:58,821 - INFO - Found 12976 existing files
2025-05-23 20:36:58,823 - INFO - Selected 1000 files for testing
2025-05-23 20:36:58,823 - INFO - 🔧 Testing Schema Conformance...
2025-05-23 20:36:58,824 - INFO - Testing AfterClassDataExtractor functionality...
2025-05-23 20:36:

### ACTUAL SCRIPT

In [62]:
# Configure logging output (timestamp - level - message, INFO and above)
# and create the module-level logger used by the code below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class AfterClassDataExtractor:
    def __init__(self, db_config: Dict[str, str]):
        """Initialize with database configuration for Supabase"""
        self.db_config = db_config
        self.connection = None
        self.driver = None
        
        # Local cache for courses and classes
        self.courses_df = None
        self.classes_df = None
        self.courses_cache = {}  # Cache for course code to UUID mapping
        self.classes_cache = {}  # Cache for class lookups
        
        # CSV data storage
        self.courses_updates = []
        self.classes_updates = []
        self.acad_term = []
        self.class_timing = []
        self.class_exam_timing = []
        self.errors = []

    def setup_selenium_driver(self):
        """Start a headless Chrome WebDriver and store it on ``self.driver``.

        Raises:
            Exception: Whatever the driver setup raised, after logging it.
        """
        chrome_flags = (
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--headless',  # Run in headless mode for efficiency
            '--disable-gpu',
        )
        try:
            options = Options()
            for flag in chrome_flags:
                options.add_argument(flag)

            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=options,
            )
            logger.info("Selenium WebDriver initialized successfully")
        except Exception as exc:
            logger.error(f"Failed to initialize Selenium WebDriver: {exc}")
            raise

    def connect_database(self):
        """Open the PostgreSQL connection described by ``self.db_config``.

        ``host``, ``database``, ``user`` and ``password`` are required keys;
        ``port`` falls back to 5432 when absent. The connection is stored on
        ``self.connection``.

        Raises:
            Exception: Any failure (including a missing key) is logged and
                re-raised.
        """
        try:
            settings = {
                key: self.db_config[key]
                for key in ('host', 'database', 'user', 'password')
            }
            settings['port'] = self.db_config.get('port', 5432)

            self.connection = psycopg2.connect(**settings)
            logger.info("Database connection established")
        except Exception as exc:
            logger.error(f"Database connection failed: {exc}")
            raise

    def download_and_cache_tables(self, cache_dir: str = 'db_cache'):
        """Fetch the courses and classes tables and store them locally.

        Each table is loaded through ``self.connection`` into a DataFrame,
        written to ``cache_dir`` as both a pickle and a CSV file, and then the
        in-memory lookup dictionaries are rebuilt.

        Args:
            cache_dir: Directory for the cache files; created if missing.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        def store(frame, stem):
            # Write one table as <stem>_cache.pkl and <stem>_cache.csv
            frame.to_pickle(os.path.join(cache_dir, f'{stem}_cache.pkl'))
            frame.to_csv(os.path.join(cache_dir, f'{stem}_cache.csv'), index=False)

        try:
            os.makedirs(cache_dir, exist_ok=True)

            # Courses table - column names follow the database (@map) naming
            logger.info("Downloading courses table...")
            courses_query = """
                SELECT id, code, name, description, credit_units, 
                       belong_to_university, belong_to_faculty, 
                       course_area, enrolment_requirements, created_at, updated_at
                FROM courses
            """
            self.courses_df = pd.read_sql_query(courses_query, self.connection)
            store(self.courses_df, 'courses')
            logger.info(f"Downloaded {len(self.courses_df)} courses")

            # Classes table - column names follow the database (@map) naming
            logger.info("Downloading classes table...")
            classes_query = """
                SELECT id, section, course_id, professor_id, acad_term_id,
                       grading_basis, course_outline_url, boss_id, created_at, updated_at
                FROM classes
            """
            self.classes_df = pd.read_sql_query(classes_query, self.connection)
            store(self.classes_df, 'classes')
            logger.info(f"Downloaded {len(self.classes_df)} classes")

            # Refresh the dictionaries used for id lookups
            self._build_lookup_caches()

            logger.info("Database tables cached successfully")

        except Exception as exc:
            logger.error(f"Error downloading and caching tables: {exc}")
            raise

    def load_cached_tables(self, cache_dir: str = 'db_cache'):
        """Load cached tables from local files"""
        try:
            courses_cache_file = os.path.join(cache_dir, 'courses_cache.pkl')
            classes_cache_file = os.path.join(cache_dir, 'classes_cache.pkl')
            
            if os.path.exists(courses_cache_file) and os.path.exists(classes_cache_file):
                self.courses_df = pd.read_pickle(courses_cache_file)
                self.classes_df = pd.read_pickle(classes_cache_file)
                
                self._build_lookup_caches()
                
                logger.info(f"Loaded cached tables: {len(self.courses_df)} courses, {len(self.classes_df)} classes")
                return True
            else:
                logger.info("Cache files not found, will download from database")
                return False
                
        except Exception as e:
            logger.error(f"Error loading cached tables: {e}")
            return False

    def _build_lookup_caches(self):
        """Build lookup caches from DataFrames"""
        # Build course code to ID mapping
        for _, row in self.courses_df.iterrows():
            self.courses_cache[row['code']] = row['id']
        
        # Build class lookup cache (course_id + section + acad_term_id -> class_id)
        for _, row in self.classes_df.iterrows():
            if pd.notna(row['acad_term_id']) and pd.notna(row['section']):
                cache_key = f"{row['course_id']}_{row['section']}_{row['acad_term_id']}"
                self.classes_cache[cache_key] = row['id']

    def get_course_id_by_code(self, course_code: str) -> Optional[str]:
        """Get course UUID by course code using local cache"""
        return self.courses_cache.get(course_code)

    def get_class_id(self, course_id: str, section: str, acad_term_id: str) -> Optional[int]:
        """Get class ID using local cache"""
        cache_key = f"{course_id}_{section}_{acad_term_id}"
        return self.classes_cache.get(cache_key)

    def load_html_file(self, filepath: str) -> bool:
        """Open a local HTML file in the Selenium browser.

        The path is resolved to an absolute ``file://`` URL before it is
        handed to ``self.driver``.

        Returns:
            True when the page was requested without error, otherwise False
            (the failure is logged).
        """
        try:
            self.driver.get(Path(filepath).resolve().as_uri())
        except Exception as exc:
            logger.error(f"Error loading HTML file {filepath}: {exc}")
            return False
        else:
            logger.debug(f"Loaded HTML file: {filepath}")
            return True

    def parse_acad_term(self, term_text: str) -> Optional[dict]:
        """Turn an academic-term label into structured fields.

        Accepted labels look like "2021-22 August Term", "2021-22 Session 1"
        or "2021-22 Term 3A": a four-digit start year, a two-digit end year
        and a free-text term description.

        Args:
            term_text: The label to parse.

        Returns:
            A dict with ``id`` (e.g. ``"AY202122T1"``), ``acad_year_start``,
            ``acad_year_end``, ``term`` (``'1'``, ``'2'``, ``'3A'`` or
            ``'3B'``) and the original ``term_text``; None when the label
            cannot be parsed (the reason is logged).
        """
        # Checked in order; the first rule with a matching keyword wins
        term_rules = (
            ('1', ('august', 'session 1', 'term 1')),
            ('2', ('january', 'session 2', 'term 2')),
            ('3A', ('3a',)),
            ('3B', ('3b',)),
        )
        try:
            match = re.search(r'(\d{4})-(\d{2})\s+(.*)', term_text)
            if not match:
                raise ValueError(f"Cannot parse term: {term_text}")

            start_year = int(match.group(1))
            end_year_short = int(match.group(2))
            term_desc = match.group(3).lower()

            # Expand the two-digit end year relative to the start year so the
            # result never precedes it (1999-00 -> 2000, 2049-50 -> 2050).
            end_year = start_year - start_year % 100 + end_year_short
            if end_year < start_year:
                end_year += 100

            term_code = next(
                (code for code, keywords in term_rules
                 if any(keyword in term_desc for keyword in keywords)),
                None,
            )
            if term_code is None:
                raise ValueError(f"Cannot determine term code from: {term_desc}")

            return {
                'id': f"AY{start_year}{end_year_short:02d}T{term_code}",
                'acad_year_start': start_year,
                'acad_year_end': end_year,
                'term': term_code,
                'term_text': term_text
            }
        except Exception as e:
            logger.error(f"Error parsing academic term '{term_text}': {e}")
            return None

    def parse_date_range(self, date_text: str) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Extract the two dates from text such as "23-Aug-2021 to 14-Nov-2021".

        Returns:
            ``(start, end)`` as datetime objects, or ``(None, None)`` when the
            text does not contain a parsable range (the reason is logged).
        """
        range_pattern = r'(\d{1,2}-\w{3}-\d{4})\s+to\s+(\d{1,2}-\w{3}-\d{4})'
        try:
            found = re.search(range_pattern, date_text)
            if not found:
                raise ValueError(f"Cannot parse date range: {date_text}")

            # Group 1 is the start date, group 2 the end date
            begin, finish = (
                datetime.strptime(piece, '%d-%b-%Y') for piece in found.groups()
            )
            return begin, finish
        except Exception as exc:
            logger.error(f"Error parsing date range '{date_text}': {exc}")
            return None, None

    def parse_single_date(self, date_text: str) -> Optional[datetime]:
        """Parse a date written as ``DD-Mon-YYYY`` (e.g. "23-Aug-2021").

        Surrounding whitespace is ignored. A missing value (None, empty or
        blank text) yields None without logging, because an empty cell is not
        a parsing failure; anything else that cannot be parsed is logged as
        an error.

        Args:
            date_text: Text to parse; may be empty.

        Returns:
            The parsed datetime, or None when there is no usable date.
        """
        if date_text is None:
            return None

        cleaned = date_text.strip() if isinstance(date_text, str) else date_text
        if cleaned == '':
            return None

        try:
            return datetime.strptime(cleaned, '%d-%b-%Y')
        except (TypeError, ValueError) as e:
            logger.error(f"Error parsing date '{date_text}': {e}")
            return None

    def extract_course_outline_url(self) -> Optional[str]:
        """Read the course outline link from the currently loaded page.

        The link is taken from the ``onclick`` handler of the element with id
        ``imgCourseOutline``, which is expected to have the form
        ``window.open('URL', ...)``.

        Returns:
            The URL, or None when the element, the handler or the URL is
            missing (lookup errors are logged at debug level).
        """
        try:
            handler_js = self.driver.find_element(By.ID, 'imgCourseOutline').get_attribute('onclick')
            if not handler_js:
                return None

            # First quoted argument of window.open(...) is the URL
            found = re.search(r"window\.open\('([^']+)'", handler_js)
            return found.group(1) if found else None
        except Exception as exc:
            logger.debug(f"Course outline URL not found or error: {exc}")
            return None

    def extract_boss_id_from_filepath(self, filepath: str) -> Optional[int]:
        """Read the ``SelectedClassNumber`` value from a file name.

        Example file name:
        ``SelectedAcadTerm=2110&SelectedClassNumber=1002.html`` -> ``1002``.

        Returns:
            The number as an int, or None when the file name does not contain
            one or the path cannot be processed (errors are logged).
        """
        try:
            found = re.search(r'SelectedClassNumber=(\d+)', os.path.basename(filepath))
            return int(found.group(1)) if found else None
        except Exception as exc:
            logger.error(f"Error extracting BOSS ID from '{filepath}': {exc}")
            return None

    def safe_find_element_text(self, by: By, value: str) -> Optional[str]:
        """Safely find element and return its text"""
        try:
            element = self.driver.find_element(by, value)
            return element.text.strip() if element else None
        except Exception:
            return None

    def process_html_file(self, filepath: str) -> bool:
        """Extract every record from one saved class-details page.

        The page is loaded into the browser, then the header, academic term,
        course areas, enrolment requirements, grading basis, outline URL and
        period dates are read from it. Results are appended to
        ``courses_updates``, ``acad_term`` and ``classes_updates``; meeting
        rows are handled by ``extract_meeting_information``. Problems are
        appended to ``self.errors`` with the file path and a category.

        Args:
            filepath: Path of the HTML file to process.

        Returns:
            True when the course/term/class records were added; False when
            the file could not be loaded or a required field was unusable.
        """
        def reject(message: str, kind: str) -> bool:
            # Record why this file failed and hand back the failure result
            self.errors.append({
                'filepath': filepath,
                'error': message,
                'type': kind
            })
            return False

        # Page wording (lower-cased) -> value written to grading_basis
        grading_lookup = {
            'graded': 'GRADED',
            'pass/fail': 'PASS_FAIL',
            'pass fail': 'PASS_FAIL',
        }

        try:
            if not self.load_html_file(filepath):
                return False

            # Header is expected to read "<course code> - <section>"
            header = self.safe_find_element_text(By.ID, 'lblClassInfoHeader')
            if not header:
                return reject('Missing class header', 'parse_error')

            header_parts = re.match(r'([A-Z0-9_]+)\s*-\s*(.+)', header)
            if not header_parts:
                return reject(f'Cannot parse course code from: {header}', 'parse_error')
            course_code, section = header_parts.group(1), header_parts.group(2)

            # The course must already be known to the local cache
            course_id = self.get_course_id_by_code(course_code)
            if not course_id:
                return reject(f'Course not found in cache: {course_code}', 'database_error')

            # Academic term label from the sub-header
            term_text = self.safe_find_element_text(By.ID, 'lblClassInfoSubHeader')
            if not term_text:
                return reject('Missing academic term', 'parse_error')

            term_data = self.parse_acad_term(term_text)
            if not term_data:
                return reject(f'Cannot parse academic term: {term_text}', 'parse_error')

            # Optional descriptive fields
            course_areas = self.safe_find_element_text(By.ID, 'lblCourseAreas')
            if course_areas:
                # Drop any markup that ended up in the text
                course_areas = re.sub(r'<[^>]+>', '', course_areas)

            enrolment_req = self.safe_find_element_text(By.ID, 'lblEnrolmentRequirements')

            # Unknown wording maps to 'NA'; a missing label stays None
            grading_text = self.safe_find_element_text(By.ID, 'lblGradingBasis')
            grading_basis = grading_lookup.get(grading_text.lower(), 'NA') if grading_text else None

            course_outline_url = self.extract_course_outline_url()

            period_text = self.safe_find_element_text(By.ID, 'lblDates')
            if period_text:
                start_dt, end_dt = self.parse_date_range(period_text)
            else:
                start_dt, end_dt = None, None

            boss_id = self.extract_boss_id_from_filepath(filepath)

            # Course-level update row
            self.courses_updates.append({
                'code': course_code,
                'course_area': course_areas,
                'enrolment_requirements': enrolment_req
            })

            # Academic term row
            self.acad_term.append({
                'id': term_data['id'],
                'acad_year_start': term_data['acad_year_start'],
                'acad_year_end': term_data['acad_year_end'],
                'term': term_data['term'],
                'boss_id': boss_id,
                'start_dt': start_dt.isoformat() if start_dt else None,
                'end_dt': end_dt.isoformat() if end_dt else None
            })

            # Class-level update row
            self.classes_updates.append({
                'course_code': course_code,
                'section': section,
                'acad_term_id': term_data['id'],
                'grading_basis': grading_basis,
                'course_outline_url': course_outline_url
            })

            # Timing rows reference the class id from the local cache
            class_id = self.get_class_id(course_id, section, term_data['id'])
            if not class_id:
                # Recorded as an error, but processing continues: meeting rows
                # are still extracted and carry an empty class_id.
                reject(f'Class not found: {course_code}-{section} for {term_data["id"]}', 'database_error')

            self.extract_meeting_information(class_id, filepath)

            return True

        except Exception as exc:
            reject(str(exc), 'processing_error')
            logger.error(f"Error processing file {filepath}: {exc}")
            return False

    def extract_meeting_information(self, class_id: Optional[int], filepath: str):
        """Collect class and exam timing rows from the loaded page.

        Rows are read from the body of the table with id
        ``RadGrid_MeetingInfo_ctl00``. Rows with fewer than eight cells are
        ignored. ``CLASS`` rows go to ``self.class_timing`` and ``EXAM`` rows
        to ``self.class_exam_timing``; other meeting types are skipped. Any
        failure is appended to ``self.errors`` as a ``parse_error``.

        Args:
            class_id: Id stored on every emitted row; may be None.
            filepath: Source file, used only for error reporting.
        """
        def iso_or_none(moment):
            # Dates are stored as ISO strings; unparsed dates stay empty
            return moment.isoformat() if moment else None

        try:
            grid = self.driver.find_element(By.ID, 'RadGrid_MeetingInfo_ctl00')

            # Only body rows carry data; the header lives outside tbody
            body_rows = grid.find_element(By.TAG_NAME, 'tbody').find_elements(By.TAG_NAME, 'tr')

            for body_row in body_rows:
                cells = body_row.find_elements(By.TAG_NAME, 'td')
                if len(cells) < 8:
                    continue

                # First seven columns, in page order
                (meeting_type, start_date_text, end_date_text, day_of_week,
                 start_time, end_time, venue) = [cell.text.strip() for cell in cells[:7]]

                if meeting_type == 'CLASS':
                    first_day = self.parse_single_date(start_date_text)
                    last_day = self.parse_single_date(end_date_text)

                    self.class_timing.append({
                        'class_id': class_id,
                        'start_date': iso_or_none(first_day),
                        'end_date': iso_or_none(last_day),
                        'day_of_week': day_of_week,
                        'start_time': start_time,
                        'end_time': end_time,
                        'venue': venue
                    })

                elif meeting_type == 'EXAM':
                    # Exams use only the start date column
                    exam_day = self.parse_single_date(start_date_text)

                    self.class_exam_timing.append({
                        'class_id': class_id,
                        'date': iso_or_none(exam_day),
                        'day_of_week': day_of_week,
                        'start_time': start_time,
                        'end_time': end_time,
                        'venue': ''  # Leave empty as specified in schema
                    })

        except Exception as exc:
            self.errors.append({
                'filepath': filepath,
                'error': f'Error extracting meeting information: {str(exc)}',
                'type': 'parse_error'
            })

    def save_csv_files(self, output_dir: str = 'extracted_data'):
        """Save all extracted data to CSV files"""
        os.makedirs(output_dir, exist_ok=True)
        
        # Save courses updates
        if self.courses_updates:
            df = pd.DataFrame(self.courses_updates)
            df.to_csv(os.path.join(output_dir, 'courses_updates.csv'), index=False)
            logger.info(f"Saved {len(self.courses_updates)} course update records")
        
        # Save classes updates
        if self.classes_updates:
            df = pd.DataFrame(self.classes_updates)
            df.to_csv(os.path.join(output_dir, 'classes_updates.csv'), index=False)
            logger.info(f"Saved {len(self.classes_updates)} class update records")
        
        # Save academic terms - use database table name exactly
        if self.acad_term:
            df = pd.DataFrame(self.acad_term)
            # Remove duplicates based on ID
            df = df.drop_duplicates(subset=['id'])
            df.to_csv(os.path.join(output_dir, 'acad_term.csv'), index=False)
            logger.info(f"Saved {len(df)} academic term records")
        
        # Save class timings - use database table name exactly
        if self.class_timing:
            df = pd.DataFrame(self.class_timing)
            df.to_csv(os.path.join(output_dir, 'class_timing.csv'), index=False)
            logger.info(f"Saved {len(self.class_timing)} class timing records")
        
        # Save exam timings - use database table name exactly
        if self.class_exam_timing:
            df = pd.DataFrame(self.class_exam_timing)
            df.to_csv(os.path.join(output_dir, 'class_exam_timing.csv'), index=False)
            logger.info(f"Saved {len(self.class_exam_timing)} exam timing records")
        
        # Save errors
        if self.errors:
            df = pd.DataFrame(self.errors)
            df.to_csv(os.path.join(output_dir, 'processing_errors.csv'), index=False)
            logger.info(f"Saved {len(self.errors)} error records")

    def process_all_files(self, scraped_filepaths_csv: str, output_dir: str = 'extracted_data'):
        """Process all files listed in the scraped filepaths CSV"""
        try:
            # Read the CSV file with file paths
            df = pd.read_csv(scraped_filepaths_csv)
            
            # Handle both 'Filepath' and 'filepath' column names
            filepath_column = 'Filepath' if 'Filepath' in df.columns else 'filepath'
            
            total_files = len(df)
            processed_files = 0
            successful_files = 0
            
            logger.info(f"Starting to process {total_files} files")
            
            for index, row in df.iterrows():
                filepath = row[filepath_column]
                
                if os.path.exists(filepath):
                    if self.process_html_file(filepath):
                        successful_files += 1
                    processed_files += 1
                    
                    if processed_files % 100 == 0:
                        logger.info(f"Processed {processed_files}/{total_files} files")
                else:
                    self.errors.append({
                        'filepath': filepath,
                        'error': 'File not found',
                        'type': 'file_error'
                    })
            
            logger.info(f"Processing complete: {successful_files}/{processed_files} files successful")
            
            # Save all CSV files
            self.save_csv_files(output_dir)
            
        except Exception as e:
            logger.error(f"Error in process_all_files: {e}")
            raise

    def cleanup(self):
        """Clean up resources"""
        if self.driver:
            self.driver.quit()
            logger.info("Selenium WebDriver closed")
        
        if self.connection:
            self.connection.close()
            logger.info("Database connection closed")

def main():
    """Run the full extraction pipeline from driver setup to CSV output.

    Builds an ``AfterClassDataExtractor`` from the database settings below,
    starts Selenium, connects to the database, loads cached tables (downloading
    them when no cache is available), processes every file listed in
    ``scraped_filepaths.csv`` and prints a summary of the generated files.
    Any failure is logged and re-raised; ``cleanup`` always runs.
    """
    # Database configuration - Fill in your Supabase credentials.
    # NOTE: these are placeholders; avoid committing real credentials.
    db_config = dict(
        host='localhost',  # Replace with your Supabase host
        database='postgres',
        user='postgres',  # Replace with your username
        password='changeme',  # Replace with your password
        port=5433,
    )

    extractor = AfterClassDataExtractor(db_config)

    try:
        # Browser first, then the database connection.
        extractor.setup_selenium_driver()
        extractor.connect_database()

        # Prefer the local cache; fall back to a fresh download.
        cache_loaded = extractor.load_cached_tables()
        if not cache_loaded:
            logger.info("Downloading fresh data from database...")
            extractor.download_and_cache_tables()

        extractor.process_all_files('scraped_filepaths.csv', 'extracted_data')

        summary_lines = (
            "Data extraction completed successfully!",
            "Check the 'extracted_data' folder for CSV files:",
            "- courses_updates.csv: Course area and enrollment requirements updates",
            "- classes_updates.csv: Class grading basis, term, and outline URL updates",
            "- acad_term.csv: Academic term records",
            "- class_timing.csv: Class timing records",
            "- class_exam_timing.csv: Exam timing records",
            "- processing_errors.csv: Any errors encountered during processing",
            "\nDatabase cache stored in 'db_cache' folder for future runs",
        )
        for line in summary_lines:
            print(line)

    except Exception as exc:
        logger.error(f"Main process failed: {exc}")
        raise
    finally:
        # Always release the browser and the database connection.
        extractor.cleanup()

In [None]:
# Run the extraction pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()



IndexError: string index out of range

In [13]:
# # Function to test scraping and CSV saving
# def test_scrape_class_details():
#     test_url = "https://boss.intranet.smu.edu.sg/ClassDetails.aspx?SelectedAcadTerm=2420&SelectedClassNumber=1580"
#     csv_filename = "TestClassDetails.csv"
#     print("Starting test scrape...")

#     # Open CSV file for writing
#     with open(csv_filename, "w", newline="", encoding="utf-8") as file:
#         writer = csv.writer(file)
#         headers = ["Term", "Course Code", "Section", "Description", "Grading Basis"]
#         for i in range(1, 4):  # Dynamic columns for up to 3 classes
#             headers.extend([f"class{i}_day", f"class{i}_starttime", f"class{i}_venue"])
#         writer.writerow(headers)

#         driver.get(test_url)
#         time.sleep(2)  # Allow time for page load

#         try:
#             # Extract key elements
#             wait = WebDriverWait(driver, 10)
#             course_header = wait.until(EC.presence_of_element_located((By.ID, "lblClassInfoHeader"))).text
#             description = driver.find_element(By.ID, "lblClassSection").text
#             term = driver.find_element(By.ID, "lblClassInfoSubHeader").text
#             grading_basis = driver.find_element(By.ID, "lblGradingBasis").text

#             # Split course code and section
#             course_code, section = [item.strip() for item in course_header.split('-')]

#             # Extract meeting details
#             class_details = []
#             rows = driver.find_elements(By.CSS_SELECTOR, "#RadGrid_MeetingInfo_ctl00 tr.rgRow, #RadGrid_MeetingInfo_ctl00 tr.rgAltRow")
#             for row in rows:
#                 cells = row.find_elements(By.TAG_NAME, "td")
#                 if cells and cells[0].text == "CLASS":
#                     class_details.append({
#                         "day": cells[3].text,
#                         "start_time": cells[4].text,
#                         "venue": cells[6].text
#                     })

#             # Prepare row data
#             row_data = [term, course_code, section, description, grading_basis]
#             for detail in class_details[:3]:  # Include up to 3 classes
#                 row_data.extend([detail["day"], detail["start_time"], detail["venue"]])

#             # Pad missing columns
#             for _ in range(len(class_details), 3):
#                 row_data.extend(["", "", ""])

#             # Write to CSV
#             writer.writerow(row_data)
#             print(f"Test data successfully written to {csv_filename}!")

#         except Exception as e:
#             print(f"Error occurred: {e}")

In [14]:
# # Main Execution
# try:
#     # Step 1: Navigate and wait for manual login
#     driver.get("https://boss.intranet.smu.edu.sg/OverallResults.aspx")
#     wait_for_manual_login()

#     # Step 2: Run the test scrape function
#     test_scrape_class_details()

# finally:
#     driver.quit()
#     print("Test completed!")

### **3.2 Scrape Class Details**

In [15]:
# def scrape_class_details(ay, term_code, class_number, csv_writer):
#     url = f"https://boss.intranet.smu.edu.sg/ClassDetails.aspx?SelectedClassNumber={class_number:04}&SelectedAcadTerm={ay}{term_code}&SelectedAcadCareer=UGRD"
#     driver.get(url)

#     # Immediately check for "No record found" in the raw page source
#     if "No record found" in driver.page_source:
#         return  # Exit early

#     try:
#         # Extract course details
#         course_header = driver.find_element(By.ID, "lblClassInfoHeader").text
#         description = driver.find_element(By.ID, "lblClassSection").text
#         term = driver.find_element(By.ID, "lblClassInfoSubHeader").text
#         grading_basis = driver.find_element(By.ID, "lblGradingBasis").text

#         # Split course header into Course Code and Section
#         course_code, section = [item.strip() for item in course_header.split('-')]

#         # Extract meeting and exam details
#         class_details = []
#         exam_details = {"exam_startdate": "", "exam_day": "", "exam_starttime": ""}

#         rows = driver.find_elements(By.CSS_SELECTOR, "#RadGrid_MeetingInfo_ctl00 tr.rgRow, #RadGrid_MeetingInfo_ctl00 tr.rgAltRow")
#         for row in rows:
#             cells = row.find_elements(By.TAG_NAME, "td")
#             if cells:
#                 if cells[0].text == "CLASS":  # CLASS rows
#                     class_details.append({
#                         "day": cells[3].text,
#                         "start_time": cells[4].text,
#                         "venue": cells[6].text
#                     })
#                 elif cells[0].text == "EXAM":  # EXAM row
#                     exam_details["exam_startdate"] = cells[1].text
#                     exam_details["exam_day"] = cells[3].text
#                     exam_details["exam_starttime"] = cells[4].text

#         # Prepare row data with SelectedClassNumber and SelectedAcadTerm
#         row_data = [class_number, f"{ay}{term_code}", term, course_code, section, description, grading_basis]

#         # Add class details (up to 3 classes)
#         for detail in class_details[:3]:
#             row_data.extend([detail["day"], detail["start_time"], detail["venue"]])
#         for _ in range(len(class_details), 3):  # Pad missing class details
#             row_data.extend(["", "", ""])

#         # Add exam details
#         row_data.extend([
#             exam_details["exam_startdate"],
#             exam_details["exam_day"],
#             exam_details["exam_starttime"]
#         ])

#         # Write to CSV
#         csv_writer.writerow(row_data)
#         print(f"Scraped: AY{ay}, Term {term_code}, Class Number {class_number:04}")
#         return True

#     except Exception as e:
#         print(f"Error scraping Class Number {class_number:04}, AY{ay}, Term {term_code}: {e}")
#         return False


---
### **3.3 Main Scraping Loop**

In [16]:
# def main():
#     ay_list = range(21, 25)  # AY2021 to AY2024
#     term_mapping = {"10": "T1", "20": "T2", "31": "T3A", "32": "T3B"}

#     for ay in ay_list:
#         for term_code, term_name in term_mapping.items():

#             # # Use this if your code suddenly stops.
#             # # Skip AY 2021 Term 1
#             # if ay == 21 and term_code == "10":
#             #     print(f"Skipping AY{ay}, Term {term_code} as it has already been scraped.")
#             #     continue  # Skip this iteration

#             filename = f"20{ay}-20{ay+1}_{term_name}AddedInfo.csv"
#             print(f"Starting scraping for file: {filename}")

#             with open(filename, "w", newline="", encoding="utf-8") as file:
#                 writer = csv.writer(file)
#                 headers = ["SelectedClassNumber", "SelectedAcadTerm", "Term", "Course Code", "Section", "Description", "Grading Basis"]
#                 for i in range(1, 4):
#                     headers.extend([f"class{i}_day", f"class{i}_starttime", f"class{i}_venue"])
#                 headers.extend(["exam_startdate", "exam_day", "exam_starttime"])
#                 writer.writerow(headers)

#                 class_number = 1000  # Start from class number 1000
#                 no_record_count = 0  # Track consecutive "No record found"

#                 while True:
#                     success = scrape_class_details(ay, term_code, class_number, writer)

#                     if not success:  # No record found (None) or scrape error (False)
#                         no_record_count += 1
#                     else:
#                         no_record_count = 0  # Reset the counter if a record is found

#                     # Stop if no record is found 300 times in a row
#                     if no_record_count >= 300:
#                         print(f"300 consecutive 'No record found' reached. Moving to next term.")
#                         break

#                     class_number += 1  # Increment to next class number

#     driver.quit()
#     print("Scraping completed!")


---
## **4. Execution**

In [17]:
# if __name__ == "__main__":
#     driver.get("https://boss.intranet.smu.edu.sg/OverallResults.aspx")
#     wait_for_manual_login()
#     main()