In [1]:
import os
import sys
from pathlib import Path

# Aggiungi il path del progetto
project_root = Path.cwd()
src_path = project_root / 'src'
if str(src_path) not in sys.path:
    sys.path.append(str(src_path))

# Test importazione dipendenze
try:
    import asyncpg
    import aiosql
    import logfire
    from dotenv import load_dotenv
    print("✅ Tutte le dipendenze sono installate correttamente")
except ImportError as e:
    print(f"❌ Manca la dipendenza: {e.name}")
    print(f"Installa con: uv pip install {e.name}")

✅ Tutte le dipendenze sono installate correttamente


In [2]:
# Verifica struttura directory SQL
sql_dir = project_root / 'src' / 'crawler' / 'sql'
print(f"Directory SQL: {sql_dir}")
print(f"Esiste: {sql_dir.exists()}")

# Crea directory SQL se non esiste
sql_dir.mkdir(parents=True, exist_ok=True)
print("✅ Directory SQL creata/verificata")

Directory SQL: /home/sam/github/scrapy-playwright-scrapegraphai/src/crawler/sql
Esiste: True
✅ Directory SQL creata/verificata


In [3]:
# Test connessione database
load_dotenv()  # Carica variabili ambiente dal file .env

async def test_db_connection():
    conn_params = {
        'host': os.getenv('POSTGRES_HOST', 'localhost'),
        'port': int(os.getenv('POSTGRES_PORT', 5432)),
        'user': os.getenv('POSTGRES_USER', 'postgres'),
        'password': os.getenv('POSTGRES_PASSWORD', ''),
        'database': os.getenv('POSTGRES_DATABASE', 'postgres')
    }
    
    try:
        conn = await asyncpg.connect(**conn_params)
        version = await conn.fetchval('SELECT version()')
        print(f"✅ Connessione al database riuscita\nVersione PostgreSQL: {version}")
        await conn.close()
        return True
    except Exception as e:
        print(f"❌ Errore di connessione: {e}")
        return False

# Esegui test connessione
await test_db_connection()

✅ Connessione al database riuscita
Versione PostgreSQL: PostgreSQL 16.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 11.2.0, 64-bit


True

In [6]:
# Modifica della cella 4 con il nuovo schema
schema_sql = """
-- name: create_schema!
CREATE TABLE IF NOT EXISTS frontier_url (
    id BIGSERIAL PRIMARY KEY,
    url TEXT NOT NULL,
    category TEXT NOT NULL,
    type INTEGER NOT NULL CHECK (type >= 0 AND type <= 2),
    depth INTEGER NOT NULL DEFAULT 0,
    is_target BOOLEAN NOT NULL DEFAULT false,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    processed_at TIMESTAMP WITH TIME ZONE,
    failed_attempts INTEGER DEFAULT 0,
    last_error TEXT,
    UNIQUE(url, category)
);

CREATE INDEX IF NOT EXISTS idx_frontier_url_status 
    ON frontier_url(processed_at) 
    WHERE processed_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_frontier_url_category 
    ON frontier_url(category);

CREATE TABLE IF NOT EXISTS config_url_log (
    id BIGSERIAL PRIMARY KEY,
    url TEXT NOT NULL,
    category TEXT NOT NULL,
    type INTEGER NOT NULL CHECK (type >= 0 AND type <= 2),
    last_checked_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    success_count INTEGER DEFAULT 0,
    failure_count INTEGER DEFAULT 0,
    last_status TEXT,
    UNIQUE(url, category)
);

CREATE INDEX IF NOT EXISTS idx_config_url_log_category 
    ON config_url_log(category);
"""

queries_sql = """
-- name: insert_frontier_url<!
INSERT INTO frontier_url (url, category, type, depth, is_target)
VALUES (:url, :category, :type_, :depth, :is_target)
ON CONFLICT (url, category) DO NOTHING
RETURNING id;

-- name: get_unprocessed_urls
SELECT id, url, category, type, depth, is_target 
FROM frontier_url 
WHERE processed_at IS NULL 
ORDER BY created_at ASC 
LIMIT :limit;

-- name: mark_url_processed!
UPDATE frontier_url 
SET processed_at = CURRENT_TIMESTAMP,
    failed_attempts = CASE WHEN :success THEN failed_attempts 
                         ELSE failed_attempts + 1 END,
    last_error = CASE WHEN :success THEN NULL ELSE :error_message END
WHERE id = :url_id;

-- name: update_config_url_log!
INSERT INTO config_url_log (
    url, category, type, last_checked_at, 
    success_count, failure_count, last_status
)
VALUES (
    :url, :category, :type_, CURRENT_TIMESTAMP, 
    CASE WHEN :success THEN 1 ELSE 0 END,
    CASE WHEN :success THEN 0 ELSE 1 END,
    :status
)
ON CONFLICT (url, category) 
DO UPDATE SET
    last_checked_at = CURRENT_TIMESTAMP,
    success_count = CASE WHEN :success 
                       THEN config_url_log.success_count + 1 
                       ELSE config_url_log.success_count END,
    failure_count = CASE WHEN :success 
                       THEN config_url_log.failure_count 
                       ELSE config_url_log.failure_count + 1 END,
    last_status = :status;
"""

