# In [None]:
from __future__ import annotations

import base64
import getpass
import logging
import os
import time
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Dict, List, Any

import pandas as pd
import requests
from pydantic import BaseModel, Field
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Configuration
class Config:
    """Static settings shared by the API client and the company finder.

    The class is used purely as a namespace; it is never instantiated.
    """

    # --- Companies House API -------------------------------------------
    BASE_URL: str = "https://api.companieshouse.gov.uk"
    # Page size requested from the search endpoint.
    ITEMS_PER_PAGE: int = 100
    # Paging for a search term stops once start_index reaches this value.
    MAX_RESULTS: int = 1000

    # --- Client-side rate limiting --------------------------------------
    # At most MAX_REQUESTS calls are issued per TIME_WINDOW seconds.
    # NOTE(review): presumably chosen to stay just under the service's
    # published limit - confirm against the Companies House guidelines.
    TIME_WINDOW: int = 5 * 60
    MAX_REQUESTS: int = 590

    # --- Classification -------------------------------------------------
    # A company is kept when any of its SIC codes starts with one of these
    # prefixes.
    INSURANCE_SIC_CODES: List[str] = ["651", "652"]

    # --- Search ---------------------------------------------------------
    # Free-text queries sent to the search endpoint, one paged search each.
    SEARCH_TERMS: List[str] = [
        "insurance company",
        "insurance limited",
        "insurance ltd",
        "insurance plc",
        "assurance company",
        "assurance limited",
        "assurance ltd",
        "assurance plc",
    ]

    # --- Output ---------------------------------------------------------
    # Directory (relative to the working directory) that receives the CSV.
    OUTPUT_DIR: Path = Path("insurance_companies_data")

# Data Models
class RegisteredOffice(BaseModel):
    """Address parts of a company's registered office; every part is optional.

    NOTE(review): nothing in the visible code instantiates this model;
    ``InsuranceCompanyFinder.process_company`` reads the same four keys
    straight from the raw response dict instead.
    """
    address_line_1: Optional[str] = None
    address_line_2: Optional[str] = None
    locality: Optional[str] = None
    postal_code: Optional[str] = None

class CompanyData(BaseModel):
    """Flattened record of one matched company; one row of the output table.

    All fields are required strings.  The declaration order below is the
    column order of the DataFrame built from these records.
    """
    company_number: str
    company_name: str
    company_status: str
    date_of_creation: str
    # Non-empty address parts joined with ", " by the finder.
    registered_office_address: str
    # The company's SIC codes joined with ", " (empty string when none).
    sic_codes: str

class APIResponse(BaseModel):
    """One page of results returned by the company search endpoint.

    Only ``items`` is consumed by the rest of this file; the remaining
    fields are optional metadata copied from the response when present.
    """
    # A payload without an "items" key is treated as an empty page rather
    # than a validation failure; callers already interpret an empty list as
    # "no more results".  default_factory gives every instance its own list.
    items: List[Dict[str, Any]] = Field(default_factory=list)
    total_results: Optional[int] = None
    page_number: Optional[int] = None
    kind: Optional[str] = None

# Logging
# Module-level logger referenced by every class and function in this file.
# It is the same object that setup_logging() configures, so calling
# setup_logging() once (as main() does) is enough to make all of them emit
# formatted output.
logger = logging.getLogger(__name__)


def setup_logging() -> logging.Logger:
    """Configure this module's logger and return it.

    The logger is set to INFO, detached from the root logger (so messages
    are not printed twice) and given exactly one stream handler.  Existing
    handlers are removed first, which makes repeated calls - e.g. re-running
    a notebook cell - safe.

    Returns:
        The configured logger, identical to the module-level ``logger``.
    """
    log = logging.getLogger(__name__)
    log.propagate = False
    log.setLevel(logging.INFO)
    # Drop handlers left over from an earlier call to avoid duplicate lines.
    log.handlers.clear()

    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    log.addHandler(handler)

    return log

