## Capitol Trades

Scrape trading data from capitoltrades.com

### Import Libraries

In [None]:
import time
import requests
import pandas as pd
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import List, Optional, Dict
import re
from urllib.parse import urljoin, urlparse, parse_qs

### Data Class

In [None]:
@dataclass
class TradeRecord:
    """One row of the Capitol Trades table, with every column kept as text.

    As populated by ``CapitolTradesScraper`` in this file, each field holds
    the display text pulled from the matching table cell; nothing is parsed
    into a date or a number.
    """
    politician: str      # politician name (first table column)
    traded_issuer: str   # issuer name (second table column)
    published: str       # publication date text (third table column)
    traded: str          # trade date text (fourth table column)
    filed_after: str     # reporting-gap text, or the first number found in that cell
    owner: str           # owner label text (sixth table column)
    type: str            # transaction type text (seventh table column)
    size: str            # trade size text (eighth table column)
    price: str           # price text, or "N/A" when the row has no ninth column

### Capitol Trades Scraper

In [None]:
class CapitolTradesScraper:
    """Web scraper for the Capitol Trades trades table using BeautifulSoup.

    Pages are fetched either directly or through ScraperAPI (when an API key
    is supplied), parsed into ``TradeRecord`` objects, and can then be saved
    to CSV or summarised with pandas.  The class is a context manager:
    leaving the ``with`` block closes the underlying requests session.
    """

    def __init__(self, delay: float = 2.0, scraper_api_key: Optional[str] = None):
        """
        Initialise the scraper with requests session

        Args:
            delay: Delay between requests in seconds
            scraper_api_key: ScraperAPI key for proxy rotation (optional)
        """
        self.delay = delay
        self.base_url = "https://www.capitoltrades.com/trades"
        self.scraper_api_key = scraper_api_key
        self.scraper_api_base = "https://api.scraperapi.com"
        self.session = self._setup_session()
        # 1-based number of the page currently being processed.
        self.current_page = 1

    def _setup_session(self) -> requests.Session:
        """Create the requests session used for every fetch.

        Browser-like headers are only installed for direct scraping; when a
        ScraperAPI key is configured the session is left with default headers
        and a notice showing the last four characters of the key is printed.

        Returns:
            The configured ``requests.Session``.
        """
        session = requests.Session()

        if not self.scraper_api_key:
            session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-GB,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1'
            })
        else:
            print(f"Using ScraperAPI with key ending in ...{self.scraper_api_key[-4:]}")

        return session

    def _build_scraper_api_url(self, target_url: str) -> str:
        """Wrap ``target_url`` in a ScraperAPI request URL.

        Args:
            target_url: The page that should ultimately be fetched.

        Returns:
            ``target_url`` unchanged when no API key is configured; otherwise
            the ScraperAPI endpoint with the key, the percent-quoted target
            URL, ``render=false`` and ``country_code=gb`` as query parameters.
        """
        if not self.scraper_api_key:
            return target_url

        params = {
            'api_key': self.scraper_api_key,
            'url': target_url,
            'render': 'false',
            'country_code': 'gb'
        }

        param_string = '&'.join(
            f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()
        )
        return f"{self.scraper_api_base}?{param_string}"

    def fetch_page(self, url: Optional[str] = None) -> Optional[BeautifulSoup]:
        """Fetch and parse a webpage.

        Args:
            url: Page to fetch; defaults to ``self.base_url``.

        Returns:
            The parsed page, or ``None`` when the request fails or the server
            answers with an HTTP error status (the error is printed).
        """
        target_url = url or self.base_url
        request_url = self._build_scraper_api_url(target_url)

        try:
            if self.scraper_api_key:
                print(f"Fetching via ScraperAPI: {target_url}")
            else:
                print(f"Fetching directly: {target_url}")

            response = self.session.get(request_url, timeout=60)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            print(f"Successfully fetched page {self.current_page}")
            return soup

        except requests.exceptions.RequestException as e:
            print(f"Error fetching page: {e}")
            return None

    def scrape_page_data(self, soup: BeautifulSoup) -> List[TradeRecord]:
        """Extract trade data from a parsed webpage.

        Only the first ``<tbody>`` is inspected, and rows with fewer than
        eight ``<td>`` cells are skipped.  A row that raises while being
        parsed is reported and skipped so one bad row cannot abort the page.

        Args:
            soup: Parsed page as returned by ``fetch_page``.

        Returns:
            The trades extracted from the page (possibly empty).
        """
        trades: List[TradeRecord] = []

        tbody = soup.find('tbody')
        if not tbody:
            print("No table body found")
            return trades

        rows = tbody.find_all('tr')
        if not rows:
            print("No rows found in tbody")
            return trades

        print(f"Processing {len(rows)} rows...")

        for row_index, row in enumerate(rows):
            cells = row.find_all('td')
            if len(cells) < 8:  # Must have at least 8 data columns
                continue
            try:
                trade = self._extract_trade_from_row(row, cells, row_index)
                if trade:
                    trades.append(trade)
                    if row_index < 3:  # Show progress for first few trades
                        print(f"✓ Extracted: {trade.politician} - {trade.traded_issuer}")
            except Exception as e:
                # Deliberate best-effort: report the row and carry on.
                print(f"Error extracting trade from row {row_index}: {e}")

        print(f"Successfully extracted {len(trades)} trades from page {self.current_page}")
        return trades

    def _extract_trade_from_row(self, row, cells, row_index: int) -> Optional[TradeRecord]:
        """Build a ``TradeRecord`` from the cells of one table row.

        Each column is first looked up through its expected CSS class and
        falls back to the plain cell text when that element is missing.

        Args:
            row: The ``<tr>`` element (not used by the extraction itself).
            cells: The row's ``<td>`` elements; at least eight are required.
            row_index: Position of the row in the table (not used here).

        Returns:
            The extracted record, or ``None`` when the politician or issuer
            could not be determined.
        """
        # Extract politician name
        politician_cell = cells[0]
        politician_link = politician_cell.find('a', class_='text-txt-interactive')
        if politician_link:
            politician = politician_link.get_text(strip=True)
        else:
            politician = self._fallback_text_extraction(politician_cell, "politician")

        # Extract issuer name
        issuer_cell = cells[1]
        issuer_link = issuer_cell.find('a', class_='hover:no-underline')
        if not issuer_link:
            issuer_link = issuer_cell.find('a')  # Fallback to any link
        if issuer_link:
            issuer = issuer_link.get_text(strip=True)
        else:
            issuer = self._fallback_text_extraction(issuer_cell, "issuer")

        # Extract published and traded dates
        published = self._extract_date_from_cell(cells[2])
        traded = self._extract_date_from_cell(cells[3])

        # Extract filed after days
        filed_after_span = cells[4].find('span', class_=re.compile('reporting-gap'))
        if filed_after_span:
            filed_after = filed_after_span.get_text(strip=True)
        else:
            # Look for any number in the cell
            filed_after_text = cells[4].get_text(strip=True)
            numbers = re.findall(r'\d+', filed_after_text)
            filed_after = numbers[0] if numbers else filed_after_text

        # Extract owner
        owner_span = cells[5].find('span', class_='q-label')
        if owner_span:
            owner = owner_span.get_text(strip=True)
        else:
            owner = self._fallback_text_extraction(cells[5], "owner")

        # Extract transaction type
        type_span = cells[6].find('span', class_=re.compile('tx-type'))
        if type_span:
            trade_type = type_span.get_text(strip=True)
        else:
            trade_type = self._fallback_text_extraction(cells[6], "type")

        # Extract size
        size_span = cells[7].find('span', class_='mt-1')
        if not size_span:
            size_span = cells[7].find('span')  # Fallback to any span
        if size_span:
            size = size_span.get_text(strip=True)
        else:
            size = self._fallback_text_extraction(cells[7], "size")

        # Extract price (9th column if exists)
        if len(cells) > 8:
            price_span = cells[8].find('span')
            if price_span:
                price = price_span.get_text(strip=True)
            else:
                price = self._fallback_text_extraction(cells[8], "price")
        else:
            price = "N/A"

        # Validate required fields ("Unknown" is the fallback's empty-cell marker)
        if politician == "Unknown" or issuer == "Unknown":
            return None

        return TradeRecord(
            politician=politician,
            traded_issuer=issuer,
            published=published,
            traded=traded,
            filed_after=filed_after,
            owner=owner,
            type=trade_type,
            size=size,
            price=price
        )

    def _extract_date_from_cell(self, cell) -> str:
        """Extract a date string from a date cell.

        Looks for a ``div.text-center`` holding a ``div.text-size-3`` and a
        ``div.text-size-2`` and joins their texts with a space, in that
        order (the code treats them as day/month and year).  Falls back to
        the plain cell text when that structure is absent.

        Args:
            cell: The ``<td>`` element holding the date.

        Returns:
            The date text, or the fallback text of the cell.
        """
        date_div = cell.find('div', class_='text-center')
        if date_div:
            day_month_div = date_div.find('div', class_='text-size-3')
            year_div = date_div.find('div', class_='text-size-2')

            if day_month_div and year_div:
                day_month = day_month_div.get_text(strip=True)
                year = year_div.get_text(strip=True)
                return f"{day_month} {year}"

        # Fallback to cell text
        return self._fallback_text_extraction(cell, "date")

    def _fallback_text_extraction(self, cell, field_name: str) -> str:
        """Return the visible text of ``cell`` when no specific element matched.

        Args:
            cell: Element whose text should be extracted.
            field_name: Logical column name; ``"politician"`` enables the
                name heuristic, every other value only gets the generic
                clean-up.

        Returns:
            ``"Unknown"`` for an empty cell; for politicians the words before
            the first party/chamber term; otherwise the whitespace-normalised
            text truncated to 100 characters.
        """
        # Join nested text nodes with a space: the default empty separator
        # would glue "Name", "Party" and "Chamber" into a single token and
        # defeat the word-based name heuristic below.
        text = cell.get_text(separator=' ', strip=True)
        if not text:
            return "Unknown"

        # Clean up the text (remove extra whitespace)
        text = re.sub(r'\s+', ' ', text)

        # For politician names, try to extract just the name part
        if field_name == "politician":
            # Look for pattern: "Name Party Chamber State"
            parts = text.split()
            if len(parts) >= 2:
                # The name ends where the political info starts
                political_terms = ('Republican', 'Democrat', 'House', 'Senate')
                name_parts = []
                for part in parts:
                    if part in political_terms:
                        break
                    name_parts.append(part)

                if name_parts:
                    return ' '.join(name_parts)

        # Limit length for readability
        return text[:100]

    def find_next_page_url(self, soup: BeautifulSoup) -> Optional[str]:
        """Find the URL for the next page.

        First looks for a "Go to next page" link inside the pagination
        container; failing that, scans every ``page=N`` link for the one
        whose number is ``self.current_page + 1``.

        Args:
            soup: Parsed page currently being processed.

        Returns:
            Absolute URL of the next page, or ``None`` when none is found.
        """
        # Look for pagination area
        pagination_area = soup.find('div', class_=re.compile(r'relative.*flex.*items-center'))

        if pagination_area:
            # Look for "Go to next page" link
            next_link = pagination_area.find(
                'a', {'aria-label': re.compile(r'Go to next page', re.IGNORECASE)}
            )
            if next_link:
                href = next_link.get('href')
                if href:
                    return urljoin(self.base_url, href)

        # Fallback: Look for any page links
        wanted_page = self.current_page + 1
        for link in soup.find_all('a', href=re.compile(r'page=\d+')):
            href = link.get('href')
            page_match = re.search(r'page=(\d+)', href)
            if page_match and int(page_match.group(1)) == wanted_page:
                return urljoin(self.base_url, href)

        return None

    def scrape_all_pages(self, max_pages: Optional[int] = None) -> List[TradeRecord]:
        """Scrape trade data from all available pages.

        Starts at ``self.base_url`` and follows next-page links, sleeping
        ``self.delay`` seconds between requests.  Stops when a fetch fails,
        a page after the first yields no trades, ``max_pages`` is reached,
        or no further page link is found.

        Args:
            max_pages: Maximum number of pages to scrape; ``None`` means no
                limit.

        Returns:
            All trades collected across the scraped pages.
        """
        all_trades: List[TradeRecord] = []
        current_url = self.base_url
        # Always start counting from the first page so the scraper can be
        # reused; otherwise a second call would inherit the previous run's
        # page number and hit max_pages immediately.
        self.current_page = 1
        pages_scraped = 0

        while current_url and (max_pages is None or self.current_page <= max_pages):
            print(f"\nScraping page {self.current_page}")

            soup = self.fetch_page(current_url)
            if not soup:
                print(f"Failed to fetch page {self.current_page}")
                break

            page_trades = self.scrape_page_data(soup)

            if not page_trades and self.current_page > 1:
                print("No trades found - might be end of data")
                break

            all_trades.extend(page_trades)
            pages_scraped += 1

            if max_pages is not None and self.current_page >= max_pages:
                print(f"Reached maximum pages limit: {max_pages}")
                break

            next_url = self.find_next_page_url(soup)
            if not next_url:
                print("No more pages found")
                break

            if next_url == current_url:
                print("Next URL same as current - stopping to avoid loop")
                break

            current_url = next_url
            self.current_page += 1

            print(f"Waiting {self.delay} seconds...")
            time.sleep(self.delay)

        # Report pages that actually contributed, not the last page attempted.
        print(f"\n🎉 Scraping complete! Total trades: {len(all_trades)} from {pages_scraped} page(s)")
        return all_trades

    def save_to_csv(self, trades: List[TradeRecord], filename: str = "capitol_trades.csv") -> None:
        """Save trades to CSV file with auto-generated ID column.

        Args:
            trades: Records to write; nothing is written when empty.
            filename: Destination path of the CSV file.
        """
        if not trades:
            print("No trades to save")
            return

        df = pd.DataFrame([vars(trade) for trade in trades])
        # Sequential 1-based id as the first column.
        df.insert(0, 'id', range(1, len(df) + 1))

        df.to_csv(filename, index=False)
        print(f"💾 Saved {len(trades)} trades to {filename}")

    def get_trade_summary(self, trades: List[TradeRecord]) -> Dict:
        """Generate summary statistics for scraped trades.

        Args:
            trades: Records to summarise.

        Returns:
            An empty dict when ``trades`` is empty; otherwise a dict with the
            total count, the number of unique politicians and issuers, value
            counts of trade types and owners, and the five most frequent
            politicians and issuers.
        """
        if not trades:
            return {}

        df = pd.DataFrame([vars(trade) for trade in trades])

        return {
            'total_trades': len(trades),
            'unique_politicians': df['politician'].nunique(),
            'unique_issuers': df['traded_issuer'].nunique(),
            'trade_types': df['type'].value_counts().to_dict(),
            'owner_breakdown': df['owner'].value_counts().to_dict(),
            'top_politicians': df['politician'].value_counts().head(5).to_dict(),
            'top_issuers': df['traded_issuer'].value_counts().head(5).to_dict()
        }

    def close(self) -> None:
        """Close the requests session"""
        if self.session:
            self.session.close()

    def __enter__(self):
        """Enter the context manager and return the scraper itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the session on leaving the ``with`` block; exceptions propagate."""
        self.close()

