<a href="https://colab.research.google.com/github/samanthajmichael/baymax_tc/blob/main/terms/data_ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Testing the Custom Search

In [10]:
%%capture
!pip install requests beautifulsoup4 pandas pandas-datareader google-api-python-client

In [2]:
# Read the Custom Search credentials through google.colab's `userdata` store
# so that neither value is written into the notebook itself.
from google.colab import userdata
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')  # sent as the API key on every search request
SEARCH_ENGINE_ID = userdata.get('SEARCH_ENGINE_ID')  # sent as the `cx` (search engine) parameter

In [3]:
from urllib.parse import urlencode

# Endpoint that the request URL below is built on.
base_url = "https://www.googleapis.com/customsearch/v1"
query = "S&P 500 companies"  # Replace with your desired search term

# Percent-encode every parameter. Interpolating the raw query would let the
# '&' in "S&P" terminate the `q` parameter early (q=S) and leave the spaces
# unescaped, so the API would search for the wrong thing.
url = f"{base_url}?{urlencode({'key': GOOGLE_API_KEY, 'cx': SEARCH_ENGINE_ID, 'q': query})}"

In [4]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import pandas_datareader as pdr
import os
from urllib.parse import urlparse

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

In [None]:
# Testing the custom search function
# NOTE: the whole function below is wrapped in a triple-quoted string, so it is
# disabled -- nothing inside it runs and `test_custom_search` is never defined.
# NOTE(review): if this is re-enabled, be aware that it prints the first and
# last five characters of the API key and the full search engine ID, which
# would then be stored in the notebook's cell output.
"""def test_custom_search():
    try:
        print(f"API Key: {GOOGLE_API_KEY[:5]}...{GOOGLE_API_KEY[-5:]}")  # Print first and last 5 characters
        print(f"Search Engine ID: {SEARCH_ENGINE_ID}")

        service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)

        res = service.cse().list(q="Google", cx=SEARCH_ENGINE_ID, num=1).execute()

        if 'items' in res:
            print(f"Search successful. First result: {res['items'][0]['link']}")
        else:
            print("Search successful, but no results found.")

    except HttpError as e:
        print(f"An error occurred: {e}")
        if hasattr(e, 'error_details'):
            print(f"Error details: {e.error_details}")

if __name__ == "__main__":
    test_custom_search()"""

## Running the Custom Search Function

In [11]:
# Re-read the Custom Search credentials from google.colab's `userdata` store.
# This repeats the lookup done in the testing section above -- presumably so
# this section can be run without executing that one first.
from google.colab import userdata
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')  # read by validate_credentials() and search_company_website()
SEARCH_ENGINE_ID = userdata.get('SEARCH_ENGINE_ID')  # passed as `cx` to the Custom Search call

In [None]:
# Import required libraries
import requests
from bs4 import BeautifulSoup
import pandas as pd
import os
from urllib.parse import urlparse, urljoin
import time
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.colab import userdata
import logging
from pathlib import Path
from typing import List, Optional

# Configure logging: INFO and above, each record prefixed with a timestamp
# and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the helper functions defined below.
logger = logging.getLogger(__name__)

def validate_credentials() -> bool:
    """Check that both Custom Search credentials are available.

    Reads the module-level ``GOOGLE_API_KEY`` and ``SEARCH_ENGINE_ID``.

    Returns:
        True when both values are truthy. Otherwise an error is logged and
        False is returned.
    """
    if GOOGLE_API_KEY and SEARCH_ENGINE_ID:
        return True

    logger.error("Missing required API credentials")
    return False

def get_sp500_companies() -> List[str]:
    """Return the company names listed on Wikipedia's S&P 500 page.

    Parses the HTML tables on the page with ``pd.read_html`` and takes the
    ``Security`` column of the first table.

    Returns:
        The values of the ``Security`` column as a list. Any failure
        (download, parsing, missing column) is logged and an empty list is
        returned instead.
    """
    source_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    try:
        first_table = pd.read_html(source_url)[0]
        company_names = first_table['Security']
        return company_names.tolist()
    except Exception as e:
        # Best effort: callers treat an empty list as "could not fetch".
        logger.error(f"Error fetching S&P 500 companies: {e}")
        return []