class RateLimiter:
    """Sliding-window limiter: at most ``max_requests`` per ``time_window`` seconds.

    Timestamps come from ``time.monotonic()`` so that adjustments of the
    system clock can neither release nor stall the limiter.  The instance
    keeps plain in-memory state and does no locking.
    """

    def __init__(self, max_requests: int, time_window: int):
        """Create a limiter.

        Args:
            max_requests: Number of requests allowed inside one window.
            time_window: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is below 1 (no request could
                ever be granted, so waiting would never end) or
                ``time_window`` is negative.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        if time_window < 0:
            raise ValueError("time_window must not be negative")
        self.max_requests = max_requests
        self.time_window = time_window
        # Monotonic timestamps of granted requests, oldest first.
        self.requests: deque = deque()

    def can_make_request(self) -> bool:
        """Try to reserve a slot right now.

        Returns:
            True when a slot was free (the request is then recorded),
            False when the window is full.
        """
        now = time.monotonic()
        cutoff = now - self.time_window
        # Forget requests that have fallen out of the window.
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        return False

    def wait_if_needed(self) -> int:
        """Block until a slot is free, reserve it, and report the usage.

        Returns:
            Number of requests recorded in the current window, including
            the one just reserved.
        """
        while not self.can_make_request():
            # The window is full, so the deque is non-empty: sleep until the
            # oldest entry expires instead of polling at a fixed interval.
            remaining = self.requests[0] + self.time_window - time.monotonic()
            time.sleep(max(remaining, 0.01))
        return len(self.requests)

class CompaniesHouseAPI:
    """Handler for Companies House API interactions.

    Combines a retrying ``requests`` session, HTTP Basic authentication (the
    API key as user name with an empty password) and a client-side rate
    limiter.  Both public methods log failures and return ``None`` instead of
    raising.
    """

    # (connect, read) timeouts in seconds.  Without a timeout ``requests``
    # waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = (10, 30)

    def __init__(self, api_key: str):
        """Build the session, rate limiter and authentication headers.

        Args:
            api_key: Companies House API key; must be ASCII.
        """
        self.api_key = api_key
        self.rate_limiter = RateLimiter(Config.MAX_REQUESTS, Config.TIME_WINDOW)
        self.session = self._create_session()

        # Basic credentials are "<api_key>:" (empty password), base64 encoded.
        auth = base64.b64encode(f"{api_key}:".encode('ascii')).decode('ascii')
        self.headers = {
            'Authorization': f'Basic {auth}',
            'Accept': 'application/json'
        }

    @staticmethod
    def _create_session() -> requests.Session:
        """Create a session with retry strategy.

        Up to three retries with backoff are made for the listed status
        codes.  These retries happen inside the HTTP adapter and therefore
        do not pass through the rate limiter.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session

    def _get_json(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Issue one rate-limited GET below ``Config.BASE_URL`` and decode it.

        Raises:
            requests.exceptions.RequestException: On transport errors,
                timeouts or non-2xx responses.
            ValueError: If the body is not valid JSON.
        """
        self.rate_limiter.wait_if_needed()
        response = self.session.get(
            f"{Config.BASE_URL}{path}",
            headers=self.headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def search_companies(self, search_term: str, start_index: int = 0) -> Optional[APIResponse]:
        """Search for companies using the Companies House API.

        Args:
            search_term: Free-text query.
            start_index: Offset of the first result to return.

        Returns:
            The parsed page; an empty page when ``start_index`` is at or
            beyond ``Config.MAX_RESULTS``; ``None`` when the request or the
            parsing of its response failed.
        """
        # Checked before touching the rate limiter so that a request which is
        # never sent does not use up a slot.
        if start_index >= Config.MAX_RESULTS:
            logger.info(f"Reached maximum results limit ({Config.MAX_RESULTS}) for search term: {search_term}")
            return APIResponse(items=[])

        params = {
            'q': search_term,
            'items_per_page': Config.ITEMS_PER_PAGE,
            'start_index': start_index,
            'company_status': 'active'
        }

        try:
            return APIResponse(**self._get_json("/search/companies", params))
        except (requests.exceptions.RequestException, ValueError, TypeError) as e:
            # ValueError covers undecodable JSON and schema validation
            # errors; TypeError covers a payload that is not a JSON object.
            logger.error(f"Error making request: {e}")
            return None

    def get_company_details(self, company_number: str) -> Optional[Dict[str, Any]]:
        """Get detailed information for a specific company.

        Args:
            company_number: Company number as returned by the search.

        Returns:
            The decoded company profile, or ``None`` when the request failed
            or the body could not be decoded.
        """
        try:
            return self._get_json(f"/company/{company_number}")
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error(f"Error fetching details for company {company_number}: {e}")
            return None

class InsuranceCompanyFinder:
    """Main class for finding and processing insurance companies.

    Runs every configured search term, fetches the profile of each hit once,
    keeps the companies whose SIC codes mark them as insurers and returns
    them as a DataFrame.
    """

    # Columns of the resulting DataFrame, in CompanyData field order.  Naming
    # them explicitly keeps the frame well-formed when nothing was found.
    COLUMNS = [
        'company_number',
        'company_name',
        'company_status',
        'date_of_creation',
        'registered_office_address',
        'sic_codes',
    ]

    # Address parts joined, in this order, into the one-line address.
    _ADDRESS_KEYS = ('address_line_1', 'address_line_2', 'locality', 'postal_code')

    def __init__(self, api: CompaniesHouseAPI):
        """Store the API client and start with an empty result list."""
        self.api = api
        self.companies: List[CompanyData] = []
        # Numbers whose profile was fetched successfully, insurer or not.
        # The search terms overlap heavily, so this avoids spending rate-limit
        # budget on the same company again.
        self._processed: set = set()

    def is_insurance_company(self, sic_codes: List[str]) -> bool:
        """Check if company is an insurance company based on SIC codes.

        Returns True when any string code starts with one of
        ``Config.INSURANCE_SIC_CODES``; empty, missing or non-string entries
        are ignored.
        """
        prefixes = tuple(Config.INSURANCE_SIC_CODES)
        return any(
            isinstance(code, str) and code.startswith(prefixes)
            for code in (sic_codes or ())
        )

    def process_company(self, company_number: str) -> None:
        """Fetch one company and record it if it is an insurer.

        A company whose profile was already fetched is skipped.  A failed
        fetch is not remembered, so the company is tried again if a later
        search returns it.
        """
        if company_number in self._processed:
            return

        company_details = self.api.get_company_details(company_number)
        if not company_details:
            return
        self._processed.add(company_number)

        # "or" guards against keys that are present but null.
        sic_codes = [code for code in (company_details.get('sic_codes') or []) if code]
        if not self.is_insurance_company(sic_codes):
            return

        registered_office = company_details.get('registered_office_address') or {}
        address_str = ', '.join(
            filter(None, (registered_office.get(key) for key in self._ADDRESS_KEYS))
        )

        company_data = CompanyData(
            company_number=company_number,
            company_name=company_details.get('company_name') or '',
            company_status=company_details.get('company_status') or '',
            date_of_creation=company_details.get('date_of_creation') or '',
            registered_office_address=address_str,
            sic_codes=', '.join(sic_codes)
        )

        self.companies.append(company_data)
        logger.info(f"Added: {company_data.company_name}")

    def search_all_companies(self) -> pd.DataFrame:
        """Search and process all insurance companies.

        Returns:
            One row per unique company number, sorted by company name, with
            the columns listed in ``COLUMNS``.  The frame is empty but still
            has those columns when no company matched.
        """
        for search_term in Config.SEARCH_TERMS:
            logger.info(f"\nSearching for: {search_term}")

            for start_index in range(0, Config.MAX_RESULTS, Config.ITEMS_PER_PAGE):
                results = self.api.search_companies(search_term, start_index)
                if not results or not results.items:
                    break

                logger.info(f"Processing {len(results.items)} results starting at index {start_index}")

                for company in results.items:
                    if company_number := company.get('company_number'):
                        self.process_company(company_number)

                # A short page is the last page for this term.
                if len(results.items) < Config.ITEMS_PER_PAGE:
                    break

            logger.info(f"Completed search for term: {search_term}")

        # Convert to DataFrame
        rows = [
            {column: getattr(company, column) for column in self.COLUMNS}
            for company in self.companies
        ]
        df = pd.DataFrame(rows, columns=self.COLUMNS)
        return df.drop_duplicates(subset='company_number').sort_values('company_name')

def get_api_key() -> str:
    """Prompt for the Companies House API key without echoing it.

    The key is always read interactively; no environment variable is
    consulted.  Surrounding whitespace (common when a key is pasted) is
    removed, because it would otherwise end up inside the credentials.

    Returns:
        The non-empty, stripped API key.

    Raises:
        ValueError: If nothing but whitespace was entered.
        Exception: Whatever ``getpass`` raised, after it has been logged.
    """
    # Looked up here so the error path does not depend on a global logger.
    log = logging.getLogger(__name__)
    try:
        api_key = getpass.getpass("Please enter your Companies House API key: ")
    except Exception as e:
        log.error(f"Error getting API key: {e}")
        raise

    api_key = api_key.strip()
    if not api_key:
        # Fail now instead of sending every request with empty credentials.
        raise ValueError("No Companies House API key was entered")
    return api_key

def main() -> None:
    """Main function to run the insurance companies search and save results.

    Prompts for the API key, runs every configured search, logs a preview of
    the result and writes it to a timestamped CSV inside
    ``Config.OUTPUT_DIR``.  Any error during the search or the save is logged
    and re-raised; the finish time and runtime are logged either way.
    """
    # Timezone-aware UTC timestamps; datetime.utcnow() is deprecated.
    start_time = datetime.now(timezone.utc)
    # USER is set on Unix-like systems, USERNAME on Windows.
    current_user = os.getenv('USER') or os.getenv('USERNAME') or 'default_user'

    logger = setup_logging()

    logger.info(f"Script started by {current_user} at {start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")

    # Get API key securely
    api_key = get_api_key()

    # Create output directory (and any missing parents)
    Config.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    try:
        api = CompaniesHouseAPI(api_key)
        finder = InsuranceCompanyFinder(api)
        insurance_companies_df = finder.search_all_companies()

        # Display results
        logger.info(f"\nFound {len(insurance_companies_df)} unique insurance companies")
        logger.info("\nFirst few companies:")
        logger.info(insurance_companies_df.head())

        # Save to CSV
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        output_file = Config.OUTPUT_DIR / f"insurance_companies_{timestamp}_{current_user}.csv"
        insurance_companies_df.to_csv(output_file, index=False)
        logger.info(f"\nResults saved to {output_file}")

    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise
    finally:
        end_time = datetime.now(timezone.utc)
        logger.info(f"Script finished at {end_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        logger.info(f"Total runtime: {end_time - start_time}")

# Run the search only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()