### Scrape Capitol Trades

In [None]:
def main():
    """Scrape a few pages, save them to CSV and print a report to stdout."""
    print("🚀 Starting Capitol Trades scraper...")

    # Configuration
    page_limit = 5  # Adjust as needed
    pause_seconds = 3.0  # Be respectful to the server

    # Option 1: Use ScraperAPI (recommended for production)
    # scraper_api_key = "YOUR_SCRAPERAPI_KEY_HERE"
    # with CapitolTradesScraper(delay=pause_seconds, scraper_api_key=scraper_api_key) as scraper:

    # Option 2: Direct scraping (for testing)
    with CapitolTradesScraper(delay=pause_seconds) as scraper:
        trades = scraper.scrape_all_pages(max_pages=page_limit)

        if not trades:
            print("❌ No trades were scraped. Check your internet connection or the website may have changed.")
        else:
            # Save to CSV
            scraper.save_to_csv(trades, "capitol_trades.csv")

            # Display summary
            summary = scraper.get_trade_summary(trades)
            print("\n📊 SCRAPING SUMMARY:")
            print(f"📈 Total trades: {summary.get('total_trades', 0)}")
            print(f"👥 Unique politicians: {summary.get('unique_politicians', 0)}")
            print(f"🏢 Unique issuers: {summary.get('unique_issuers', 0)}")

            # One entry per breakdown:
            # (heading, summary key, per-line suffix, max lines or None for all)
            breakdowns = (
                ("\n💹 Trade Types:", 'trade_types', "", None),
                ("\n👤 Owner Breakdown:", 'owner_breakdown', "", None),
                ("\n🏛️ Top Politicians by Trade Count:", 'top_politicians', " trades", 5),
                ("\n🏢 Top Issuers by Trade Count:", 'top_issuers', " trades", 5),
            )
            for heading, key, suffix, limit in breakdowns:
                counts = summary.get(key, {})
                if not counts:
                    continue
                print(heading)
                for label, count in list(counts.items())[:limit]:
                    print(f"   {label}: {count}{suffix}")

            # Show sample trades
            print("\n📋 Sample Trades:")
            for number, trade in enumerate(trades[:3], 1):
                print(f"\nTrade {number}:")
                details = (
                    ("Politician", trade.politician),
                    ("Issuer", trade.traded_issuer),
                    ("Type", trade.type),
                    ("Size", trade.size),
                    ("Price", trade.price),
                    ("Published", trade.published),
                    ("Traded", trade.traded),
                )
                for label, value in details:
                    print(f"   {label}: {value}")

    print("\n✅ Scraper finished!")


# Run the scraper only when executed as a script / notebook cell.
if __name__ == "__main__":
    main()