In [5]:
# Crawls to sample, as (descriptive name, Common Crawl ID) pairs.
# IDs follow the "CC-MAIN-YYYY-WW" pattern and may need updating to whatever
# is available when the notebook is run; each entry is the intended month or
# the closest available crawl.
TARGET_CRAWLS = [
    {"name": crawl_name, "id": crawl_id}
    for crawl_name, crawl_id in (
        ("February 2025", "CC-MAIN-2025-08"),  # February 2025
        ("August 2024", "CC-MAIN-2024-33"),    # August 2024
        ("Fall 2023", "CC-MAIN-2023-40"),      # September/October 2023
        ("February 2022", "CC-MAIN-2022-05"),  # February 2022
    )
]

In [None]:
!pip install warc

In [7]:
import requests
import gzip
import io
import re

def stream_warc_paths(url, num_lines=50):
    """
    Stream a gzipped file of WARC paths from Common Crawl, decompress it on the fly,
    and return the first n paths.

    The response body is decompressed straight from the network stream, so the
    file is not loaded into memory as a whole and reading stops once
    ``num_lines`` lines have been collected.

    Args:
        url: URL to the gzipped paths file
        num_lines: Number of paths to return (default: 50)

    Returns:
        List of the first num_lines WARC file paths (fewer if the file is shorter)

    Raises:
        Exception: If the server answers with a status code other than 200
    """
    # The context manager closes the response even though we stop reading early.
    with requests.get(url, stream=True) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download file: {response.status_code}")

        # Decompress from the raw stream. Accessing response.content instead would
        # download the entire body first and defeat stream=True.
        # NOTE(review): assumes the server sends the .gz bytes without an extra
        # Content-Encoding layer, since response.raw is read undecoded.
        decompressor = gzip.GzipFile(fileobj=response.raw, mode='rb')

        # Read and decode the first num_lines lines
        lines = []
        for i, line in enumerate(decompressor):
            if i >= num_lines:
                break
            lines.append(line.decode('utf-8').strip())

    return lines

def _iter_crlf_lines(fileobj, chunk_size):
    """Yield CRLF-delimited byte lines from fileobj, reading chunk_size bytes at a time."""
    pending = b""  # tail of the previous chunk that has not been terminated by CRLF yet
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        pieces = (pending + chunk).split(b'\r\n')
        # The last piece may be an incomplete line; keep it for the next chunk
        pending = pieces.pop()
        yield from pieces
    # Emit a final line that was not CRLF-terminated instead of silently dropping it
    if pending:
        yield pending


def stream_warc_content(warc_path, base_url="https://data.commoncrawl.org/", max_records=5):
    """
    Stream a specific WARC file from Common Crawl, decompress it on the fly,
    and extract the first few web page contents.

    The response is decompressed directly from the network stream, so only the
    part of the file needed to produce ``max_records`` records is read.

    Parsing is line-based and heuristic: any CRLF-delimited line that starts with
    ``WARC/1.0`` is treated as the start of a new record, the lines up to the next
    empty line are parsed as ``Key: value`` WARC headers, and everything after
    that (HTTP headers included) becomes the record's content, with lines
    re-joined using ``\\n``.

    Args:
        warc_path: Path to WARC file (from warc.paths.gz)
        base_url: Base URL for Common Crawl data
        max_records: Maximum number of WARC records to process

    Returns:
        List of dictionaries with WARC record information, each holding
        'headers' (dict of WARC header fields) and 'content' (str)

    Raises:
        Exception: If the server answers with a status code other than 200
    """
    full_url = base_url + warc_path
    print(f"Retrieving WARC file from: {full_url}")

    buffer_size = 1024 * 1024  # decompressed bytes requested per read (1MB)

    records = []
    current_headers = {}
    current_content = []
    in_header = False

    # The context manager closes the response even though we usually stop early.
    with requests.get(full_url, stream=True) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download WARC file: {response.status_code}")

        # Decompress from the raw stream. Accessing response.content instead would
        # pull the whole WARC file into memory before any parsing starts.
        # NOTE(review): assumes the server sends the .gz bytes without an extra
        # Content-Encoding layer, since response.raw is read undecoded.
        decompressor = gzip.GzipFile(fileobj=response.raw, mode='rb')

        for raw_line in _iter_crlf_lines(decompressor, buffer_size):
            # Stop as soon as enough complete records have been collected
            if len(records) >= max_records:
                break

            line_str = raw_line.decode('utf-8', errors='ignore')

            # WARC version line - start of a new record
            if line_str.startswith('WARC/1.0'):
                # Save the previous record, if one with parsed headers exists
                if current_headers:
                    records.append({
                        'headers': current_headers,
                        'content': ''.join(current_content)
                    })

                current_headers = {}
                current_content = []
                in_header = True
                continue

            # Empty line marks the end of the WARC headers
            if in_header and not line_str:
                in_header = False
                continue

            if in_header:
                if ': ' in line_str:
                    key, value = line_str.split(': ', 1)
                    current_headers[key] = value
            elif current_headers:
                # Body line of the current record
                current_content.append(line_str + '\n')

    # Add the record that was still open when the stream ended, if there is room
    if current_headers and len(records) < max_records:
        records.append({
            'headers': current_headers,
            'content': ''.join(current_content)
        })

    return records

