In [1]:
import requests
from bs4 import BeautifulSoup
from nltk.tokenize import sent_tokenize
#from transformers import BertTokenizer
import os
from dotenv import load_dotenv
import re
#from happytransformer import HappyTextClassification
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import textwrap
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import textwrap
from nltk.tokenize import sent_tokenize
import re

# Load variables from a .env file (if present) into the process environment.
load_dotenv()

# Bing Custom Search credentials; each is None when its variable is not set.
API_KEY = os.environ.get("API_KEY")
CUSTOM_CONFIG_ID = os.environ.get("CUSTOM_CONFIG_ID")

# FEEL FREE TO CHANGE num_results. It controls how many URLs are searched.
# If your code is running slowly or you are using a CPU, try reducing the number of results.
def bing_search(query, api_key, custom_config_id, num_results=10, excluded_site='', timeout=20):
    """Query the Bing Custom Search API and return the URLs of the web results.

    Args:
        query: Search text.
        api_key: Value sent in the ``Ocp-Apim-Subscription-Key`` header.
        custom_config_id: Value sent as the ``customconfig`` query parameter.
        num_results: Value sent as the ``count`` query parameter.
        excluded_site: When non-empty, `` -site:<excluded_site>`` is appended
            to the query.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of result URLs. An empty list is returned (after printing a
        message) when the request fails, the status is not 200, the body is
        not valid JSON, or no web results are present.
    """
    search_query = f"{query} -site:{excluded_site}" if excluded_site else query
    search_url = "https://api.bing.microsoft.com/v7.0/custom/search"
    headers = {'Ocp-Apim-Subscription-Key': api_key}
    # Every query-string value goes through `params` so that requests
    # URL-encodes it; characters such as '&', '#' or '+' in the query would
    # otherwise corrupt or truncate the request.
    params = {
        'q': search_query,
        'customconfig': custom_config_id,
        'count': num_results,
    }

    try:
        # requests has no default timeout; without one a stalled connection
        # would block forever.
        response = requests.get(search_url, headers=headers, params=params, timeout=timeout)
    except requests.RequestException as e:
        print(f"Failed to retrieve search results: {e}")
        return []

    if response.status_code != 200:
        print(f"Failed to retrieve search results: {response.status_code}, {response.text}")
        return []

    try:
        payload = response.json()
    except ValueError as e:
        print(f"Failed to retrieve search results: invalid JSON response ({e})")
        return []

    results = payload.get('webPages', {}).get('value', [])
    if not results:
        print("No results found.")
        return []

    urls = [result['url'] for result in results if 'url' in result]
    print(f"Found URLs: {urls}")
    return urls

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Hugging Face checkpoint for each classifier. Dict order is preserved in
# `model_info`, so it is also the order in which the classifiers are iterated.
MODEL_CHECKPOINTS = {
    "EnvironmentalBERT": "Vinoth24/environmental_claims",
    "SocialBERT": "ESGBERT/SocialBERT-social",
    "GovernanceBERT": "ESGBERT/GovernanceBERT-governance",
}

# Use the GPU when one is available; otherwise fall back to the CPU instead of
# crashing on a hard-coded 'cuda' device.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# model_info maps classifier name -> (model, tokenizer). It is built from
# MODEL_CHECKPOINTS rather than overwritten in place, so the name always refers
# to the same kind of value.
model_info = {}
for key, checkpoint in MODEL_CHECKPOINTS.items():
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint).to(DEVICE)
    model_info[key] = (model, tokenizer)


# If no GPU:
# models = {
#     "EnvironmentalBERT": HappyTextClassification("BERT", "Vinoth24/environmental_claims"),
#     "SocialBERT": HappyTextClassification("BERT", "ESGBERT/SocialBERT-social"),
#     "GovernanceBERT": HappyTextClassification("BERT", "ESGBERT/GovernanceBERT-governance")
# }

