In [20]:
from __future__ import annotations

import base64
import getpass
import logging
import os
import time  # Add time module import
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, List, Any
from collections import deque

import pandas as pd
import requests
from pydantic import BaseModel, Field
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Configuration class
class Config:
    # API Settings
    BASE_URL = "https://api.companieshouse.gov.uk"
    MAX_RESULTS = 1000
    ITEMS_PER_PAGE = 100
    
    # Rate Limiting
    MAX_REQUESTS = 590
    TIME_WINDOW = 300  # seconds
    
    # SIC Codes for Insurance Companies
    INSURANCE_SIC_CODES = ['651', '652']
    
    # Search Terms
    SEARCH_TERMS = [
        "insurance company",
        "insurance limited",
        "insurance ltd",
        "insurance plc",
        "assurance company",
        "assurance limited",
        "assurance ltd",
        "assurance plc"
    ]
    
    # Output Directory
    OUTPUT_DIR = Path("insurance_companies_data")

# Data Models
class RegisteredOffice(BaseModel):
    address_line_1: Optional[str] = None
    address_line_2: Optional[str] = None
    locality: Optional[str] = None
    postal_code: Optional[str] = None

class CompanyData(BaseModel):
    company_number: str
    company_name: str
    company_status: str
    date_of_creation: str
    registered_office_address: str
    sic_codes: str

class APIResponse(BaseModel):
    items: List[Dict[str, Any]]
    total_results: Optional[int] = None
    page_number: Optional[int] = None
    kind: Optional[str] = None

# Set up logging
def setup_logging() -> logging.Logger:
    logger = logging.getLogger(__name__)
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    return logger

class RateLimiter:
    
    import time

    """Implements rate limiting for API requests."""
    
    def __init__(self, max_requests: int, time_window: int, logger: logging.Logger):
        self.max_requests = max_requests
        self.time_window = time_window
        self.logger = logger  # Add logger as instance variable
        self.requests: deque = deque()
    
    def can_make_request(self) -> bool:
        now = datetime.utcnow()
        while self.requests and self.requests[0] < now - timedelta(seconds=self.time_window):
            self.requests.popleft()
        
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        return False
    
    def wait_if_needed(self) -> int:
        current_requests = len(self.requests)
        if current_requests >= self.max_requests:
            self.logger.info(f"Rate limit reached. Current requests in window: {current_requests}")
        
        while not self.can_make_request():
            self.logger.debug(f"Waiting for rate limit. Current requests: {len(self.requests)}")
            time.sleep(1)
        
        current_requests = len(self.requests)
        self.logger.debug(f"Request allowed. Current requests in window: {current_requests}")
        return current_requests