def extract_html_content(warc_record):
    """
    Extract HTML content from a WARC response record.

    The record content is expected to hold the HTTP headers, a blank line, and
    then the HTTP body. Everything after the first blank line (either
    ``\\r\\n\\r\\n`` or ``\\n\\n``, whichever occurs first) is returned. If no blank
    line exists, the content is returned starting at the earliest
    ``<!DOCTYPE`` / ``<html`` / ``<?xml`` marker (matched case-insensitively),
    or unchanged when no such marker is present.

    Args:
        warc_record: Dictionary containing WARC record data, with a 'headers'
            dict and a 'content' string

    Returns:
        The extracted body text, or an empty string if the record's
        WARC-Type is not 'response'
    """
    # Only response records carry a fetched page
    if warc_record['headers'].get('WARC-Type') != 'response':
        return ""

    content = warc_record['content']

    # The header/body boundary is the FIRST blank line, whatever its line-ending
    # style; a later blank line of the other style belongs to the body.
    body_start = None
    earliest = None
    for separator in ('\r\n\r\n', '\n\n'):
        pos = content.find(separator)
        if pos != -1 and (earliest is None or pos < earliest):
            earliest = pos
            body_start = pos + len(separator)
    if body_start is not None:
        return content[body_start:]

    # No blank line: fall back to the earliest markup marker. Matching on the
    # original string (not a lowercased copy) keeps the offset valid.
    marker = re.search(r'<!doctype|<html|<\?xml', content, re.IGNORECASE)
    if marker:
        return content[marker.start():]

    return content

In [None]:
# One warc.paths.gz listing URL per configured crawl, in TARGET_CRAWLS order
urls = [
    f"https://data.commoncrawl.org/crawl-data/{crawl['id']}/warc.paths.gz"
    for crawl in TARGET_CRAWLS
]
print(urls)

In [None]:
num_lines = 4   # number of WARC files sampled from each crawl's path listing
limit = 1200    # cap on records parsed per WARC file and on pages kept per crawl
crawl_data = {}  # maps each warc.paths.gz URL to the list of page bodies collected

for warc_paths_url in urls:
    # Errors are handled per crawl so one failing crawl does not discard the rest
    try:
        sites = []
        warc_paths = stream_warc_paths(warc_paths_url, num_lines=num_lines)
        for i, path in enumerate(warc_paths):
            print(f"{i+1}: {path}")

        if not warc_paths:
            print("No WARC paths found.")
            continue

        # Iterate over the paths actually returned; the listing may contain
        # fewer than num_lines entries.
        for warc_path in warc_paths:
            warc_records = stream_warc_content(warc_path, max_records=limit)
            for record in warc_records:
                # Only response records carry a fetched page
                if record['headers'].get('WARC-Type') == 'response':
                    if len(sites) < limit:
                        sites.append(extract_html_content(record))

                if len(sites) >= limit:
                    break
            if len(sites) >= limit:
                break

        crawl_data[warc_paths_url] = sites
        print(f"=== {warc_paths_url} scraping complete. {len(sites)} sites catalogued")

    except Exception as e:
        print(f"Error: {e}")

In [None]:
from bs4 import BeautifulSoup

def extract_text_from_html(html_content):
    """
    Strip all markup from an HTML document and return its visible text.

    Args:
        html_content: String containing HTML content

    Returns:
        Plain text with one non-empty phrase per line
    """
    document = BeautifulSoup(html_content, 'html.parser')

    # <script> and <style> bodies are not readable text, so drop them first
    for unwanted in document(["script", "style"]):
        unwanted.decompose()

    fragments = []
    for raw_line in document.get_text().splitlines():
        # A run of two spaces is treated as a boundary between separate phrases
        for phrase in raw_line.strip().split("  "):
            cleaned = phrase.strip()
            # Skip blank fragments
            if cleaned:
                fragments.append(cleaned)

    return '\n'.join(fragments)

