# V7 Output

In [1]:
import requests
import pandas as pd
import time
from datetime import datetime, timedelta
import json
import csv
import os
import sys
import re
import traceback  # For better error reporting

# Instead of relying on NLTK for sentence tokenization, let's use a simple regex approach
def simple_sentence_tokenize(text):
    """Split text into sentences using simple regex"""
    if not text or not isinstance(text, str):
        return []
    # Split text based on common sentence-ending punctuation
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return [s.strip() for s in sentences if s.strip()]

class FederalRegisterClient:
    def __init__(self, debug=False):
        """Initialize the Federal Register client with optional debug mode"""
        self.base_url = "https://www.federalregister.gov/api/v1"
        self.headers = {
            "User-Agent": "Federal Register Data Collection Script for Academic Research",
            "Accept": "application/json"
        }
        self.results = []
        self.debug = debug
        
        # Domain classification based on agency
        self.agency_domain_mapping = {
            # Housing
            "Department of Housing and Urban Development": "Housing",
            "Federal Housing Finance Agency": "Housing",
            "Housing": "Housing",
            
            # Education
            "Department of Education": "Education",
            "Office of Postsecondary Education": "Education",
            "Education": "Education",
            
            # Criminal Justice
            "Department of Justice": "Criminal Justice",
            "Bureau of Prisons": "Criminal Justice",
            "Federal Bureau of Investigation": "Criminal Justice",
            "Justice": "Criminal Justice",
            
            # Employment
            "Department of Labor": "Employment",
            "Equal Employment Opportunity Commission": "Employment",
            "Office of Personnel Management": "Employment",
            "Labor": "Employment",
            
            # Healthcare
            "Department of Health and Human Services": "Healthcare",
            "Centers for Medicare & Medicaid Services": "Healthcare",
            "Food and Drug Administration": "Healthcare",
            "Health": "Healthcare",
            "Medicare": "Healthcare",
            "Medicaid": "Healthcare",
            
            # Public Safety
            "Department of Homeland Security": "Public Safety",
            "Public Safety": "Public Safety",
            "Homeland Security": "Public Safety",
            
            # Voting Rights
            "Election Assistance Commission": "Voting Rights",
            "Federal Election Commission": "Voting Rights",
            "Election": "Voting Rights",
            
            # Immigration
            "U.S. Citizenship and Immigration Services": "Immigration",
            "Immigration and Customs Enforcement": "Immigration",
            "Immigration": "Immigration",
            
            # Environment
            "Environmental Protection Agency": "Environment",
            "Environment": "Environment",
            
            # Finance
            "Department of the Treasury": "Finance",
            "Securities and Exchange Commission": "Finance",
            "Federal Reserve System": "Finance",
            "Treasury": "Finance",
            "Financial": "Finance",
            
            # Default
            "default": "Other"
        }
        
        # Policy statement patterns for identifying potential policy statements
        self.policy_patterns = [
            # Mandatory language
            r"must \w+", r"shall \w+", r"required to \w+", r"will be \w+",
            r"is mandatory", r"is required", r"are required", r"is necessary",
            r"may not \w+", r"cannot \w+", r"are not permitted to \w+",
            r"is prohibited", r"are prohibited", r"will not be \w+",
            r"only \w+ will be", r"only \w+ may", r"may only \w+", 
            
            # Requirements language
            r"minimum requirement", r"eligibility requirement", r"qualification",
            r"applicants must", r"compliance is", r"eligible for", r"qualifies for",
            r"in order to \w+", r"in accordance with", r"subject to",
            r"under the provisions", r"pursuant to", r"responsible for",
            r"will apply", r"would apply", r"applies to", r"will ensure",
            
            # Threshold language
            r"threshold", r"criteria", r"standard", r"guideline",
            r"acceptable", r"unacceptable", r"permitted", r"authorized",
            r"minimum of", r"maximum of", r"at least", r"no more than",
            r"not less than", r"not to exceed", r"within \d+",
            
            # Process language
            r"must submit", r"shall submit", r"required to submit",
            r"will provide", r"must provide", r"shall provide",
            r"must demonstrate", r"shall demonstrate", r"required to demonstrate",
            r"will determine", r"shall determine", r"must determine",
            r"will establish", r"shall establish", r"must establish",
            r"will implement", r"shall implement", r"must implement",
            r"will comply", r"shall comply", r"must comply",
            
            # Timeframe language
            r"by \w+ date", r"within \d+ days", r"no later than",
            r"deadline", r"timeframe", r"time period", r"due date",
            
            # Section markers that often introduce policies
            r"(^|\n)§ \d+\.\d+", r"(^|\n)Section \d+\.\d+",
            r"requirement[s]? for", r"standards for", r"guidelines for",
            r"criteria for", r"procedures for", r"rules for"
        ]
        
        # Special section headers that often contain policy statements
        self.policy_section_headers = [
            r"requirements", r"eligibility", r"qualifications", r"standards",
            r"criteria", r"procedures", r"guidelines", r"rules", r"regulations",
            r"responsibilities", r"prohibitions", r"restrictions", r"limitations",
            r"compliance", r"enforcement", r"penalties", r"sanctions",
            r"mandatory", r"required", r"prohibited", r"implementation"
        ]
        
        # Main high-level bias categories
        self.high_level_bias_categories = [
            "disability", 
            "religion", 
            "racial_cultural", 
            "gender", 
            "economic", 
            "criminal_justice", 
            "citizenship", 
            "education_language",
            "age",  # Added age as requested
            "no_bias"
        ]
        
        # Enhanced bias type detection with subcategories
        self.bias_type_keywords = {
            "disability": {
                "general": ["disability", "disabilities", "disabled", "handicap", "impairment", 
                           "accessible", "accessibility", "accommodation", "reasonable accommodation",
                           "ada", "americans with disabilities", "barrier free"],
                "physical_health": ["physical disability", "mobility", "wheelchair", "cane", "walker",
                                   "prosthetic", "blind", "deaf", "hearing impaired", "vision impaired",
                                   "vision loss", "paralysis", "paraplegia", "quadriplegia", "amputation",
                                   "chronic illness", "chronic condition", "medical condition", "medical necessity",
                                   "functional limitation", "physiological", "physical limitation", "physically disabled"],
                "mental_health": ["mental disability", "mental health", "psychiatric", "psychological",
                                 "cognitive", "intellectual disability", "developmental disability",
                                 "autism", "adhd", "anxiety", "depression", "bipolar", "schizophrenia",
                                 "mental illness", "behavioral health", "emotional disturbance",
                                 "neurodivergent", "learning disability", "psychosocial", "therapy", "counseling"]
            },
            "religion": {
                "general": ["religion", "religious", "faith", "spiritual", "worship", "belief", "creed",
                           "sect", "denomination", "prayer", "devotion", "religious practice",
                           "religious observance", "religious accommodation", "church", "mosque", "temple",
                           "synagogue", "christianity", "islam", "judaism", "buddhism", "hinduism", "atheist",
                           "holy day", "sabbath", "religious holiday", "religious garb", "religious attire"]
            },
            "racial_cultural": {
                "general": ["race", "racial", "ethnicity", "ethnic", "minority", "color", "interracial",
                           "discrimination", "segregation", "diversity", "multicultural", "diverse",
                           "cultural background", "cultural identity", "heritage", "ancestry", "origin"],
                "specific_groups": ["black", "white", "asian", "hispanic", "latino", "latina", "latinx",
                                  "native american", "indigenous", "pacific islander", "african american",
                                  "caucasian", "biracial", "multiracial", "people of color", "bipoc",
                                  "marginalized", "underrepresented", "minority group", "tribal"]
            },
            "gender": {
                "general": ["gender", "sex", "gender identity", "transgender", "cisgender", "non-binary",
                           "gender expression", "gender nonconforming", "gender neutral", "gendered",
                           "sexism", "gender bias", "gender discrimination", "gender-based", "gender role"],
                "specific_terms": ["woman", "women", "female", "male", "man", "men", "maternity", "paternity",
                                 "pregnancy", "pregnant", "breastfeeding", "lactation", "childbirth",
                                 "motherhood", "fatherhood", "parental", "maternal", "paternal",
                                 "femininity", "masculinity", "reproductive"]
            },
            "economic": {
                "general": ["income", "wealth", "poverty", "poor", "financial", "economic", "socioeconomic",
                           "class", "socioeconomic status", "financially", "economically", "monetary",
                           "fiscal", "financial resources", "financial status", "economic opportunity"],
                "housing": ["housing", "rent", "rental", "mortgage", "homeowner", "homelessness", "homeless",
                           "eviction", "tenant", "landlord", "lease", "public housing", "subsidized housing",
                           "affordable housing", "housing assistance", "section 8", "residence", "domicile",
                           "dwelling", "shelter", "residential", "housing voucher", "fair housing", "housing discrimination"],
                "employment": ["employment", "job", "work", "occupation", "career", "profession", "hire",
                              "hiring", "terminate", "termination", "layoff", "unemployment", "employee",
                              "employer", "workplace", "minimum wage", "salary", "earnings", "compensation",
                              "benefits", "labor", "working condition", "paid leave", "unpaid leave", "contract",
                              "part-time", "full-time", "seasonal", "temporary", "position", "promotion",
                              "demotion", "qualification", "workforce", "personnel", "staff"]
            },
            "criminal_justice": {
                "general": ["criminal", "crime", "offense", "convicted", "conviction", "arrest", "sentence",
                           "incarceration", "imprisonment", "jail", "prison", "detention", "detainee",
                           "inmate", "offender", "felon", "felony", "misdemeanor", "parole", "probation",
                           "recidivism", "rehabilitation", "correctional", "corrections", "criminal record",
                           "criminal history", "background check", "law enforcement", "police", "court",
                           "juvenile", "justice system", "adjudication", "legal system", "prosecution"]
            },
            "citizenship": {
                "general": ["citizen", "citizenship", "immigrant", "immigration", "documented", "undocumented",
                           "alien", "legal status", "immigration status", "naturalized", "naturalization",
                           "green card", "permanent resident", "temporary protected status", "asylum", "refugee",
                           "visa", "passport", "national", "nationality", "foreign national", "foreign-born",
                           "migrant", "migration", "deportation", "removal", "border", "entry", "authorization"],
                "voting": ["voting", "vote", "voter", "ballot", "election", "polling", "register to vote",
                          "voter registration", "voter id", "suffrage", "franchise", "disenfranchisement",
                          "electoral", "electorate", "eligible voter", "voting rights", "political participation",
                          "political representation", "candidate", "campaign", "democracy", "democratic process"]
            },
            "education_language": {
                "education": ["education", "school", "academic", "student", "teacher", "faculty", "classroom",
                             "curriculum", "grade", "graduation", "degree", "diploma", "certificate", "educational",
                             "learning", "instruction", "educational opportunity", "educational attainment",
                             "educational access", "school district", "admission", "matriculation", "enrollment",
                             "college", "university", "institution", "elementary", "secondary", "postsecondary",
                             "vocational", "technical", "training"],
                "language_proficiency": ["language", "english", "bilingual", "multilingual", "english proficiency",
                                        "limited english", "english language learner", "esl", "esol", "fluent",
                                        "proficiency", "speak", "understand", "comprehend", "interpret", "translator",
                                        "interpretation", "native language", "primary language", "mother tongue",
                                        "non-english", "language barrier", "communication barrier", "literacy"]
            },
            # Added age bias category as requested
            "age": {
                "general": ["age", "aging", "elderly", "senior", "retirement", "retiree", "mature", 
                           "old", "older", "young", "younger", "youth", "teenager", "adolescent", 
                           "minor", "adult", "juvenile", "child", "children", "age group", "generation",
                           "age requirement", "age limit", "age restriction", "age discrimination", 
                           "ageism", "age bias", "age-based", "age bracket", "age range", "medicare",
                           "social security", "pension", "geriatric", "pediatric", "over 65", "under 18",
                           "underage", "minor", "over the age of", "under the age of", "age of majority"]
            }
        }
        
        # Bias indicators for common potentially biased policy language
        self.bias_indicators = [
            # Restrictive language
            "minimum", "requirement", "only", "must", "shall", "mandatory",
            "required", "eligible", "qualifies", "qualified", "standard",
            
            # Exclusionary language
            "prohibited", "exclude", "excluding", "ineligible", "disqualified",
            "denied", "restricted", "limited", "limitation", 
            
            # Preferential language
            "preference", "preferred", "priority", "prioritize", "favor", 
            
            # Threshold language
            "threshold", "criteria", "qualify", "qualification", "minimum",
            "maximum", "at least", "no more than", "no less than",
            
            # Identity and demographic terms
            "citizen", "residency", "resident", "legal", "documentation",
            "background", "history", "record", "status"
        ]
        
        # Normative framing identification
        self.explicit_normative_indicators = [
            "prohibited", "banned", "not allowed", "restricted", "excluded",
            "required", "must", "shall", "mandatory", "only", "exclusively",
            "will not", "cannot", "may not", "exclude", "deny", "ineligible",
            "disqualified", "forbidden", "illegal", "unlawful", "impermissible",
            "obligated", "no person shall", "no person may", "shall not",
            "is not permitted", "are not permitted", "is not authorized",
            "are not authorized", "is forbidden", "are forbidden"
        ]
        
        self.implicit_normative_indicators = [
            "should", "encouraged", "recommended", "expected", "appropriate",
            "advisable", "preferred", "typically", "generally", "normally",
            "ordinarily", "commonly", "usual", "customary", "standard practice",
            "best practice", "preference", "priority", "consideration",
            "qualified", "eligible", "suitable", "ideal", "preferable"
        ]
    
    def search_documents(self, doc_types=None, start_date=None, end_date=None, per_page=1000, max_pages=None):
        """
        Search for documents using the Federal Register API
        
        Args:
            doc_types: List of document types (RULE, PRORULE, NOTICE)
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format
            per_page: Number of results per page (max 1000)
            max_pages: Maximum number of pages to fetch
        """
        print(f"Searching for documents from {start_date} to {end_date}")
        
        # Initialize variables for pagination
        page = 1
        total_results = 0
        more_pages = True
        
        while more_pages:
            # Build URL with query parameters
            url = f"{self.base_url}/documents.json"
            params = {
                "per_page": per_page,
                "page": page,
                "order": "newest",
                "fields[]": [
                    "title", 
                    "type", 
                    "document_number", 
                    "publication_date", 
                    "agencies", 
                    "abstract", 
                    "html_url", 
                    "pdf_url", 
                    "effective_on", 
                    "comments_close_on",
                    "regulation_id_number_info",
                    "docket_ids",
                    "significant"
                ]
            }
            
            # Add document types filter
            if doc_types:
                for doc_type in doc_types:
                    params.setdefault("conditions[type][]", []).append(doc_type)
            
            # Add date range filters
            if start_date:
                params["conditions[publication_date][gte]"] = start_date
            if end_date:
                params["conditions[publication_date][lte]"] = end_date
            
            # Make the API request
            try:
                print(f"Fetching page {page}...")
                response = requests.get(url, params=params, headers=self.headers)
                response.raise_for_status()
                
                data = response.json()
                
                # Debug information to understand the structure
                if page == 1:
                    print(f"Response contains keys: {list(data.keys())}")
                    if "count" in data:
                        print(f"Total count: {data['count']}")
                    if "results" in data:
                        print(f"Results count on this page: {len(data['results'])}")
                
                # Process results
                if "results" in data:
                    count = len(data["results"])
                    total_results += count
                    
                    for doc in data["results"]:
                        # Extract agency names (safely)
                        agency_names = []
                        if "agencies" in doc and doc["agencies"]:
                            # Handle different potential structures
                            if isinstance(doc["agencies"], list):
                                for agency in doc["agencies"]:
                                    if isinstance(agency, dict) and "name" in agency:
                                        agency_names.append(agency["name"])
                                    elif isinstance(agency, str):
                                        agency_names.append(agency)
                            elif isinstance(doc["agencies"], dict) and "name" in doc["agencies"]:
                                agency_names.append(doc["agencies"]["name"])
                        
                        # Determine domain based on agency
                        domain = "Other"
                        for agency in agency_names:
                            for known_agency, known_domain in self.agency_domain_mapping.items():
                                if known_agency.lower() in agency.lower():
                                    domain = known_domain
                                    break
                        
                        # Extract regulation ID numbers (safely)
                        rins = []
                        if "regulation_id_number_info" in doc and doc["regulation_id_number_info"]:
                            if isinstance(doc["regulation_id_number_info"], dict):
                                rins = list(doc["regulation_id_number_info"].keys())
                        
                        # Create a clean result with safe gets
                        result = {
                            "title": doc.get("title", ""),
                            "document_type": doc.get("type", ""),
                            "document_number": doc.get("document_number", ""),
                            "publication_date": doc.get("publication_date", ""),
                            "agencies": ", ".join(agency_names),
                            "domain": domain,
                            "abstract": doc.get("abstract", ""),
                            "html_url": doc.get("html_url", ""),
                            "pdf_url": doc.get("pdf_url", ""),
                            "effective_date": doc.get("effective_on", ""),
                            "comments_close_date": doc.get("comments_close_on", ""),
                            "regulation_id_numbers": ", ".join(rins),
                            "docket_ids": ", ".join(doc.get("docket_ids", [])) if isinstance(doc.get("docket_ids", []), list) else "",
                            "significant": "Yes" if doc.get("significant", False) else "No",
                            "policy_statements": [],  # Will be populated later if full text is downloaded
                        }
                        
                        # Extract policy statements from abstract if available
                        if result["abstract"]:
                            self.extract_policy_statements(result, "abstract")
                            
                        self.results.append(result)
                    
                    print(f"Retrieved {count} documents from page {page}")
                    
                    # Check if we need to get more pages
                    if "next_page_url" in data and data["next_page_url"] and count > 0:
                        page += 1
                        if max_pages and page > max_pages:
                            print(f"Reached maximum page limit of {max_pages}")
                            more_pages = False
                        # Add a small delay to be respectful of the API
                        time.sleep(0.5)
                    else:
                        more_pages = False
                else:
                    print("No results field found in the API response")
                    more_pages = False
            
            except Exception as e:
                print(f"Error fetching page {page}: {e}")
                if self.debug:
                    traceback.print_exc()
                more_pages = False
        
        print(f"Retrieved a total of {len(self.results)} documents")
    
    def extract_policy_statements(self, document, source_type="abstract"):
        """Extract policy statements from document text"""
        if source_type == "abstract":
            text = document.get("abstract", "")
        else:
            text = document.get("full_text", "")
            
        if not text:
            return
        
        # Using the simpler tokenizer
        sentences = simple_sentence_tokenize(text)
        
        # First, identify policy sections by headings
        policy_sections = []
        
        # Look for section headers that likely contain policies
        for header_pattern in self.policy_section_headers:
            pattern = re.compile(f"({header_pattern}[:\\.].*?)(?=\\n\\n|\\Z)", re.IGNORECASE | re.DOTALL)
            matches = pattern.finditer(text)
            for match in matches:
                policy_sections.append(match.group(1))
        
        # Process sentences in policy sections first (if found)
        for section in policy_sections:
            section_sentences = simple_sentence_tokenize(section)
            for sentence in section_sentences:
                self._process_policy_sentence(sentence, document, source_type, True)  # From policy sections
        
        # Process all sentences in the text with policy patterns
        for sentence in sentences:
            self._process_policy_sentence(sentence, document, source_type)
    
    def _process_policy_sentence(self, sentence, document, source_type, is_policy_section=False):
        """Process a sentence to check if it's a policy statement"""
        sentence = sentence.strip()
        
        # Skip very short sentences
        if len(sentence) < 10:
            return
            
        # Check if the sentence should be considered a policy statement
        is_policy = False
        
        # If from an identified policy section, consider it a policy statement
        if is_policy_section:
            is_policy = True
        else:
            # Check for policy patterns
            for pattern in self.policy_patterns:
                if re.search(pattern, sentence.lower()):
                    is_policy = True
                    break
        
        if is_policy:
            # Check if this exact sentence is already in the policy statements
            for existing in document["policy_statements"]:
                if existing["statement"] == sentence:
                    return
            
            # Detect bias in the policy statement
            bias_info = self._detect_bias(sentence)
            
            # Add the policy statement with bias classification
            document["policy_statements"].append({
                "statement": sentence,
                "source_section": source_type,
                "has_bias": bias_info["has_bias"],
                "bias_categories": bias_info.get("bias_categories", []),
                "primary_bias_category": bias_info.get("primary_bias_category", "no_bias"),
                "normative_framing": bias_info.get("normative_framing", "None"),
                "bias_indicators": bias_info.get("bias_indicators", [])
            })
    
    def _detect_bias(self, text):
        """
        Detect potential bias in a policy statement and categorize it
        
        Returns:
            Dictionary with bias detection information
        """
        text_lower = text.lower()
        result = {
            "has_bias": False,
            "bias_categories": [],
            "primary_bias_category": "no_bias",
            "bias_indicators": [],
            "normative_framing": "None"
        }
        
        # Store all matches to determine primary bias category
        all_matches = {}
        
        # Check each bias type category
        for category, subcategories in self.bias_type_keywords.items():
            category_matches = 0
            
            # Check general keywords first
            if "general" in subcategories:
                for keyword in subcategories["general"]:
                    if re.search(r'\b' + re.escape(keyword) + r'\b', text_lower):
                        category_matches += 1
                        result["bias_indicators"].append(keyword)
            
            # Check subcategory keywords
            for subcategory, keywords in subcategories.items():
                if subcategory == "general":
                    continue  # Already checked general keywords
                
                for keyword in keywords:
                    if re.search(r'\b' + re.escape(keyword) + r'\b', text_lower):
                        category_matches += 1
                        result["bias_indicators"].append(keyword)
            
            # If any matches found for this category, add it to results
            if category_matches > 0:
                result["has_bias"] = True
                # Just use the high-level category (no subcategories in output)
                result["bias_categories"].append(category)
                all_matches[category] = category_matches
        
        # Determine primary bias category (the one with most keyword matches)
        if all_matches:
            max_category = None
            max_count = 0
            for category, count in all_matches.items():
                if count > max_count:
                    max_count = count
                    max_category = category
            
            if max_category:
                result["primary_bias_category"] = max_category
        
        # Check for policy-level bias indicators even if no specific bias category was found
        if not result["has_bias"]:
            policy_bias_indicators = []
            for indicator in self.bias_indicators:
                if re.search(r'\b' + re.escape(indicator) + r'\b', text_lower):
                    policy_bias_indicators.append(indicator)
            
            if len(policy_bias_indicators) >= 2:  # Require at least 2 indicators for a general bias flag
                result["has_bias"] = True
                result["bias_categories"].append("general_policy_bias")
                result["primary_bias_category"] = "general_policy_bias"
                result["bias_indicators"].extend(policy_bias_indicators)
        
        # Detect normative framing (explicit vs implicit)
        result["normative_framing"] = self._identify_normative_framing(text_lower)
        
        return result
    
    def _identify_normative_framing(self, text_lower):
        """Identify the normative framing of a policy statement"""
        
        # Check for explicit normative indicators
        for indicator in self.explicit_normative_indicators:
            if re.search(r'\b' + re.escape(indicator) + r'\b', text_lower):
                return "Explicit"
        
        # Check for implicit normative indicators
        for indicator in self.implicit_normative_indicators:
            if re.search(r'\b' + re.escape(indicator) + r'\b', text_lower):
                return "Implicit"
        
        # If no clear indicators but bias was detected, default to implicit
        return "Implicit"
    
    def download_document_text(self, max_documents=100):
        """
        Download full text of documents using a more reliable method
        
        Args:
            max_documents: Maximum number of documents to download (to avoid overloading)
        """
        print(f"Downloading full text for up to {max_documents} documents...")
        
        count = 0
        download_failures = 0
        
        for doc in self.results:
            if count >= max_documents:
                break
                
            try:
                print(f"Downloading document {doc['document_number']}...")
                
                # Use the HTML URL directly, which is more reliable
                if doc.get("html_url"):
                    response = requests.get(doc["html_url"], headers=self.headers, timeout=30)
                    response.raise_for_status()
                    
                    # Simple clean-up of HTML to get text
                    full_text = self._clean_html(response.text)
                    doc["full_text"] = full_text
                    
                    # Extract policy statements from full text
                    self.extract_policy_statements(doc, "full_text")
                    
                    print(f"Extracted {len(doc['policy_statements'])} policy statements with bias detection")
                    count += 1
                else:
                    print(f"No HTML URL available for document {doc['document_number']}")
                    download_failures += 1
            except Exception as e:
                print(f"Error downloading document: {e}")
                if self.debug:
                    traceback.print_exc()
                download_failures += 1
            
            # Be respectful of server resources
            time.sleep(1)
        
        print(f"Downloaded full text for {count} documents")
        print(f"Failed to download {download_failures} documents")
    
    def _clean_html(self, html_content):
        """Simple method to clean HTML and extract text"""
        # Remove all HTML tags
        text = re.sub(r'<[^>]+>', ' ', html_content)
        
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Return cleaned text
        return text.strip()
    
    def generate_bias_benchmark_dataset(self, filename):
        """Generate a dataset for bias benchmark testing with modified columns format"""
        bias_data = []
        
        id_counter = 0  # Start from 0 as requested
        for doc in self.results:
            for policy in doc["policy_statements"]:
                if policy.get("has_bias", False):
                    bias_data.append({
                        "Index": id_counter,  # Changed from ID to Index
                        "Excerpt": policy["statement"],  # Changed from Policy Excerpt to Excerpt
                        "Date": doc["publication_date"],  # Date column as requested
                        "Bias Type": policy.get("primary_bias_category", "no_bias"),  # Changed to just use high-level category
                        "Normative Framing": policy.get("normative_framing", "Unknown"),  # Kept as is
                        "Source_HTML": doc.get("html_url", ""),  # HTML URL as requested
                        "Source_PDF": doc.get("pdf_url", "")  # PDF URL as requested
                    })
                    id_counter += 1
        
        # Save to CSV
        if bias_data:
            df = pd.DataFrame(bias_data)
            df.to_csv(filename, index=False, quoting=csv.QUOTE_NONNUMERIC)
            print(f"Bias benchmark dataset saved to {filename} with {len(bias_data)} entries")
            return df
        else:
            print("No biased policy statements found")
            return None
    
    def save_to_csv(self, filename):
        """Save results to CSV file"""
        if not self.results:
            print("No results to save.")
            return
            
        df = pd.DataFrame(self.results)
        # Drop full_text column if it exists (to keep CSV manageable)
        if "full_text" in df.columns:
            df = df.drop(columns=["full_text"])
            
        # Convert policy_statements to a string representation
        df["policy_statements"] = df["policy_statements"].apply(lambda x: json.dumps(x))
        
        df.to_csv(filename, index=False, quoting=csv.QUOTE_NONNUMERIC)
        print(f"Results saved to {filename}")
    
    def save_to_json(self, filename):
        """Save results to JSON file"""
        if not self.results:
            print("No results to save.")
            return
            
        # Create a simplified copy for JSON serialization
        results_copy = []
        for doc in self.results:
            doc_copy = doc.copy()
            # Remove full_text to keep JSON file manageable
            if "full_text" in doc_copy:
                del doc_copy["full_text"]
            results_copy.append(doc_copy)
            
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results_copy, f, ensure_ascii=False, indent=4)
        print(f"Results saved to {filename}")
        
    def save_policy_statements_csv(self, filename):
        """Save just the policy statements to a CSV file for easier review"""
        if not self.results:
            print("No results to save.")
            return
            
        policy_data = []
        
        for doc in self.results:
            doc_number = doc["document_number"]
            doc_title = doc["title"]
            doc_type = doc["document_type"]
            domain = doc["domain"]
            agency = doc["agencies"]
            
            for idx, policy in enumerate(doc["policy_statements"]):
                # Map the primary bias category to just the high-level category
                primary_bias = policy.get("primary_bias_category", "no_bias")
                # Extract just the high-level category if it's a subcategory
                if "_" in primary_bias:
                    primary_bias = primary_bias.split("_")[0]
                
                # Only include high-level categories in the output
                bias_categories = []
                for category in policy.get("bias_categories", []):
                    if "_" in category:
                        high_level = category.split("_")[0]
                        if high_level not in bias_categories:
                            bias_categories.append(high_level)
                    else:
                        if category not in bias_categories:
                            bias_categories.append(category)
                
                # Check if the high-level category is in our allowable categories
                if primary_bias not in self.high_level_bias_categories:
                    primary_bias = "no_bias"
                
                policy_data.append({
                    "document_number": doc_number,
                    "title": doc_title,
                    "document_type": doc_type,
                    "domain": domain,
                    "agency": agency,
                    "policy_id": f"{doc_number}-{idx+1}",
                    "statement": policy["statement"],
                    "source_section": policy["source_section"],
                    "has_bias": policy.get("has_bias", False),
                    "primary_bias_category": primary_bias,  # Using high-level category only
                    "all_bias_categories": ", ".join(bias_categories),  # Only high-level categories
                    "normative_framing": policy.get("normative_framing", "None"),
                    "bias_keywords": ", ".join(policy.get("bias_indicators", []))
                })
        
        if policy_data:
            df = pd.DataFrame(policy_data)
            df.to_csv(filename, index=False, quoting=csv.QUOTE_NONNUMERIC)
            print(f"Policy statements saved to {filename}")
            return df
        else:
            print("No policy statements found")
            return None
    
    def generate_bias_statistics(self):
        """Generate statistics about bias detection in the dataset"""
        if not self.results:
            print("No results to analyze.")
            return {}
        
        total_policy_statements = 0
        biased_statements = 0
        bias_categories_count = {}
        normative_framing_count = {"Explicit": 0, "Implicit": 0, "None": 0}
        domain_bias_count = {}
        
        for doc in self.results:
            domain = doc["domain"]
            if domain not in domain_bias_count:
                domain_bias_count[domain] = {"total": 0, "biased": 0}
            
            for policy in doc["policy_statements"]:
                total_policy_statements += 1
                domain_bias_count[domain]["total"] += 1
                
                if policy.get("has_bias", False):
                    biased_statements += 1
                    domain_bias_count[domain]["biased"] += 1
                    
                    # Count primary bias categories (high-level only)
                    primary_category = policy.get("primary_bias_category", "no_bias")
                    # Extract just the high-level category if it's a subcategory
                    if "_" in primary_category:
                        primary_category = primary_category.split("_")[0]
                    
                    bias_categories_count[primary_category] = bias_categories_count.get(primary_category, 0) + 1
                    
                    # Count normative framing
                    framing = policy.get("normative_framing", "None")
                    normative_framing_count[framing] = normative_framing_count.get(framing, 0) + 1
        
        # Calculate percentages for domain bias
        for domain in domain_bias_count:
            total = domain_bias_count[domain]["total"]
            biased = domain_bias_count[domain]["biased"]
            domain_bias_count[domain]["percentage"] = (biased / total * 100) if total > 0 else 0
        
        # Prepare statistics
        stats = {
            "total_policy_statements": total_policy_statements,
            "biased_statements": biased_statements,
            "bias_percentage": (biased_statements / total_policy_statements * 100) if total_policy_statements > 0 else 0,
            "bias_categories_count": bias_categories_count,
            "normative_framing_count": normative_framing_count,
            "domain_bias_count": domain_bias_count
        }
        
        return stats