# Scrivi i file SQL
with open(sql_dir / 'schema.sql', 'w') as f:
    f.write(schema_sql)

with open(sql_dir / 'queries.sql', 'w') as f:
    f.write(queries_sql)

print("✅ File SQL creati/aggiornati")

✅ File SQL creati/aggiornati


In [4]:
# Test DatabaseManager
from crawler.database import DatabaseManager

async def test_database_manager():
    db = DatabaseManager()
    try:
        # Inizializza il database
        await db.initialize()
        
        # Test inserimento URL
        test_url = "https://webapps.unito.it/concorsiweb/visualizzaperweb.php?tipo=25&p=y&C7=all&criterio35=A&criterio46=No"
        test_category = "Torino"
        
        # Store URL
        url_id = await db.store_frontier_url(
            url=test_url,
            category=test_category,
            type_=1
        )
        print(f"✅ URL stored with ID: {url_id}")
        
        # Get unprocessed URLs
        urls = await db.get_unprocessed_frontier_urls(1)
        print(f"✅ Retrieved {len(urls)} unprocessed URLs")
        
        # Mark as processed
        if urls:
            await db.mark_url_processed(urls[0]['id'])
            print("✅ URL marked as processed")
        
        return True
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False
    finally:
        if hasattr(db, 'pool') and db.pool:
            await db.pool.close()
            print("✅ Database pool closed")

# Esegui test DatabaseManager
await test_database_manager()

  logfire.info("SQL queries loaded successfully")


✅ URL stored with ID: 1
✅ Retrieved 1 unprocessed URLs
✅ URL marked as processed
✅ Database pool closed


True

In [5]:
# develop.ipynb

# %% [markdown]
# # Crawler Development and Testing
# Let's test each component step by step

# %% [markdown]
# ## 1. Environment Setup and Imports

# %%
import asyncio
import yaml
from pathlib import Path
import logfire
from crawler.database import db_manager
from crawler.utils.logging_utils import setup_logging
from crawler.utils.playwright_utils import PlaywrightPageManager
from crawler.items import UrlItem, ConfigUrlLogItem

# Setup logging
setup_logging()

In [6]:
# %% [markdown]
# ## 2. Config Loading and Validation

# %%
def load_config(config_path: str = "config/crawler_config.yaml") -> dict:
    with open(config_path) as f:
        return yaml.safe_load(f)

# Test config loading
config = load_config()
print("Categories found:", [cat["name"] for cat in config["categories"]])

Categories found: ['Bologna', 'Torino']


In [7]:
# %% [markdown]
# ## 3. URL Extraction by Type

# %%
def extract_urls_by_type(config: dict, type_: int) -> list:
    """Extract URLs of specific type from config"""
    urls = []
    for category in config["categories"]:
        category_name = category["name"]
        for url_config in category["urls"]:
            if url_config["type"] == type_:
                urls.append({
                    "url": url_config["url"],
                    "category": category_name,
                    "type": url_config["type"],
                    "target_patterns": url_config["target_patterns"],
                    "seed_pattern": url_config.get("seed_pattern"),
                    "max_depth": url_config.get("max_depth", 0)
                })
    return urls

# Test URL extraction for each type
for type_ in range(3):
    urls = extract_urls_by_type(config, type_)
    print(f"\nType {type_} URLs: {len(urls)}")
    for url in urls:
        print(f"- {url['url']} ({url['category']})")



Type 0 URLs: 0

Type 1 URLs: 2
- https://webapps.unito.it/concorsiweb/visualizzaperweb.php?tipo=25&p=y&C7=all&criterio35=A&criterio46=No (Torino)
- https://webapps.unito.it/concorsiweb/visualizzaperweb.php?tipo=24&p=y&C7=all&criterio35=A&criterio46=No (Torino)