In [5]:
def process_and_classify_content(content, threshold=0.6, wrap_width=150):
    """Return the text fragments that one of the loaded classifiers flags.

    The text is whitespace-normalised, split into sentences, and each sentence
    is wrapped into fragments of at most ``wrap_width`` characters. Every
    fragment is offered to the classifiers in ``model_info`` in dict order; the
    first one that predicts class 1 with probability above ``threshold`` claims
    the fragment, and the remaining classifiers are skipped for it.

    Args:
        content: Raw text to classify.
        threshold: Minimum softmax probability of the predicted class for a
            fragment to be kept.
        wrap_width: Maximum fragment length in characters.

    Returns:
        A string with one ``"<classifier name>: <fragment>"`` entry per line,
        or an empty string when nothing was flagged.
    """
    content = re.sub(r'\s+', ' ', content)  # Normalize whitespace
    classified_sentences = []

    # Inference only: gradients are disabled once for the whole pass rather
    # than re-entering the context manager for every forward call.
    with torch.no_grad():
        for sentence in sent_tokenize(content):
            sentence = sentence.strip()
            if not sentence:
                continue
            # textwrap.wrap returns the same pieces that fill(...).split('\n')
            # produced, without building and re-splitting a joined string.
            for line in textwrap.wrap(sentence, width=wrap_width):
                for model_name, (model, tokenizer) in model_info.items():
                    # Send the inputs to whatever device this model lives on
                    # instead of assuming 'cuda'.
                    inputs = tokenizer(
                        line,
                        return_tensors="pt",
                        padding=True,
                        truncation=True,
                        max_length=512,
                    ).to(model.device)
                    outputs = model(**inputs)
                    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
                    score, predicted_class = probs.max(1)
                    if score.item() > threshold and predicted_class.item() == 1:
                        clean_line = ' '.join(line.split())
                        classified_sentences.append(f"{model_name}: {clean_line}")
                        break  # First classifier to flag the fragment wins

    return '\n'.join(classified_sentences)


# def process_and_classify_content(content):
#     content = re.sub(r'\s+', ' ', content)  # Normalize whitespace
#     sentences = sent_tokenize(content)
#     classified_sentences = []
    
#     for sentence in sentences:
#         sentence = sentence.strip()
#         if sentence:
#             # Use text wrapping to avoid excessively long lines
#             wrapped_text = textwrap.fill(sentence, width=80)
#             for line in wrapped_text.split('\n'):
#                 # Check each model for classification
#                 for model_name, model in models.items():
#                     result = model.classify_text(line)
#                     if result.label == "LABEL_1" and result.score > 0.5:
#                         clean_line = ' '.join(line.split())
#                         classified_sentences.append(f"{model_name}: {clean_line}")
#                         break  # Stop checking other models if one has classified the line already

    # return '\n'.join(classified_sentences)

def fetch_content(url):
    """Download a page and return its visible text.

    Args:
        url: Address of the page to fetch.

    Returns:
        The page text as a string, or None when the request times out, fails,
        or answers with a status other than 200. A message is printed for
        every None result.
    """
    try:
        with requests.Session() as session:
            response = session.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=20)
            if response.status_code != 200:
                # Report the skip so a missing URL in the output can be traced.
                print(f"Skipping {url}: HTTP {response.status_code}")
                return None
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop non-visible code blocks; older BeautifulSoup releases
            # include their contents in get_text().
            for tag in soup(['script', 'style', 'noscript']):
                tag.decompose()
            # A space separator keeps text from adjacent elements from being
            # fused together ("TitleParagraph"), which would confuse sentence
            # splitting.
            return soup.get_text(separator=' ')
    except requests.Timeout:
        print(f"Timeout occurred for URL: {url}")
    except requests.RequestException as e:
        print(f"Error fetching {url}: {e}")
    return None

def search_and_scrape(query, company_website, max_threads=10,
                      output_path='scraped_data.txt', num_results=10):
    """Search, download, classify, and write the flagged text to a file.

    URLs come from ``bing_search`` (with ``company_website`` excluded). Pages
    are downloaded concurrently with ``fetch_content``; each downloaded page is
    passed to ``process_and_classify_content`` in the calling thread as soon as
    its download completes, so the order of entries in the file follows
    download completion, not search ranking.

    Args:
        query: Search text forwarded to ``bing_search``.
        company_website: Site passed to ``bing_search`` as ``excluded_site``.
        max_threads: Maximum number of concurrent download threads.
        output_path: File to write; it is created or truncated even when no
            URL yields any content.
        num_results: Number of search results to request.
    """
    urls = bing_search(
        query,
        API_KEY,
        CUSTOM_CONFIG_ID,
        num_results=num_results,
        excluded_site=company_website,
    )
    with open(output_path, 'w', encoding='utf-8') as file:
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            future_to_url = {executor.submit(fetch_content, url): url for url in urls}
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    content = future.result()
                    if not content:
                        continue
                    processed_content = process_and_classify_content(content)
                    if processed_content:
                        file.write(f"URL: {url}\nContent:\n{processed_content}\n\n")
                except Exception as e:
                    # Best effort: one bad page must not abort the whole run.
                    print(f"Error processing {url}: {e}")

In [None]:
# Company to research, and its own site, which is excluded from the search.
company_name = "nike"
company_website = "nike.com"

# Build the query once so the logged text is exactly what gets searched.
search_query = f"intext:\"{company_name}\" company sustainability"
print(f"Searching: {search_query}")
search_and_scrape(search_query, company_website)