# This function can be called directly from Jupyter or as a script
def fetch_federal_register_documents(doc_types=["RULE", "PRORULE", "NOTICE"], 
                                     start_date="2024-01-01", 
                                     end_date=None,
                                     per_page=1000, 
                                     max_pages=2,
                                     download_full_text=True,
                                     max_downloads=25,
                                     output="federal_register_data",
                                     output_format="both",
                                     generate_bias_dataset=True,
                                     print_bias_statistics=True,
                                     debug=False):
    """
    Fetch documents from the Federal Register API and process for bias identification
    
    Args:
        doc_types: List of document types (RULE, PRORULE, NOTICE)
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format (defaults to today)
        per_page: Number of results per page (max 1000)
        max_pages: Maximum number of pages to fetch
        download_full_text: Whether to download and parse full document text
        max_downloads: Maximum number of full texts to download
        output: Output filename prefix (without extension)
        output_format: Output format (csv, json, or both)
        generate_bias_dataset: Whether to generate a bias benchmark dataset
        print_bias_statistics: Whether to print statistics about bias detection
        debug: Enable debug mode for more detailed error messages
    
    Returns:
        DataFrame containing the results
    """
    # Set default end date to today if not provided
    if end_date is None:
        end_date = datetime.now().strftime("%Y-%m-%d")
    
    # Create output directory if it doesn't exist
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)
    
    client = FederalRegisterClient(debug=debug)
    
    try:
        client.search_documents(
            doc_types=doc_types,
            start_date=start_date,
            end_date=end_date,
            per_page=per_page,
            max_pages=max_pages
        )
        
        # Download full text for a subset of documents
        if download_full_text and client.results:
            client.download_document_text(max_documents=max_downloads)
        
        # Generate bias benchmark dataset
        if generate_bias_dataset:
            bias_dataset_filename = os.path.join(output_dir, f"{output}_bias_benchmark.csv")
            bias_df = client.generate_bias_benchmark_dataset(bias_dataset_filename)
        
        # Save policy statements to a separate CSV file
        policy_statements_filename = os.path.join(output_dir, f"{output}_policy_statements.csv")
        policy_df = client.save_policy_statements_csv(policy_statements_filename)
        
        # Print bias statistics if requested
        if print_bias_statistics and client.results:
            stats = client.generate_bias_statistics()
            print("\n===== BIAS DETECTION STATISTICS =====")
            print(f"Total policy statements: {stats['total_policy_statements']}")
            print(f"Biased statements: {stats['biased_statements']} ({stats['bias_percentage']:.2f}%)")
            
            print("\nBias Categories:")
            for category, count in sorted(stats['bias_categories_count'].items(), key=lambda x: x[1], reverse=True):
                print(f"  {category}: {count} statements")
            
            print("\nNormative Framing:")
            for framing, count in stats['normative_framing_count'].items():
                print(f"  {framing}: {count} statements")
            
            print("\nBias by Domain:")
            for domain, counts in sorted(stats['domain_bias_count'].items(), key=lambda x: x[1]['percentage'], reverse=True):
                print(f"  {domain}: {counts['biased']} of {counts['total']} statements ({counts['percentage']:.2f}% biased)")
            print("====================================\n")
        
        # Save results
        if output_format in ["csv", "both"]:
            csv_filename = os.path.join(output_dir, f"{output}.csv")
            client.save_to_csv(csv_filename)
        
        if output_format in ["json", "both"]:
            json_filename = os.path.join(output_dir, f"{output}.json")
            client.save_to_json(json_filename)
        
        # Return a DataFrame with the results
        return pd.DataFrame(client.results)
    
    except Exception as e:
        print(f"Error in main process: {e}")
        if debug:
            traceback.print_exc()
        return pd.DataFrame()  # Return empty DataFrame on error