class CompaniesHouseAPI:
    """Handler for Companies House API interactions."""
    
    def __init__(self, api_key: str, logger: logging.Logger):
        self.api_key = api_key
        self.logger = logger
        # Pass logger to RateLimiter
        self.rate_limiter = RateLimiter(Config.MAX_REQUESTS, Config.TIME_WINDOW, logger)
        self.session = self._create_session()
        
        # Create base64 encoded authentication
        auth = base64.b64encode(f"{api_key}:".encode('ascii')).decode('ascii')
        self.headers = {
            'Authorization': f'Basic {auth}',
            'Accept': 'application/json'
        }
    
    @staticmethod
    def _create_session() -> requests.Session:
        """Create a session with retry strategy."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session
    
    def search_companies(self, search_term: str, start_index: int = 0) -> Optional[APIResponse]:
        """Search for companies using the Companies House API."""
        params = {
            'q': search_term,
            'items_per_page': Config.ITEMS_PER_PAGE,
            'start_index': start_index,
            'company_status': 'active'
        }
        
        try:
            self.rate_limiter.wait_if_needed()
            
            if start_index >= Config.MAX_RESULTS:
                self.logger.info(f"Reached maximum results limit ({Config.MAX_RESULTS}) for search term: {search_term}")
                return APIResponse(items=[])
            
            response = self.session.get(
                f"{Config.BASE_URL}/search/companies",
                headers=self.headers,
                params=params
            )
            response.raise_for_status()
            return APIResponse(**response.json())
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error making request: {e}")
            return None
    
    def get_company_details(self, company_number: str) -> Optional[Dict[str, Any]]:
        """Get detailed information for a specific company."""
        try:
            self.rate_limiter.wait_if_needed()
            
            response = self.session.get(
                f"{Config.BASE_URL}/company/{company_number}",
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error fetching details for company {company_number}: {e}")
            return None

def get_api_key() -> str:
    """Securely prompt for Companies House API key."""
    try:
        return getpass.getpass("Please enter your Companies House API key: ")
    except Exception as e:
        logger.error(f"Error getting API key: {e}")
        raise

class InsuranceCompanyFinder:
    """Main class for finding and processing insurance companies."""
    
    def __init__(self, api: CompaniesHouseAPI, logger: logging.Logger):
        self.api = api
        self.logger = logger
        self.company_names: List[str] = []  # Only store company names
    
    def is_insurance_company(self, sic_codes: List[str]) -> bool:
        """Check if company is an insurance company based on SIC codes."""
        return any(code.startswith(prefix) for prefix in Config.INSURANCE_SIC_CODES 
                  for code in sic_codes if code)
    
    def process_company(self, company_number: str, company_name: str) -> None:
        """Process a single company."""
        if company_name in self.company_names:  # Check for duplicate names
            return
            
        company_details = self.api.get_company_details(company_number)
        if not company_details:
            return
            
        sic_codes = company_details.get('sic_codes', [])
        if not self.is_insurance_company(sic_codes):
            return
            
        self.company_names.append(company_name)
        self.logger.info(f"Added company: {company_name}")
    
    def search_all_companies(self) -> List[str]:
        """Search and return all insurance company names."""
        for search_term in Config.SEARCH_TERMS:
            self.logger.info(f"\nSearching for: {search_term}")
            start_index = 0
            
            while start_index < Config.MAX_RESULTS:
                results = self.api.search_companies(search_term, start_index)
                if not results or not results.items:
                    break
                
                self.logger.info(f"Processing {len(results.items)} results starting at index {start_index}")
                
                for company in results.items:
                    company_number = company.get('company_number')
                    company_name = company.get('title')
                    if company_number and company_name:
                        self.process_company(company_number, company_name)
                
                if len(results.items) < Config.ITEMS_PER_PAGE:
                    break
                    
                start_index += Config.ITEMS_PER_PAGE
            
            self.logger.info(f"Completed search for term: {search_term}")
        
        return sorted(self.company_names)

def main() -> None:
    """Main function to run the insurance companies search and save results."""
    start_time = datetime.strptime("2025-01-15 13:47:05", "%Y-%m-%d %H:%M:%S")
    current_user = "scubamut"

    logger = setup_logging()
    
    logger.info(f"Script started by {current_user} at {start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
    
    # Get API key securely
    api_key = get_api_key()
    
    # Create output directory
    Config.OUTPUT_DIR.mkdir(exist_ok=True)
    
    try:
        api = CompaniesHouseAPI(api_key, logger)
        finder = InsuranceCompanyFinder(api, logger)
        company_names = finder.search_all_companies()
        
        # Display results
        logger.info(f"\nFound {len(company_names)} unique insurance companies")
        logger.info("\nCompany names:")
        for name in company_names:
            logger.info(name)
        
        # Save to simple text file
        timestamp = start_time.strftime('%Y%m%d_%H%M%S')
        output_file = Config.OUTPUT_DIR / f"insurance_company_names_{timestamp}_{current_user}.txt"
        
        with open(output_file, 'w', encoding='utf-8') as f:
            for name in company_names:
                f.write(f"{name}\n")
                
        logger.info(f"\nResults saved to {output_file}")
        
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise
    finally:
        end_time = datetime.utcnow()
        logger.info(f"Script finished at {end_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        logger.info(f"Total runtime: {end_time - start_time}")

if __name__ == "__main__":
    main()

2025-01-17 14:47:56,731 - INFO - Script started by scubamut at 2025-01-15 13:47:05 UTC


KeyboardInterrupt: Interrupted by user

In [None]:
# Configuration class
class Config:
    # API Settings
    BASE_URL = "https://api.companieshouse.gov.uk"
    MAX_RESULTS = 1000
    ITEMS_PER_PAGE = 100
    
    # Rate Limiting
    MAX_REQUESTS = 590
    TIME_WINDOW = 300  # seconds
    
    # SIC Codes for Insurance Companies
    INSURANCE_SIC_CODES = ['651', '652']
    
    # Search Terms
    SEARCH_TERMS = [
        "insurance company",
        "insurance limited",
        "insurance ltd",
        "insurance plc",
        "assurance company",
        "assurance limited",
        "assurance ltd",
        "assurance plc"
    ]
    
    # Output Directory
    OUTPUT_DIR = Path("insurance_companies_data")

## PLAY A SOUND UNTIL THE INPUT IS DONE

In [28]:
def play_sound_until_input():

    import threading
    import time
    from pydub import AudioSegment
    from pydub.playback import play

    # Define a function to play the sound in a loop
    def play_sound_loop():
        sound = AudioSegment.from_wav('/home/scubamut1/Downloads/ring2.wav')
        while not stop_event.is_set():
            play(sound)
            # Add a small delay to prevent overwhelming the CPU
            time.sleep(1)

    # Function to wait for user input
    def get_user_input(prompt):
        input(prompt)
        stop_event.set()

    # Create an Event object to signal when to stop playing the sound
    stop_event = threading.Event()

    # Start the thread to play sound
    sound_thread = threading.Thread(target=play_sound_loop)
    sound_thread.start()

    # Wait for user input
    # get_user_input("Press Enter to stop the sound: ")
    user_input = input("Please enter your input: ")

    # Wait for the sound thread to finish
    sound_thread.join()

    return user_input

user_input = play_sound_until_input()
print(user_input)



KeyboardInterrupt: 

In [None]:
def play_sound_until_input():

    import threading
    import time
    from pydub import AudioSegment
    from pydub.playback import play

    # Define a function to play the sound in a loop
    def play_sound_loop():
        sound = AudioSegment.from_wav('/home/scubamut1/Downloads/ring2.wav')
        while not stop_event.is_set():
            play(sound)
            # Add a small delay to prevent overwhelming the CPU
            time.sleep(1)

    # Function to wait for user input
    def get_user_input(prompt):
        user_input = input(prompt)
        stop_event.set()
        return user_input

    # Create an Event object to signal when to stop playing the sound
    stop_event = threading.Event()

    # Start the thread to play sound
    sound_thread = threading.Thread(target=play_sound_loop)
    sound_thread.start()

    # Wait for user input
    user_input = get_user_input("Please enter inout until sound stops: ")

    # Wait for the sound thread to finish
    sound_thread.join()

    return user_input

user_input = play_sound_until_input()
print('user_input is : ', user_input)

user_input is :  ssss


: 

In [26]:
# Example program to demonstrate waiting for user input and printing it
user_input = input("Please enter your name: ")
print("Hello, " + user_input + "!")

Hello, d!
