In [None]:
import logging
import os
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS

In [None]:
# Configure logging once for the notebook: INFO and above, each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger shared by the helper functions defined in the cells below.
logger = logging.getLogger(__name__)

In [None]:
# Browser-like headers sent by make_request() when the caller supplies none.
DEFAULT_HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/91.0.4472.124 Safari/537.36'
    ),
}

# Default `timeout` argument of make_request(), forwarded to requests.get().
DEFAULT_TIMEOUT = 10

# NOTE(review): not referenced by any cell visible in this notebook; presumably
# an intended pause (in seconds) between requests - confirm before relying on it.
DEFAULT_DELAY = 2

In [None]:
def make_request(url: str, headers: Optional[Dict] = None, timeout: int = DEFAULT_TIMEOUT) -> requests.Response:
    """Fetch ``url`` with an HTTP GET and return the response.

    Args:
        url: Address to request.
        headers: Request headers. ``None`` selects ``DEFAULT_HEADERS``; a
            mapping passed explicitly (even an empty one) is sent as given.
        timeout: Value forwarded as ``timeout`` to ``requests.get``.

    Returns:
        The response object, after ``raise_for_status()`` has accepted it.

    Raises:
        requests.RequestException: Raised by ``requests.get`` or by
            ``raise_for_status()``; the error is logged and re-raised.
    """
    # `is None` rather than `or`: an explicitly empty mapping must not be
    # silently swapped for the browser-like defaults.
    if headers is None:
        headers = DEFAULT_HEADERS
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.RequestException as e:
        logger.error(f"Error making request to {url}: {str(e)}")
        raise