# Example usage
if __name__ == "__main__":
    # Basic usage - gets all rules, proposed rules, and notices from 2024
    # results_df = fetch_federal_register_documents()

    # Custom date range with debug mode enabled
    # results_df = fetch_federal_register_documents(
    #     start_date="2024-01-01",
    #     end_date="2024-04-30",
    #     max_pages=3,
    #     max_downloads=50,
    #     output="federal_register_data",
    #     debug=True
    # )
    
    # Only download proposed rules with limited pages for testing
    # results_df = fetch_federal_register_documents(
    #     doc_types=["PRORULE"],
    #     max_pages=1,
    #     max_downloads=10,
    #     output="proposed_rules_test"
    # )
    
    print("Script loaded successfully. Ready to use fetch_federal_register_documents() function.")


# For use in Jupyter notebooks:
"""
# Run the scraper
results_df = fetch_federal_register_documents(
    doc_types=["RULE", "PRORULE", "NOTICE"],
    start_date="2024-01-01",
    end_date=None,  # Defaults to today
    max_pages=2,
    max_downloads=25,
    debug=True
)

# Load the bias benchmark dataset for analysis
import pandas as pd
bias_df = pd.read_csv("output/federal_register_data_bias_benchmark.csv")

# Preview the dataset
print("Benchmark Dataset Columns:", bias_df.columns.tolist())
print("\nSample rows:")
bias_df.head()

# Analyze bias categories
bias_counts = bias_df["Bias Type"].value_counts()
print("\nBias Type Distribution:")
print(bias_counts)

# Visualize results
import matplotlib.pyplot as plt
bias_counts.plot(kind="bar")
plt.title("Distribution of Bias Categories")
plt.xlabel("Bias Category")
plt.ylabel("Count")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Analyze normative framing 
framing_by_bias = pd.crosstab(bias_df["Bias Type"], bias_df["Normative Framing"])
print("\nNormative Framing by Bias Type:")
print(framing_by_bias)

# Visualize normative framing
framing_by_bias.plot(kind="bar", stacked=True)
plt.title("Normative Framing by Bias Type")
plt.xlabel("Bias Type")
plt.ylabel("Count")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
"""