def search_company_website(company_name: str) -> Optional[str]:
    """Look up a company's site root with the Google Custom Search API.

    Searches for ``"<company_name> official website"`` using the module-level
    ``GOOGLE_API_KEY`` and ``SEARCH_ENGINE_ID``, then reduces the first hit to
    its ``scheme://netloc`` part.

    Args:
        company_name: Name of the company to search for.

    Returns:
        The scheme and host of the top search result, or None when the search
        returned no items or the API raised ``HttpError`` (both cases are
        logged).
    """
    try:
        # A new client is built per call; using it as a context manager closes
        # its HTTP connections afterwards instead of leaking one client per
        # company over a run of several hundred lookups.
        with build("customsearch", "v1", developerKey=GOOGLE_API_KEY) as service:
            res = service.cse().list(
                q=f"{company_name} official website",
                cx=SEARCH_ENGINE_ID,
                num=1
            ).execute()
    except HttpError as e:
        logger.error(f"Google API error for {company_name}: {e}")
        return None

    # Treat a missing 'items' key and an empty list the same way, so an empty
    # result can never raise IndexError.
    items = res.get('items')
    if not items:
        logger.warning(f"No results found for {company_name}")
        return None

    parsed_url = urlparse(items[0]['link'])
    return f"{parsed_url.scheme}://{parsed_url.netloc}"