def safe_soup_parse(html_content: str) -> BeautifulSoup:
    """Parse ``html_content`` with BeautifulSoup's ``html.parser`` backend.

    Any exception raised while parsing is logged and then re-raised unchanged.
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
    except Exception as parse_error:
        logger.error(f"Error parsing HTML: {str(parse_error)}")
        raise
    else:
        return soup

In [None]:
def get_first_duckduckgo_result(
    query: str,
    num_results: int = 1,
    include_html: bool = True,
    region: str = 'wt-wt',
    safesearch: str = 'moderate',
    timelimit: Optional[str] = None
) -> Dict:
    """
    Search DuckDuckGo and describe the first hit.

    Only the first returned row is used; ``num_results`` merely caps how many
    rows are requested from the search backend.

    Args:
        query (str): Search query string
        num_results (int): Number of results to request (default: 1)
        include_html (bool): Whether to add an ``html_content`` entry (default: True)
        region (str): Region code for search results (default: 'wt-wt')
        safesearch (str): SafeSearch setting ('on', 'moderate', 'off')
        timelimit (Optional[str]): Time limit for results
            ('d' for day, 'w' for week, 'm' for month, 'y' for year)

    Returns:
        Dict: On success, ``url``, ``title``, ``description``, ``success=True``
        and ``timestamp``; when ``include_html`` is true the dict also always
        carries ``html_content`` (``None`` if nothing could be retrieved).
        On failure, ``{"error": <message>, "success": False}``. Exceptions are
        logged and reported through the return value, never raised.
    """
    logger.info(f"Searching DuckDuckGo for: {query}")
    try:
        # Initialize DuckDuckGo search
        ddgs = DDGS()

        # Get search results
        search_results = list(ddgs.text(
            keywords=query,
            region=region,
            safesearch=safesearch,
            timelimit=timelimit,
            max_results=num_results
        ))

        if not search_results:
            logger.warning("No results found for query")
            return {"error": "No results found", "success": False}

        first_result = search_results[0]

        # Build result dictionary with available fields; the URL is looked up
        # under 'link' first and 'href' second.
        result = {
            "url": first_result.get('link') or first_result.get('href'),
            "title": first_result.get('title', 'No title found'),
            "description": first_result.get('body', ''),
            "success": True,
            "timestamp": datetime.now().isoformat()
        }

        if include_html:
            # Set the key up front so callers can rely on it being present
            # whenever HTML was requested, even if the second query yields
            # no rows or fails.
            result["html_content"] = None
            try:
                # NOTE(review): this assumes rows from the 'html' backend carry
                # an 'html' field; if they do not, the value is always '' -
                # confirm against the installed duckduckgo_search version.
                html_results = list(ddgs.text(
                    keywords=query,
                    region=region,
                    safesearch=safesearch,
                    timelimit=timelimit,
                    backend='html',
                    max_results=1
                ))
                if html_results:
                    result["html_content"] = html_results[0].get('html', '')
            except Exception as e:
                # Best effort: a failed HTML lookup does not fail the search.
                logger.warning(f"Could not retrieve HTML content: {str(e)}")

        logger.info(f"Successfully retrieved result for: {result['url']}")
        return result

    except Exception as e:
        logger.error(f"Error in get_first_duckduckgo_result: {str(e)}")
        return {"error": str(e), "success": False}

In [None]:
def clean_html_content(url: str) -> Dict:
    """
    Download a page and reduce it to one line of plain text.

    The page is fetched with make_request() and parsed with safe_soup_parse().
    Script/style/media/navigation/ad elements are removed, anchors are replaced
    by their text, lines of at most one character are dropped, whitespace is
    collapsed, and every character outside the allowed set (word characters,
    whitespace, basic punctuation, guillemets and listed accented letters) is
    deleted.

    Returns a dict with ``url``, ``cleaned_content``, ``original_length``,
    ``cleaned_length``, ``success``, ``status_code``, ``content_type`` and
    ``timestamp``. On any exception the error is logged and a dict with
    ``url``, empty ``cleaned_content``, ``error`` and ``success=False`` is
    returned instead of raising.
    """
    logger.info(f"Cleaning HTML content from: {url}")
    try:
        response = make_request(url)
        soup = safe_soup_parse(response.text)

        # Strip markup that carries no article text.
        unwanted_selector = 'script, style, img, link, iframe, header, footer, nav, [class*="ads"], .advertisement'
        for node in soup.select(unwanted_selector):
            node.decompose()

        # Flatten every anchor to its visible text.
        for anchor in soup.find_all('a'):
            anchor.replace_with(anchor.text)

        # Keep only stripped lines longer than one character.
        stripped_lines = (raw_line.strip() for raw_line in soup.get_text().split('\n'))
        kept_lines = [line for line in stripped_lines if len(line) > 1]

        # Collapse whitespace runs, then delete characters outside the allowed set.
        text = re.sub(r'\s+', ' ', ' '.join(kept_lines))
        text = re.sub(r'[^\w\s\-.,;:!?«»àâäéèêëîïôöùûüÿçÀÂÄÉÈÊËÎÏÔÖÙÛÜŸÇ]', '', text)
        cleaned = text.strip()

        result = {
            "url": url,
            "cleaned_content": cleaned,
            "original_length": len(response.text),
            "cleaned_length": len(cleaned),
            "success": True,
            "status_code": response.status_code,
            "content_type": response.headers.get('content-type', ''),
            "timestamp": datetime.now().isoformat()
        }

        logger.info(f"Successfully cleaned content from: {url}")
        return result

    except Exception as failure:
        logger.error(f"Error in clean_html_content: {str(failure)}")
        return {
            "url": url,
            "cleaned_content": "",
            "error": str(failure),
            "success": False
        }


In [None]:
# Smoke test: search for one prefecture and print the cleaned page text.
query = "communes prefecture \"Agadir Ida-Outanane\" wikipedia"
result = get_first_duckduckgo_result(query, include_html=False)

# A failed search returns {"error": ..., "success": False} without a 'url'
# key, so only fetch the page when the search actually succeeded.
if result.get('success'):
    clean_result = clean_html_content(result['url'])
    print(clean_result['cleaned_content'])
else:
    print(f"Search failed: {result.get('error')}")

In [None]:
# .get() because `result` has no 'url' key when the search failed (or when a
# later cell has rebound `result` to a different dict); prints None then.
print(result.get('url'))

In [None]:

import json

# Retry policy for the search step: after the first attempt, retry up to
# MAX_SEARCH_RETRIES more times, pausing RETRY_DELAY_SECONDS before each retry.
MAX_SEARCH_RETRIES = 8
RETRY_DELAY_SECONDS = 3

# Load the region divisions data
with open('../data/region-divisions.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Per-division records, in input order
results = []

# Process each division
for division in data['region_divisions']:
    # Query combining the division type and its French name. Loop-specific
    # names are used so this cell does not overwrite the `query` / `result`
    # variables of the single-search cells above.
    division_query = f"les communes de {division['type']} de \"{division['name']['fr']}\" wikipedia"

    # Search, retrying while the search reports failure
    raw_content = get_first_duckduckgo_result(division_query, include_html=False)
    for _ in range(MAX_SEARCH_RETRIES):
        if raw_content['success']:
            break
        time.sleep(RETRY_DELAY_SECONDS)
        raw_content = get_first_duckduckgo_result(division_query, include_html=False)

    if raw_content['success']:
        clean_content = clean_html_content(raw_content["url"])
        content = clean_content["cleaned_content"]
    else:
        # Keep the division in the output with empty content, but say so
        # instead of failing silently.
        print(f"Search failed for {division['id']}: {raw_content.get('error')}")
        content = ""

    # Record written to result.json for this division
    division_record = {
        "id": division['id'],
        "name": division['name']['en'],
        "content": content
    }
    results.append(division_record)

    # Print progress (optional)
    print(f"Processed {division['id']}: {division['name']['en']}")

# Save results to JSON file
with open('result.json', 'w', encoding='utf-8') as f:
    json.dump({"divisions": results}, f, ensure_ascii=False, indent=2)

# Print completion message
print(f"\nProcessing complete! Processed {len(results)} divisions.")


In [None]:
# Import required libraries
import json
from openai import OpenAI
import time
from pydantic import BaseModel
from typing import List, Literal, Optional

# Initialize OpenAI client.
# The key is read from the OPENAI_API_KEY environment variable so that no
# credential is stored in the notebook; the previous hard-coded empty string
# could never authenticate.
# NOTE(review): with the variable unset this passes None, and the OpenAI
# client is expected to raise at construction - confirm for the installed
# openai version.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Name of one commune in four languages. Used as the structured response
# format of translate_name() and as the `name` field of Commune.
# NOTE(review): field names look like language codes - presumably French,
# English, Arabic and Spanish; confirm.
class CommuneName(BaseModel):
    fr: str
    en: str
    ar: str
    es: str

# One extracted commune, as returned by the model and as written to the
# final output by process_divisions().
class Commune(BaseModel):
    # Multilingual name of the commune.
    name: CommuneName
    type: str  # Can be "urbain", "rurale", or empty string
    # Set by extract_communes_from_wiki() to the 'id' of the source division.
    region_division_id: str
    # Free-text notes; the extraction prompt asks for comments on uncertainties.
    comments: List[str]

# Result of extracting one division: used both as the structured response
# format of the extraction request and as the return type of
# extract_communes_from_wiki().
class ExtractedCommunes(BaseModel):
    # Communes found for the division (empty on refusal or error).
    communes: List[Commune]
    # Division-level notes; also carries refusal and error messages.
    division_comments: List[str]

# Final payload that process_divisions() serialises to the output file.
class ProcessingResult(BaseModel):
    # Communes collected across all divisions.
    communes: List[Commune]
    # Division comments and per-division failure messages.
    processing_comments: List[str]
    # process_divisions() stores "total_communes", "total_divisions_processed"
    # and "timestamp" here.
    metadata: dict

def translate_name(name: str) -> CommuneName:
    """Translate commune name to multiple languages.

    Sends ``name`` to the chat-completions ``parse`` helper with
    ``CommuneName`` as the response format.

    Args:
        name: Commune name to translate.

    Returns:
        The parsed ``CommuneName``. If the request raises, or the response
        carries no parsed object, a ``CommuneName`` with ``name`` copied into
        every language field is returned instead, so callers always receive
        a usable value.
    """
    try:
        completion = client.beta.chat.completions.parse(
            model="gpt-4o-2024-08-06",
            messages=[{
                "role": "user",
                "content": f"Translate this Moroccan commune name: {name}"
            }],
            response_format=CommuneName
        )
        parsed = completion.choices[0].message.parsed
        # `parsed` can be absent (e.g. when the model refuses); returning None
        # would break the declared return type and make the caller's
        # Commune(name=...) construction fail for the whole division.
        if parsed is None:
            raise ValueError("response contained no parsed translation")
        return parsed
    except Exception as e:
        print(f"Translation error for {name}: {str(e)}")
        return CommuneName(fr=name, en=name, ar=name, es=name)

def extract_communes_from_wiki(content: dict) -> ExtractedCommunes:
    """Extract communes from wiki content using GPT-4o.

    Args:
        content: Division record; the keys read are ``'name'`` (title),
            ``'content'`` (page text) and ``'id'`` (division identifier).

    Returns:
        ``ExtractedCommunes`` whose communes have their names replaced by the
        output of ``translate_name`` and their ``region_division_id`` set to
        the division id. On refusal, empty content, a missing parsed response
        or any exception, ``communes`` is empty and ``division_comments``
        explains why. This function does not raise.
    """
    # Bound before the try block so the error handler below can always
    # reference it, even when `content` is not a dict and the lookups raise.
    division_id = 'Unknown ID'
    try:
        # Safely access dictionary content with get() method and provide defaults
        title = content.get('name', 'Unknown Title')
        # Page text lives under the 'content' key
        full_content = content.get('content', '')

        division_id = content.get('id', 'Unknown ID')

        if not isinstance(full_content, str):
            raise ValueError(f"Invalid content format for division {division_id}. Expected string, got {type(full_content)}")

        # Print debug information
        print(f"Processing content for {division_id}:")
        print(f"Title: {title}")
        print(f"Content length: {len(str(full_content))} characters")

        # Nothing to extract from: skip the model call instead of asking it
        # to find communes in an empty document.
        if not full_content.strip():
            return ExtractedCommunes(
                communes=[],
                division_comments=["No content available for extraction"]
            )

        completion = client.beta.chat.completions.parse(
            model="gpt-4o-2024-08-06",
            messages=[
                {
                    "role": "system",
                    "content": "Extract and classify Moroccan communes from the provided content."
                },
                {
                    "role": "user",
                    "content": f"""Analyze this content and extract communes:
                    Title: {title}
                    Content: {full_content}
                    
                    Rules:
                    - Classify as 'urbain' for cities/municipalities
                    - Classify as 'rurale' for villages/rural areas
                    - Leave type empty if unclear
                    - Add relevant comments for uncertainties"""
                }
            ],
            response_format=ExtractedCommunes
        )

        message = completion.choices[0].message

        refusal = getattr(message, 'refusal', None)
        if refusal:
            return ExtractedCommunes(
                communes=[],
                division_comments=[f"Content processing refused: {refusal}"]
            )

        parsed = message.parsed
        if parsed is None:
            return ExtractedCommunes(
                communes=[],
                division_comments=["Model returned no parsed content"]
            )

        # Process each commune to add translations
        processed_communes = []
        for commune in parsed.communes:
            translations = translate_name(commune.name.fr)

            processed_commune = Commune(
                name=translations,
                type=commune.type,
                region_division_id=division_id,
                comments=commune.comments
            )
            processed_communes.append(processed_commune)
            time.sleep(0.5)  # Rate limiting

        return ExtractedCommunes(
            communes=processed_communes,
            division_comments=parsed.division_comments
        )

    except Exception as e:
        error_msg = f"Error processing {division_id}: {str(e)}"
        print(error_msg)
        # Print more detailed debug information
        print(f"Content type: {type(content)}")
        print(f"Content keys: {content.keys() if isinstance(content, dict) else 'Not a dictionary'}")
        return ExtractedCommunes(
            communes=[],
            division_comments=[error_msg]
        )

def process_divisions(input_file: str, output_file: str) -> None:
    """Extract communes for every division in ``input_file`` and save them.

    Args:
        input_file: Path to a JSON file shaped like ``{"divisions": [...]}``,
            each entry being a dict with at least ``'id'`` and ``'content'``.
        output_file: Path the serialised ``ProcessingResult`` is written to.

    Returns:
        None. Progress and errors are printed. The function returns early
        (writing nothing) if the input cannot be loaded or lacks the
        ``'divisions'`` key. A division that fails validation or extraction
        is recorded in the processing comments and skipped.
    """
    print("Loading division results...")
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            wiki_results = json.load(f)
    except Exception as e:
        print(f"Error loading input file: {str(e)}")
        return

    if not isinstance(wiki_results, dict) or 'divisions' not in wiki_results:
        print("Invalid input file format: missing 'divisions' key or not a dictionary")
        return

    all_communes = []
    all_comments = []
    total_divisions = len(wiki_results['divisions'])

    for idx, division in enumerate(wiki_results['divisions'], 1):
        # Resolve display fields defensively: calling .get() on a non-dict
        # entry would raise outside the try block and abort the whole run
        # before the validation below could report it.
        is_mapping = isinstance(division, dict)
        division_name = division.get('name', 'Unknown') if is_mapping else 'Unknown'
        division_label = division.get('id', 'Unknown') if is_mapping else 'Unknown'

        print(f"\nProcessing division {idx}/{total_divisions}: {division_name}")

        try:
            if not is_mapping:
                raise ValueError(f"Division {idx} is not a dictionary")

            if 'content' not in division or 'id' not in division:
                raise ValueError(f"Division {idx} missing required fields")

            result = extract_communes_from_wiki(division)

            if result.division_comments:
                all_comments.extend([
                    f"Division {division['id']} ({division_name}): {comment}"
                    for comment in result.division_comments
                ])

            all_communes.extend(result.communes)
            print(f"Extracted {len(result.communes)} communes")
            time.sleep(1)  # Rate limiting between divisions

        except Exception as e:
            error_msg = f"Failed to process division {division_label}: {str(e)}"
            print(error_msg)
            all_comments.append(error_msg)
            continue

    # Create final result; the division total counts every entry, including
    # those that failed.
    output = ProcessingResult(
        communes=all_communes,
        processing_comments=all_comments,
        metadata={
            "total_communes": len(all_communes),
            "total_divisions_processed": total_divisions,
            "timestamp": datetime.now().isoformat()
        }
    )

    # Save results
    print("\nSaving results...")
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(output.model_dump(), f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Error saving output file: {str(e)}")
        return

    print("\nExtraction complete!")
    print(f"Total communes extracted: {len(all_communes)}")
    print(f"Total comments/notes: {len(all_comments)}")

if __name__ == "__main__":
    # Run the extraction over the scraped divisions file; anything that
    # escapes process_divisions() is reported rather than raised.
    try:
        process_divisions(input_file='result.json', output_file='communes.json')
    except Exception as failure:
        print(f"Processing failed: {str(failure)}")