Script loaded successfully. Ready to use fetch_federal_register_documents() function.


'\n# Run the scraper\nresults_df = fetch_federal_register_documents(\n    doc_types=["RULE", "PRORULE", "NOTICE"],\n    start_date="2024-01-01",\n    end_date=None,  # Defaults to today\n    max_pages=2,\n    max_downloads=25,\n    debug=True\n)\n\n# Load the bias benchmark dataset for analysis\nimport pandas as pd\nbias_df = pd.read_csv("output/federal_register_data_bias_benchmark.csv")\n\n# Preview the dataset\nprint("Benchmark Dataset Columns:", bias_df.columns.tolist())\nprint("\nSample rows:")\nbias_df.head()\n\n# Analyze bias categories\nbias_counts = bias_df["Bias Type"].value_counts()\nprint("\nBias Type Distribution:")\nprint(bias_counts)\n\n# Visualize results\nimport matplotlib.pyplot as plt\nbias_counts.plot(kind="bar")\nplt.title("Distribution of Bias Categories")\nplt.xlabel("Bias Category")\nplt.ylabel("Count")\nplt.xticks(rotation=45)\nplt.tight_layout()\nplt.show()\n\n# Analyze normative framing \nframing_by_bias = pd.crosstab(bias_df["Bias Type"], bias_df["Normative F