def find_terms_link(url: str) -> Optional[str]:
    """Find a terms-and-conditions style link on a web page.

    Downloads ``url`` and returns the target of the first ``<a>`` element
    that has an ``href`` and whose visible text contains "terms",
    "conditions" or "legal" (case-insensitive).

    Args:
        url: Page to scan, typically a company's home page.

    Returns:
        The absolute URL of the first matching link, or None when no link
        matches or the page could not be fetched (request errors are logged).
    """
    keywords = ('terms', 'conditions', 'legal')

    try:
        response = requests.get(url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Error finding terms link for {url}: {e}")
        return None

    soup = BeautifulSoup(response.text, 'html.parser')

    # href=True skips anchors without a target, which would otherwise raise
    # KeyError on ['href'].
    for anchor in soup.find_all('a', href=True):
        # get_text() also covers anchors whose label is wrapped in nested tags
        # (e.g. <a><span>Terms</span> of use</a>); filtering on the anchor's
        # single string would miss those.
        label = anchor.get_text(" ", strip=True).lower()
        if any(keyword in label for keyword in keywords):
            # Resolve relative links against the final URL after redirects,
            # since that is the document the href actually belongs to.
            return urljoin(response.url, anchor['href'])

    return None

def download_terms(url: str, company_name: str) -> bool:
    """Fetch a terms page and store its body under ``terms/``.

    The response body is written as-is to
    ``terms/<sanitized company name>_terms.txt`` (UTF-8). The ``terms``
    directory is created if it does not exist.

    Args:
        url: Address of the terms page.
        company_name: Company the page belongs to; used for logging and to
            derive the output file name.

    Returns:
        True once the file has been written. False when the request fails or
        the response's Content-Type does not contain "text".
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    try:
        response = requests.get(url, timeout=10, headers=request_headers)
        response.raise_for_status()

        # Only textual responses are kept.
        content_type = response.headers.get('Content-Type', '').lower()
        if 'text' not in content_type:
            logger.warning(f"Skipping non-text content for {company_name}: {content_type}")
            return False

        # Make sure the output directory is there.
        terms_dir = Path('terms')
        terms_dir.mkdir(exist_ok=True)

        # Keep only alphanumerics, spaces, hyphens and underscores in the
        # company name, then trim surrounding whitespace.
        allowed_extras = (' ', '-', '_')
        kept_chars = [ch for ch in company_name if ch.isalnum() or ch in allowed_extras]
        safe_company_name = "".join(kept_chars).strip()
        filename = terms_dir / f"{safe_company_name}_terms.txt"

        filename.write_text(response.text, encoding='utf-8')
        logger.info(f"Saved terms for {company_name} to {filename}")
        return True

    except requests.RequestException as e:
        logger.error(f"Error downloading terms for {company_name}: {e}")
        return False

def main(start_index: int = 44):
    """Scrape terms-and-conditions pages for the S&P 500 companies.

    For each company from ``start_index`` onwards: search for its website,
    look for a terms link on it, and download that page. A one-second pause
    follows every company, whether or not it succeeded.

    Args:
        start_index: Position in the company list to start from. Defaults to
            44, the offset that used to be hard-coded in the loop, so calling
            ``main()`` behaves as before. Negative values are treated as 0.
    """
    # Validate credentials first
    if not validate_credentials():
        return

    # Get S&P 500 companies
    sp_500_companies = get_sp500_companies()
    if not sp_500_companies:
        logger.error("Failed to fetch company list")
        return

    # A negative offset would slice from the end of the list while the logged
    # counter counted up from a negative number; clamp it instead.
    start_index = max(0, start_index)

    # Process each company
    for i, company in enumerate(sp_500_companies[start_index:], start_index):
        logger.info(f"Processing {company} (#{i})...")

        try:
            # Search for company website
            website_url = search_company_website(company)
            if not website_url:
                continue

            logger.info(f"Found website URL for {company}: {website_url}")

            # Find terms and conditions
            terms_url = find_terms_link(website_url)
            if not terms_url:
                continue

            logger.info(f"Found terms and conditions URL for {company}: {terms_url}")

            # Download terms (the helper logs its own success or failure)
            download_terms(terms_url, company)

        except Exception as e:
            # Keep going: one broken site should not abort the whole run.
            logger.error(f"Unexpected error processing {company}: {e}")

        finally:
            # Always respect rate limits -- this also runs on `continue`.
            time.sleep(1)

if __name__ == "__main__":
    main()

In [14]:
import shutil

# Pack the contents of the 'terms' folder into 'terms.zip'; the call returns
# the path of the archive it created.
shutil.make_archive(base_name='terms', format='zip', root_dir='terms')

'/content/terms.zip'

In [None]:
from google.colab import userdata
import logging
from pathlib import Path
import json
from typing import Optional, Dict
import time
from datetime import datetime
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Set up logging: INFO and above, written both to 'scraper.log' and to the
# cell output.
# force=True is required here. basicConfig() does nothing when the root logger
# already has handlers, and the basicConfig() call in the earlier cell has
# already installed one -- without it the FileHandler below is silently never
# attached. force=True removes the existing root handlers first.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('scraper.log'),
        logging.StreamHandler()
    ],
    force=True
)
logger = logging.getLogger(__name__)

class CredentialManager:
    """Reads the Google API credentials via google.colab's ``userdata``."""

    @staticmethod
    def get_credentials() -> Dict[str, str]:
        """Fetch both credentials and fail if either one is absent.

        Returns:
            Mapping with the keys ``google_api_key`` and ``search_engine_id``.

        Raises:
            ValueError: If either value is missing or empty; the message
                names the missing entries.
            Exception: Any error raised while reading the values. Every
                error, including the ValueError above, is logged and then
                re-raised.
        """
        # Result key -> name of the stored secret it is read from.
        secret_names = {
            'google_api_key': 'GOOGLE_API_KEY',
            'search_engine_id': 'SEARCH_ENGINE_ID',
        }

        try:
            credentials = {
                field: userdata.get(secret)
                for field, secret in secret_names.items()
            }

            # Validate credentials
            missing = [field for field, value in credentials.items() if not value]
            if missing:
                raise ValueError(f"Missing required credentials: {', '.join(missing)}")

            return credentials

        except Exception as e:
            logger.error(f"Error retrieving credentials: {e}")
            raise

class ScrapingProgress:
    """Tracks scraping progress in a JSON file so a run can be resumed.

    ``progress_data`` holds the last processed index and company, the time of
    the last update, and the lists of completed and failed companies. It is
    loaded from ``progress_file`` on construction and written back on every
    ``save_progress`` call.
    """

    def __init__(self, progress_file: str = "scraping_progress.json"):
        """Load existing progress from ``progress_file`` or start fresh."""
        self.progress_file = Path(progress_file)
        self.progress_data = self._load_progress()

    def _load_progress(self) -> Dict:
        """Load progress data from file or create new if doesn't exist.

        A file that is not valid JSON, or whose top level is not an object,
        is reported and replaced by a fresh structure. Keys missing from an
        otherwise valid file are filled in with their defaults so later
        lookups cannot raise KeyError.
        """
        if not self.progress_file.exists():
            return self._create_new_progress()

        try:
            with open(self.progress_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("progress file does not contain a JSON object")
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            logger.error("Corrupted progress file, creating new one")
            return self._create_new_progress()

        progress = self._create_new_progress()
        progress.update(loaded)
        return progress

    def _create_new_progress(self) -> Dict:
        """Create new progress tracking structure"""
        return {
            'last_index': None,
            'last_company': None,
            'timestamp': None,
            'completed_companies': [],
            'failed_companies': []
        }

    def save_progress(self, index: int, company: str, success: bool):
        """Record the outcome for one company and persist it to disk.

        Args:
            index: Position of the company in the list being processed.
            company: Name of the company.
            success: Whether processing succeeded. A success also removes the
                company from the failed list, so a company that failed on an
                earlier attempt is not counted as both.
        """
        data = self.progress_data
        data['last_index'] = index
        data['last_company'] = company
        data['timestamp'] = datetime.now().isoformat()

        completed = data['completed_companies']
        failed = data['failed_companies']
        if success:
            if company not in completed:
                completed.append(company)
            if company in failed:
                failed.remove(company)
        elif company not in failed:
            failed.append(company)

        # Write to a sibling temp file and swap it in, so an interruption
        # mid-write cannot leave a truncated progress file behind.
        tmp_file = self.progress_file.with_name(self.progress_file.name + '.tmp')
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        tmp_file.replace(self.progress_file)

    def get_last_index(self) -> Optional[int]:
        """Get the last processed index (None if nothing was processed yet)"""
        return self.progress_data['last_index']

def search_company_website(company_name: str, credentials: Dict[str, str]) -> Optional[str]:
    """Search for company website using Google Custom Search API

    NOTE: this redefines the ``search_company_website`` from the earlier cell
    with an extra required ``credentials`` argument; once this cell has run,
    the one-argument form is no longer available.

    Args:
        company_name: Name of the company to search for.
        credentials: Mapping with the keys ``google_api_key`` and
            ``search_engine_id``.

    Returns:
        ``scheme://netloc`` of the top search result, or None when the search
        returned no items or the API raised ``HttpError`` (both are logged).
    """
    try:
        # Use the per-call client as a context manager so its HTTP
        # connections are closed instead of accumulating across the run.
        with build("customsearch", "v1", developerKey=credentials['google_api_key']) as service:
            res = service.cse().list(
                q=f"{company_name} official website",
                cx=credentials['search_engine_id'],
                num=1
            ).execute()
    except HttpError as e:
        logger.error(f"Google API error for {company_name}: {e}")
        return None

    # A missing 'items' key and an empty list both mean "no results"; checking
    # truthiness avoids IndexError on an empty list.
    items = res.get('items')
    if not items:
        logger.warning(f"No results found for {company_name}")
        return None

    parsed_url = urlparse(items[0]['link'])
    return f"{parsed_url.scheme}://{parsed_url.netloc}"

def _process_company(company: str, credentials: Dict[str, str]) -> bool:
    """Search, locate the terms link and download it for one company; True only if the download succeeded."""
    # Search for company website
    website_url = search_company_website(company, credentials)
    if not website_url:
        logger.warning(f"No website found for {company}")
        return False
    logger.info(f"Found website URL for {company}: {website_url}")

    # Find terms and conditions
    terms_url = find_terms_link(website_url)
    if not terms_url:
        logger.warning(f"No terms and conditions link found for {company}")
        return False
    logger.info(f"Found terms and conditions URL for {company}: {terms_url}")

    # Download terms; the helper returns False when nothing was saved
    if not download_terms(terms_url, company):
        logger.warning(f"Terms download failed for {company}")
        return False
    return True


def main(start_company: Optional[str] = None, resume: bool = True):
    """
    Main execution function with resume capability

    Args:
        start_company: Specific company to start from (optional). When given,
            it takes precedence and ``resume`` is ignored.
        resume: Whether to resume from the index after the last one recorded
            in the progress file. Only used when ``start_company`` is not set.

    Raises:
        Exception: Any error outside the per-company handling (for example
            missing credentials) is logged and re-raised.
    """
    # Initialize progress tracking and get credentials
    progress = ScrapingProgress()

    try:
        # Get and validate credentials
        credentials = CredentialManager.get_credentials()

        # Get S&P 500 companies
        sp_500_companies = get_sp500_companies()
        if not sp_500_companies:
            logger.error("Failed to fetch S&P 500 companies list")
            return

        # Determine starting point
        start_index = 0
        if start_company:
            try:
                start_index = sp_500_companies.index(start_company)
                logger.info(f"Starting from specified company: {start_company} (index: {start_index})")
                sp_500_companies = sp_500_companies[start_index:]
            except ValueError:
                logger.error(f"{start_company} not found in the list of S&P 500 companies.")
                return
        elif resume:
            last_index = progress.get_last_index()
            if last_index is not None:
                start_index = last_index + 1
                logger.info(f"Resuming from index: {start_index}")
                sp_500_companies = sp_500_companies[start_index:]

        # Create output directory if it doesn't exist
        Path('terms').mkdir(exist_ok=True)

        # Process companies; `index` stays the position in the full list
        for index, company in enumerate(sp_500_companies, start=start_index):
            logger.info(f"Processing {company} (Index: {index})...")

            # Skip finished companies before entering the try/finally, so no
            # rate-limit pause is spent on work that makes no request.
            if company in progress.progress_data['completed_companies']:
                logger.info(f"Skipping {company} - already processed")
                continue

            try:
                # Record the real outcome: a failed download must not be
                # stored as completed, or it would be skipped on every
                # later run.
                success = _process_company(company, credentials)
                progress.save_progress(index, company, success)
                if success:
                    logger.info(f"Successfully processed {company} (Index: {index})")

            except Exception as e:
                logger.error(f"Error processing {company}: {e}", exc_info=True)
                progress.save_progress(index, company, False)

            finally:
                # Always respect rate limits
                time.sleep(1)

        # Log final statistics
        logger.info("Scraping completed!")
        logger.info(f"Successfully processed: {len(progress.progress_data['completed_companies'])} companies")
        logger.info(f"Failed to process: {len(progress.progress_data['failed_companies'])} companies")

    except Exception as e:
        logger.error(f"Critical error in main execution: {e}", exc_info=True)
        raise

if __name__ == "__main__":
    # Example usage
    main(start_company="eBay", resume=True)

In [None]:
import shutil

# Pack the contents of the 'terms' folder into 'terms.zip'; the call returns
# the path of the archive it created.
shutil.make_archive(base_name='terms', format='zip', root_dir='terms')