Type 2 URLs: 3
- https://bandi.unibo.it/agevolazioni/premi-laurea (Bologna)
- https://bandi.unibo.it/agevolazioni/borse?b_start:int=20 (Bologna)
- https://bandi.unibo.it/agevolazioni/borse (Bologna)


In [None]:





# %% [markdown]
# ## 4. Database Connection Test

# %%
async def test_database():
    """Test database connection and basic operations"""
    try:
        await db_manager.initialize()
        print("Database initialized successfully")
        
        # Test storing a URL
        test_url = "https://test.com/example"
        url_id = await db_manager.store_frontier_url(
            url=test_url,
            category="test",
            type_=1,
            depth=0,
            is_target=True
        )
        print(f"Stored test URL with ID: {url_id}")
        
        # Test config URL log
        await db_manager.update_config_url_log(
            url=test_url,
            category="test",
            type_=1,
            success=True,
            status="test_completed"
        )
        print("Updated config URL log successfully")
        
    except Exception as e:
        print(f"Database test failed: {e}")
    finally:
        await db_manager.close()
        print("Database connection closed")

# Run database test
await test_database()

# %% [markdown]
# ## 5. Playwright Utils Test

# %%
from playwright.async_api import async_playwright

async def test_playwright_utils(test_url: str = "https://example.com"):
    """Test PlaywrightPageManager functionality"""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        
        # Initialize page manager
        manager = PlaywrightPageManager(page)
        
        try:
            # Test basic navigation
            await page.goto(test_url)
            await manager.initialize_page()
            
            # Test link extraction
            links = await manager.extract_links()
            print(f"Found {len(links)} links on {test_url}")
            
            # Test pattern-based extraction
            pdf_links = await manager.extract_links(r".*\.pdf$")
            print(f"Found {len(pdf_links)} PDF links")
            
        finally:
            await manager.cleanup()
            await browser.close()

# Run Playwright test
await test_playwright_utils()

# %% [markdown]
# ## 6. Test Type 0 Processing

# %%
async def test_type_0_processing():
    """Test processing of Type 0 URLs"""
    # Get Type 0 URLs from config
    type_0_urls = extract_urls_by_type(config, 0)
    
    try:
        await db_manager.initialize()
        
        for url_config in type_0_urls:
            # Create URL item
            item = UrlItem(
                url=url_config["url"],
                category=url_config["category"],
                type=0,
                depth=0,
                is_target=True,
                found_on=None
            )
            
            # Store in database
            url_id = await db_manager.store_frontier_url(
                url=item["url"],
                category=item["category"],
                type_=item["type"],
                depth=item["depth"],
                is_target=item["is_target"]
            )
            
            print(f"Processed Type 0 URL: {item['url']}")
            
    finally:
        await db_manager.close()

# Run Type 0 test
await test_type_0_processing()

# %% [markdown]
# ## 7. Test Type 1 Processing

# %%
async def test_type_1_processing():
    """Test processing of a single Type 1 URL"""
    # Get first Type 1 URL from config
    type_1_urls = extract_urls_by_type(config, 1)
    if not type_1_urls:
        print("No Type 1 URLs found in config")
        return
    
    test_config = type_1_urls[0]
    
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        manager = PlaywrightPageManager(page)
        
        try:
            await db_manager.initialize()
            
            # Navigate to URL
            await page.goto(test_config["url"])
            await manager.initialize_page()
            
            # Extract target URLs
            target_urls = set()
            for pattern in test_config["target_patterns"]:
                urls = await manager.extract_links(pattern)
                target_urls.update(urls)
            
            print(f"Found {len(target_urls)} target URLs")
            
            # Store first 3 URLs as example
            for url in list(target_urls)[:3]:
                item = UrlItem(
                    url=url,
                    category=test_config["category"],
                    type=1,
                    depth=0,
                    is_target=True,
                    found_on=test_config["url"]
                )
                
                await db_manager.store_frontier_url(
                    url=item["url"],
                    category=item["category"],
                    type_=item["type"],
                    depth=item["depth"],
                    is_target=item["is_target"]
                )
                
                print(f"Stored target URL: {url}")
                
        finally:
            await manager.cleanup()
            await browser.close()
            await db_manager.close()

# Run Type 1 test
await test_type_1_processing()

# %% [markdown]
# ## 8. Full Spider Test

# %%
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from crawler.spiders.frontier_spider import FrontierSpider

def test_spider():
    """Run the full spider"""
    process = CrawlerProcess(get_project_settings())
    process.crawl(FrontierSpider)
    process.start()

# Uncomment to run full spider test
# test_spider()