In [None]:
results_df = fetch_federal_register_documents(
    doc_types=["RULE", "PRORULE", "NOTICE"],  # Document types to retrieve
    start_date="1994-01-01",                  # Start date for search
    end_date=None,                            # End date (defaults to today)
    per_page=1000,                            # Results per page
    max_pages=100,                              # Max pages to fetch
    download_full_text=True,                  # Whether to download full text
    max_downloads=1000000,                         # Max documents to download
    output="federal_register_data",           # Output filename prefix
    output_format="both",                     # Output format (csv, json, both)
    generate_bias_dataset=True,               # Generate bias benchmark dataset
    print_bias_statistics=True                # Print statistics about bias
)

In [2]:
# Strategy to download all available Federal Register documents
# We'll use a year-by-year approach to avoid overwhelming the API

import time
from datetime import datetime, timedelta

# Track overall statistics
all_results = []
total_documents = 0
years_processed = 0

# Set the starting year (Federal Register API has data from around 1994)
start_year =2000
end_year = 2002

for year in range(start_year, end_year + 1):
    print(f"\n{'='*50}")
    print(f"Processing documents from year {year}")
    print(f"{'='*50}")
    
    # Process each year separately
    try:
        year_results = fetch_federal_register_documents(
            doc_types=["RULE", "PRORULE", "NOTICE"],
            start_date=f"{year}-01-01",
            end_date=f"{year}-12-31",
            per_page=5000,                      # Maximum allowed by API
            max_pages=None,                     # No page limit - get all pages for the year
            download_full_text=True,
            max_downloads=1000,                  # Increase this if you want more full text per year
            output=f"federal_register_data_{year}",  # Separate files for each year
            output_format="both",
            generate_bias_dataset=True,
            print_bias_statistics=True,
            debug=True
        )
        
        # Update tracking stats
        if not year_results.empty:
            all_results.append(year_results)
            total_documents += len(year_results)
            years_processed += 1
            
        print(f"Completed year {year} - Retrieved {len(year_results)} documents")
        
        # Add a delay between years to be respectful of the API
        time.sleep(5)
        
    except Exception as e:
        print(f"Error processing year {year}: {e}")
        # Continue with next year even if one fails
        continue