# For use in a Colab notebook:
# 1. Install BeautifulSoup if you haven't already
# !pip install beautifulsoup4

# 2. Use it with your HTML content
for url in crawl_data:
    print(url)
    # Number the sites from 1 within each crawl
    for site_number, site in enumerate(crawl_data[url], start=1):
        print(f"\n~~~~ SITE {site_number} ~~~~")
        text = extract_text_from_html(site)



In [31]:
from bs4 import BeautifulSoup
import json
import os
import re

def extract_text_from_html(html_content):
    """
    Strip all markup from an HTML document and return its visible text.

    Any exception raised while parsing is reported on stdout and turned into
    an empty result.

    Args:
        html_content: String containing HTML content

    Returns:
        Plain text with one non-empty phrase per line, or "" on failure
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')

        # <script> and <style> bodies are not readable text, so drop them first
        for node in soup(["script", "style"]):
            node.decompose()

        # Trim every line, then split on double spaces and trim each phrase
        stripped_lines = [ln.strip() for ln in soup.get_text().splitlines()]
        phrases = [part.strip() for ln in stripped_lines for part in ln.split("  ")]

        # filter(None, ...) discards the empty phrases
        return '\n'.join(filter(None, phrases))
    except Exception as e:
        print(f"Error extracting text: {e}")
        return ""

def sanitize_filename(url):
    """
    Convert a URL into a safe filename.

    For Common Crawl path-listing URLs of the form
    ``https://data.commoncrawl.org/crawl-data/<crawl-id>/warc.paths.gz`` the
    result is ``<crawl-id>.json``. Any other URL keeps its full text (minus the
    ``http(s)://`` scheme) with unsafe characters replaced.

    Args:
        url: URL string

    Returns:
        Safe filename string ending in '.json'; the stem is at most 100 characters
    """
    prefix = "https://data.commoncrawl.org/crawl-data/"
    suffix = "/warc.paths.gz"

    # Strip the Common Crawl wrapper only when it is really there; slicing by
    # fixed lengths would cut arbitrary characters out of any other URL.
    filename = url
    if filename.startswith(prefix):
        filename = filename[len(prefix):]
    if filename.endswith(suffix):
        filename = filename[:-len(suffix)]

    # Remove protocol and replace special characters
    filename = re.sub(r'^https?://', '', filename)
    # Replace any character that's not alphanumeric, underscore, or dash with underscore
    filename = re.sub(r'[^\w\-]', '_', filename)
    # Truncate if too long
    filename = filename[:100]
    return filename + '.json'

def process_crawl_data(crawl_data, output_dir='crawl_json'):
    """
    Process the crawl data and save each crawl to a separate JSON file.

    Each output file holds ``{"crawl_url": <url>, "sites": [...]}``, where every
    site entry records a 1-based ``site_id``, the length of the original HTML
    (``html_length``) and the extracted plain ``text``. The file name is derived
    from the crawl URL via ``sanitize_filename``.

    Args:
        crawl_data: Dictionary with URLs as keys and lists of HTML content as values
        output_dir: Directory to save the JSON files (created if missing)

    Returns:
        None. Progress is printed and files are written as a side effect.
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists first
    os.makedirs(output_dir, exist_ok=True)

    # Process each crawl URL
    for url, sites in crawl_data.items():
        # Create a JSON object for this crawl
        crawl_json = {
            "crawl_url": url,
            "sites": []
        }

        print(f"Processing crawl: {url}")

        # Process each site in this crawl
        for i, html_content in enumerate(sites):
            print(f"  Processing site {i+1}/{len(sites)}")

            # Extract text from HTML
            text = extract_text_from_html(html_content)

            # Add site data to the JSON object
            site_data = {
                "site_id": i+1,
                "html_length": len(html_content),
                "text": text
            }

            crawl_json["sites"].append(site_data)

        # Create filename from URL
        filename = sanitize_filename(url)
        filepath = os.path.join(output_dir, filename)

        # ensure_ascii=False keeps non-ASCII page text readable in the file
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(crawl_json, f, ensure_ascii=False, indent=2)

        print(f"Saved crawl data to {filepath}")

In [None]:
# Create a dictionary to organize sites by crawl URL.
# Each crawl's site list is copied, so organized_data does not share list
# objects with crawl_data.
organized_data = {url: list(sites) for url, sites in crawl_data.items()}

# Process the organized data
process_crawl_data(organized_data)