# Print final statistics
print(f"\n{'='*50}")
print(f"FINAL STATISTICS")
print(f"{'='*50}")
print(f"Total years processed: {years_processed}")
print(f"Total documents retrieved: {total_documents}")
print(f"Output files are in the 'output' directory with year-specific names")


Processing documents from year 2000
Searching for documents from 2000-01-01 to 2000-12-31
Fetching page 1...
Response contains keys: ['count', 'description', 'total_pages', 'next_page_url', 'results']
Total count: 32679
Results count on this page: 20
Retrieved 20 documents from page 1
Fetching page 2...
Retrieved 20 documents from page 2
Fetching page 3...
Retrieved 20 documents from page 3
Fetching page 4...
Retrieved 20 documents from page 4
Fetching page 5...
Retrieved 20 documents from page 5
Fetching page 6...
Retrieved 20 documents from page 6
Fetching page 7...
Retrieved 20 documents from page 7
Fetching page 8...
Retrieved 20 documents from page 8
Fetching page 9...
Retrieved 20 documents from page 9
Fetching page 10...
Retrieved 20 documents from page 10
Fetching page 11...
Retrieved 20 documents from page 11
Fetching page 12...
Retrieved 20 documents from page 12
Fetching page 13...
Retrieved 20 documents from page 13
Fetching page 14...
Retrieved 20 documents from page 14
F

# After manual cleaning, Delete Duplicates

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np

# Load the CSV file
file_path = 'Fed Reg - Bias Benchmark - Contatenated+Cleaned.csv'
df = pd.read_csv(file_path)

# Display basic information about the dataframe
print(f"Original DataFrame Shape: {df.shape}")
print(f"Column Names: {df.columns.tolist()}")

# Check for duplicate excerpts
duplicate_mask = df.duplicated(subset=['Excerpt'], keep='first')
duplicate_df = df[duplicate_mask]

# Count duplicates
num_duplicates = duplicate_df.shape[0]
print(f"Number of duplicate excerpts found: {num_duplicates}")

# Document all duplicate excerpts
if num_duplicates > 0:
    print("\nDuplicate Excerpts Documentation:")
    
    # Get all excerpts that have duplicates
    duplicated_excerpts = df[df.duplicated(subset=['Excerpt'], keep=False)]['Excerpt'].unique()
    
    for i, excerpt in enumerate(duplicated_excerpts, 1):
        # Find all rows with this excerpt
        matching_rows = df[df['Excerpt'] == excerpt]
        print(f"\nDuplicate Set #{i}:")
        print(f"Excerpt: {excerpt[:100]}..." if len(excerpt) > 100 else f"Excerpt: {excerpt}")
        print(f"Appears {matching_rows.shape[0]} times")
        print("Occurrences (Date, Bias Type):")
        for idx, row in matching_rows.iterrows():
            print(f"  - Index {idx}: Date={row['Date']}, Bias Type={row['Bias Type']}")
    
    # Remove duplicates
    df_cleaned = df.drop_duplicates(subset=['Excerpt'], keep='first')
    print(f"\nCleaned DataFrame Shape: {df_cleaned.shape}")
    
    # Save the cleaned dataframe to a new CSV file
    cleaned_file_path = 'Fed Reg - Bias Benchmark - Contatenated+Cleaned_cleaned.csv'
    df_cleaned.to_csv(cleaned_file_path, index=False)
    print(f"Cleaned data saved to '{cleaned_file_path}'")
else:
    print("No duplicates found. The dataset is already clean.")