In [None]:
!pip install requests lxml beautifulsoup4 playwright google-cloud-pubsub


Collecting google-cloud-pubsub
  Downloading google_cloud_pubsub-2.33.0-py3-none-any.whl (321 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m321.3/321.3 KB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<7.0.0,>=3.20.2
  Downloading protobuf-6.33.1-cp39-abi3-manylinux2014_x86_64.whl (323 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m323.2/323.2 KB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hCollecting opentelemetry-sdk>=1.27.0
  Downloading opentelemetry_sdk-1.38.0-py3-none-any.whl (132 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m132.3/132.3 KB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting opentelemetry-api>=1.27.0
  Downloading opentelemetry_api-1.38.0-py3-none-any.whl (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.9/65.9 KB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hCo

In [None]:
!playwright install

In [None]:
!playwright install-deps

In [None]:
import asyncio
import json
import logging
import random
import re
import time
import hashlib
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import requests
from bs4 import BeautifulSoup, BeautifulSoup as BS
from playwright.async_api import async_playwright

# Install first if you have not already: pip install google-cloud-pubsub
from google.cloud import pubsub_v1


# =========================
# ENVIRONMENT CONFIGURATION
# =========================

# Sitemap endpoint that lists the site's URLs; offer pages are filtered from it.
SITEMAP_URL = "https://solid.jobs/api/external/sitemap"
# Upper bound on offer pages fetched in parallel (used as the semaphore size).
# NOTE: the original author warns that a value of 5 may be unstable in Colab/Docker.
CONCURRENCY = 5 

# Pub/Sub configuration
PROJECT_ID = "laborinsight-data"  # Set your GCP project ID
TOPIC_ID = "jobs-raw"            # Set the topic name
# Pub/Sub client initialisation (runs at import / cell-execution time).
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)


# Logging: each execution of this cell logs to the notebook output and to a
# timestamped log file.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# DATA_FILENAME is no longer used for writing, but the definition is kept.
DATA_FILENAME = f"solid_jobs_data_{timestamp}.jsonl"
LOG_FILENAME = f"solid_jobs_log_{timestamp}.log"

logger = logging.getLogger("solid_jobs_scraper")
logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

# logging.getLogger returns the same logger object on every call, so re-running
# this cell would otherwise stack handlers. Detach AND close the old ones:
# merely clearing the list would leave the previous log file open.
for _stale_handler in list(logger.handlers):
    logger.removeHandler(_stale_handler)
    _stale_handler.close()

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

file_handler = logging.FileHandler(LOG_FILENAME, encoding="utf-8")
file_handler.setFormatter(formatter)

logger.addHandler(console_handler)
logger.addHandler(file_handler)

# Browser-like User-Agent, shared by the sitemap request and the Playwright context.
HEADERS = {
    "User-Agent": " ".join(
        [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/129.0.0.0 Safari/537.36",
        ]
    ),
}

# Site root URL.
SOLID_BASE = "https://solid.jobs"


# =========================
# PUB/SUB HELPERS
# =========================

def make_fingerprint(offer: dict) -> str:
    """Return a SHA-1 hex digest identifying an offer.

    The digest is computed over the offer's ``title``, ``company``,
    ``location`` and ``url`` values joined with ``|``. A missing key
    contributes an empty string; a present value is formatted as-is
    (so ``None`` contributes the text ``"None"``).
    """
    key_fields = ("title", "company", "location", "url")
    basis = "|".join(f"{offer.get(field, '')}" for field in key_fields)
    digest = hashlib.sha1(basis.encode("utf-8"))
    return digest.hexdigest()


def publish_to_pubsub(offer: dict):
    """Publish a single offer to the configured Pub/Sub topic.

    Offers without a truthy ``title`` and offers carrying an ``error`` key are
    skipped. The message is a UTF-8 JSON object with the keys ``source``,
    ``payload`` (the offer), ``ingested_at`` (UTC, ``YYYY-MM-DDTHH:MM:SSZ``)
    and ``fingerprint``.

    Failures are logged, never raised: both errors raised while building or
    enqueueing the message and errors reported later by the publish future.
    """
    if not offer.get("title") or "error" in offer:
        return

    # Envelope metadata.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    msg = {
        "source": "solidjobs",
        "payload": offer,
        "ingested_at": now,
        "fingerprint": make_fingerprint(offer),
    }

    def _log_delivery_failure(future) -> None:
        # publish() hands back a future; the actual send happens later, so a
        # delivery error never reaches the try/except below. Surface it here.
        exc = future.exception()
        if exc is not None:
            logger.error(f"Failed to publish to Pub/Sub: {exc}")

    try:
        data_bytes = json.dumps(msg, ensure_ascii=False).encode("utf-8")
        future = publisher.publish(topic_path, data=data_bytes)
        future.add_done_callback(_log_delivery_failure)
    except Exception as e:
        logger.error(f"Failed to publish to Pub/Sub: {e}")

# =========================
# UTILITY / PARSER HELPERS (original functions, unchanged)
# =========================

def normalize_ws(text: str) -> str:
    """Collapse every run of whitespace in ``text`` to one space and trim both ends."""
    words = text.split()
    return " ".join(words)

# Parser helpers defined below: _extract_json_ld, _find_company, _find_location,
# _find_salary, _extract_tech_stack, _extract_section, _find_seniority,
# _find_category and parse_offer.
# (Original note: the definitions below are stand-ins kept for completeness.)
# =======================================================================

def _jsonld_dict(value: Any) -> Dict[str, Any]:
    """Return ``value`` as a dict: the first dict of a list, the dict itself, else ``{}``."""
    if isinstance(value, list):
        value = next((item for item in value if isinstance(item, dict)), None)
    return value if isinstance(value, dict) else {}


def _jsonld_text(value: Any) -> str:
    """Return scalar JSON values as whitespace-normalized text, ``""`` for anything else."""
    if value is None or isinstance(value, (dict, list, bool)):
        return ""
    return normalize_ws(str(value))


def _extract_json_ld(soup: BeautifulSoup) -> Dict[str, Any] | None:
    """Extract basic offer fields from the first JSON-LD ``JobPosting`` on the page.

    Scans every ``<script type="application/ld+json">`` tag, skipping tags
    whose content is not valid JSON. Returns a dict with the string keys
    ``title``, ``company``, ``location``, ``offer_description`` (HTML tags
    replaced by spaces) and ``salary_raw``, or ``None`` when no ``JobPosting``
    object is found. Every value is a string; a field that is missing, null or
    not a scalar becomes ``""`` instead of raising.
    """
    for tag in soup.find_all("script", {"type": "application/ld+json"}):
        try:
            data = json.loads(tag.string or "")
        except Exception:
            continue
        candidates = data if isinstance(data, list) else [data]
        for obj in candidates:
            if not (isinstance(obj, dict) and obj.get("@type") == "JobPosting"):
                continue

            # These may arrive as an object, a list of objects, a bare string
            # or null; anything that is not usable as a dict is treated as empty.
            org = _jsonld_dict(obj.get("hiringOrganization"))
            address = _jsonld_dict(_jsonld_dict(obj.get("jobLocation")).get("address"))

            description = obj.get("description")
            if not isinstance(description, str):
                description = ""

            # baseSalary may be a plain scalar, or an object whose "value" is
            # either a scalar or a nested object with its own "value".
            base_salary = obj.get("baseSalary")
            if isinstance(base_salary, dict):
                amount = base_salary.get("value")
                if isinstance(amount, dict):
                    amount = amount.get("value")
            else:
                amount = base_salary

            return {
                "title": _jsonld_text(obj.get("title")),
                "company": _jsonld_text(org.get("name")),
                "location": _jsonld_text(address.get("addressLocality")),
                "offer_description": normalize_ws(re.sub("<[^>]+>", " ", description)),
                "salary_raw": _jsonld_text(amount),
            }
    return None

def _find_company(soup: BeautifulSoup) -> str:
    """Return the text of the first ``<span>`` after the ``fa-buildings`` icon.

    The result is whitespace-normalized. Returns ``""`` when either the
    ``<i>`` icon or a following ``<span>`` cannot be found.
    """
    def is_buildings_icon(css_class):
        return css_class and "fa-buildings" in css_class

    marker = soup.find("i", class_=is_buildings_icon)
    if not marker:
        return ""
    label = marker.find_next("span")
    if not label:
        return ""
    return normalize_ws(label.get_text())

def _find_location(soup: BeautifulSoup) -> str:
    """Return the text of the first ``<span>`` after the ``fa-location-dot`` icon.

    The result is whitespace-normalized. Returns ``""`` when either the
    ``<i>`` icon or a following ``<span>`` cannot be found.
    """
    pin = soup.find("i", class_=lambda css: css and "fa-location-dot" in css)
    label = pin.find_next("span") if pin else None
    if label:
        return normalize_ws(label.get_text())
    return ""

def _find_salary(soup: BeautifulSoup) -> str:
    """Return the whitespace-normalized salary text, or ``""`` when none is found.

    First choice is a ``<span>`` whose class contains ``font-weight-500``
    together with ``font-17-em`` or ``h3``; otherwise the first ``<span>``
    whose string matches ``PLN``.
    """
    def looks_like_salary(css):
        if not css or "font-weight-500" not in css:
            return False
        return "font-17-em" in css or "h3" in css

    styled = soup.find("span", class_=looks_like_salary)
    if styled:
        return normalize_ws(styled.get_text())

    by_currency = soup.find("span", string=re.compile(r"PLN"))
    if not by_currency:
        return ""
    return normalize_ws(by_currency.get_text())

def _extract_tech_stack(soup: BeautifulSoup) -> list[str]:
    """Collect short ``<span>`` labels from ``<offer-details-requirements>`` elements.

    Each label is whitespace-normalized; empty labels and labels of 40 or more
    characters are dropped. Duplicates are removed, keeping first-seen order.
    """
    labels = (
        normalize_ws(span.get_text())
        for req in soup.find_all("offer-details-requirements")
        for span in req.find_all("span")
    )
    short_labels = (label for label in labels if label and len(label) < 40)
    # dict keys keep insertion order, which gives an order-preserving de-duplication.
    return list(dict.fromkeys(short_labels))

def _extract_section(soup: BeautifulSoup, title_substr: str) -> str:
    """Return the text of the ``<offer-details-details>`` section with a matching title.

    Args:
        soup: Parsed offer page.
        title_substr: Literal text to look for (case-insensitively) inside the
            element's ``sectiontitle`` attribute. It is escaped before being
            compiled, so regex metacharacters in it are matched verbatim.

    Returns:
        The whitespace-normalized text of the section's ``html-card`` child
        (or of the whole section when there is no such child), or ``""`` when
        no section matches.
    """
    title_pattern = re.compile(re.escape(title_substr), re.I)
    node = soup.find("offer-details-details", attrs={"sectiontitle": title_pattern})
    if not node:
        return ""
    body = node.find(class_="html-card") or node
    # normalize_ws collapses every whitespace run (newlines included), so no
    # separate newline clean-up is needed beforehand.
    return normalize_ws(body.get_text("\n", strip=True))

def _find_seniority(soup: BeautifulSoup) -> Optional[str]:
    """Return the first badge text that names a seniority level, else ``None``.

    Only pages with an ``<offer-details-requirements>`` element are searched.
    The ``badge badge-light`` div following the ``fa-signal-bars`` icon is
    checked first, then every ``badge badge-light`` div inside the
    requirements element, in document order. A badge qualifies when its
    normalized text contains junior, mid, regular, senior or expert
    (case-insensitive).
    """
    requirements = soup.find("offer-details-requirements")
    if not requirements:
        return None

    level_pattern = re.compile(r"(junior|mid|regular|senior|expert)", re.I)

    candidates = []
    signal_icon = requirements.find("i", class_=lambda c: c and "fa-signal-bars" in c)
    if signal_icon:
        icon_badge = signal_icon.find_next("div", class_="badge badge-light")
        if icon_badge:
            candidates.append(icon_badge)
    candidates.extend(requirements.find_all("div", class_="badge badge-light"))

    for badge in candidates:
        label = normalize_ws(badge.get_text())
        if level_pattern.search(label):
            return label
    return None

def _find_category(soup: BeautifulSoup) -> Optional[str]:
    """Return the text of the first ``<span>`` after the ``fa-layer-group`` icon.

    The result is whitespace-normalized. Returns ``None`` when either the
    ``<i>`` icon or a following ``<span>`` cannot be found.
    """
    layer_icon = soup.find("i", class_=lambda css: css and "fa-layer-group" in css)
    label = layer_icon.find_next("span") if layer_icon else None
    if label:
        return normalize_ws(label.get_text())
    return None

def parse_offer(html: str, url: str) -> Dict[str, Any]:
    """Parse one rendered offer page into a flat record.

    JSON-LD ``JobPosting`` data is preferred for title, company, location and
    salary; each falls back to scraping the HTML when JSON-LD has no value.

    Args:
        html: Full HTML of the offer page.
        url: URL the page was loaded from (copied into the record).

    Returns:
        A dict with the keys ``url``, ``title``, ``company``, ``location``,
        ``salary_raw``, ``tech_stack``, ``seniority``, ``category``,
        ``must_have``, ``responsibilities`` and ``offer_description``.
        ``title`` falls back to ``"Brak tytułu"``; ``tech_stack`` is always a
        list; other values that could not be found are ``None``.
    """
    soup = BeautifulSoup(html, "lxml")
    base = _extract_json_ld(soup) or {}

    title = base.get("title", "").strip()
    if not title:
        h1 = soup.find("h1")
        if h1:
            a = h1.find("a")
            title = normalize_ws(a.get_text()) if a else normalize_ws(h1.get_text())

    company = base.get("company", "").strip() or _find_company(soup)
    location = base.get("location", "").strip() or _find_location(soup)
    salary_raw = base.get("salary_raw", "").strip() or _find_salary(soup)

    sec_who = _extract_section(soup, "Kogo poszukujemy")
    sec_what = _extract_section(soup, "Czym będziesz się zajmować")

    # Prefer the two page sections; when neither is present, fall back to the
    # JSON-LD description rather than dropping text that was already extracted.
    sections = [part for part in (sec_who, sec_what) if part]
    offer_description = (
        "\n\n".join(sections)
        or base.get("offer_description", "").strip()
        or None
    )

    return {
        "url": url,
        "title": title or "Brak tytułu",
        "company": company or None,
        "location": location or None,
        "salary_raw": salary_raw or None,
        "tech_stack": _extract_tech_stack(soup),
        "seniority": _find_seniority(soup),
        "category": _find_category(soup),
        "must_have": sec_who or None,
        "responsibilities": sec_what or None,
        "offer_description": offer_description,
    }

# =========================
# SITEMAP + PLAYWRIGHT (modified)
# =========================

def load_sitemap(url: str, timeout: float = 30.0) -> List[str]:
    """Download the sitemap and return the offer URLs it lists.

    Args:
        url: Sitemap URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        Every ``<loc>`` value containing ``/offer/``, in sitemap order, with
        surrounding whitespace removed.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx response (via ``raise_for_status``).
    """
    logger.info("Fetching sitemap...")
    r = requests.get(url, headers=HEADERS, timeout=timeout)
    r.raise_for_status()

    xml = BS(r.text, "xml")
    # Strip each entry: pretty-printed sitemaps may pad <loc> text with whitespace.
    all_urls = [loc.get_text().strip() for loc in xml.find_all("loc")]

    offer_urls = [u for u in all_urls if "/offer/" in u]

    logger.info(f"Found {len(all_urls)} URLs in sitemap (total)")
    logger.info(f"Found {len(offer_urls)} offer URLs (with '/offer/')")

    return offer_urls


async def fetch_offer_page(context, url: str, idx: int, total: int) -> Dict[str, Any]:
    """Load one offer page, parse it and hand the result to Pub/Sub.

    Args:
        context: Playwright browser context used to open the page.
        url: Offer URL to load.
        idx: 1-based position of this offer, used only in the log line.
        total: Total number of offers, used only in the log line.

    Returns:
        The parsed offer dict on success, or ``{"url": url, "error": <message>}``
        when loading or parsing fails. The page is always closed.
    """
    page = await context.new_page()

    async def _abort_unless_main_document(route):
        # The URL patterns below are matched against EVERY request, including
        # the offer page itself. An offer whose URL happens to contain e.g.
        # "cdn" or "facebook" must still be allowed to load.
        request = route.request
        try:
            is_main_document = (
                request.is_navigation_request() and request.frame == page.main_frame
            )
        except Exception:
            # The frame is not always available; treat the request as a sub-resource.
            is_main_document = False
        if is_main_document:
            await route.continue_()
        else:
            await route.abort()

    try:
        # Block heavy static assets and third-party trackers for speed and stability.
        # Registered inside the try so the page is closed even if routing setup fails.
        await page.route(
            re.compile(r"(\.png|\.jpg|\.jpeg|\.webp|\.css|\.woff|\.gif|\.ttf|\.svg)"),
            _abort_unless_main_document,
        )
        await page.route(
            re.compile(r"google-analytics|clarity|hotjar|facebook|doubleclick|googletag|ytimg|cdn"),
            _abort_unless_main_document,
        )

        # "load" instead of "networkidle": networkidle is stricter and may
        # lead to timeouts.
        await page.goto(url, wait_until="load", timeout=60000)

        # Short pause to let the SPA finish rendering.
        await asyncio.sleep(random.uniform(1.5, 2.5))

        # Best-effort dismissal of a cookie banner, should one appear on the offer.
        try:
            btn = page.get_by_role("button", name=re.compile("dismiss cookie message", re.I))
            if await btn.is_visible(timeout=2000):
                await btn.click(timeout=3000)
                await asyncio.sleep(0.5)
        except Exception:
            pass

        html = await page.content()
        data = parse_offer(html, url)

        # Send to Pub/Sub instead of accumulating results for a file dump.
        publish_to_pubsub(data)

        logger.info(
            f"[{idx}/{total}] PUBLISHED: {data.get('title')} | "
            f"{data.get('company')} | {data.get('location')} | {data.get('url')}"
        )
        # Returned so the caller can count successes and errors.
        return data

    except Exception as e:
        logger.error(f"Fetch error for {url}: {e}")
        return {"url": url, "error": str(e)}
    finally:
        await page.close()


async def scrape_all(urls: List[str]) -> List[Dict[str, Any]]:
    """Scrape every offer URL with one shared headless Chromium instance.

    At most ``CONCURRENCY`` pages are processed at the same time.

    Args:
        urls: Offer URLs to fetch.

    Returns:
        One result dict per URL, in the same order as ``urls``: either the
        parsed offer or ``{"url": ..., "error": ...}``. A failure while
        processing one URL is recorded in its result and does not abort the
        remaining URLs.
    """
    async with async_playwright() as pw:
        # Launch flags chosen for stability in isolated environments.
        browser = await pw.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-gpu",
                "--disable-dev-shm-usage",
            ],
        )
        try:
            context = await browser.new_context(user_agent=HEADERS["User-Agent"])
            try:
                sem = asyncio.Semaphore(CONCURRENCY)
                total = len(urls)

                async def worker(position: int, url: str) -> Dict[str, Any]:
                    async with sem:
                        try:
                            return await fetch_offer_page(context, url, position, total)
                        except Exception as exc:
                            # Anything that escapes fetch_offer_page (e.g. the page
                            # could not be opened) becomes a per-URL error, so
                            # gather() below never aborts the whole batch.
                            logger.error(f"Fetch error for {url}: {exc}")
                            return {"url": url, "error": str(exc)}

                # gather() waits for every task and keeps results in input order.
                results = await asyncio.gather(
                    *(worker(position, url) for position, url in enumerate(urls, start=1))
                )
            finally:
                await context.close()
        finally:
            await browser.close()

    return list(results)


async def main():
    """Run the full pipeline: load the sitemap, scrape all offers, log a summary.

    The "successfully published" figure counts results without an ``error``
    key; it does not by itself confirm delivery to Pub/Sub.
    """
    started_at = time.perf_counter()

    offer_urls = load_sitemap(SITEMAP_URL)
    # offer_urls = offer_urls[:100]  # optional cap for test runs

    logger.info(f"Starting scraping of {len(offer_urls)} offers...")

    outcomes = await scrape_all(offer_urls)

    error_count = sum("error" in outcome for outcome in outcomes)
    successful_count = len(outcomes) - error_count

    elapsed = time.perf_counter() - started_at
    summary_lines = (
        "--- SCRAPING FINISHED ---",
        f"Offers successfully published to Pub/Sub: {successful_count}",
        f"Offers with fetch errors: {error_count}",
        f"Exec time: {elapsed:.2f} s (~{elapsed/60:.1f} min)",
        f"Saved logs to: {LOG_FILENAME}",
    )
    for line in summary_lines:
        logger.info(line)


In [7]:
# Run the full scrape. NOTE(review): top-level ``await`` presumably relies on the
# notebook kernel's already-running event loop; confirm before reusing as a script.
await main()

2025-12-01 00:40:56,723 [INFO] Fetching sitemap...
2025-12-01 00:40:57,700 [INFO] Found 8914 URLs in sitemap (total)
2025-12-01 00:40:57,701 [INFO] Found 1400 offer URLs (with '/offer/')
2025-12-01 00:40:57,703 [INFO] Starting scraping of 1400 offers...
2025-12-01 00:41:07,613 [INFO] [3/1400] PUBLISHED: Developer .NET | TeamQuest | Piaseczno | https://solid.jobs/offer/25251/teamquest-developer-dotnet
2025-12-01 00:41:10,051 [INFO] [1/1400] PUBLISHED: Java Fullstack Developer (Angular) | TeamQuest | Warszawa | https://solid.jobs/offer/25249/teamquest-java-fullstack-developer-angular
2025-12-01 00:41:10,244 [INFO] [4/1400] PUBLISHED: Specjalista ds. Sprzętu Komputerowego | TeamQuest | Warszawa | https://solid.jobs/offer/25252/teamquest-specjalista-ds-sprzetu-komputerowego
2025-12-01 00:41:10,461 [INFO] [5/1400] PUBLISHED: Tester automatyzujacy (język niemiecki) | speedapp | Warszawa | https://solid.jobs/offer/25259/apreel-tester-automatyzujacy-jezyk-niemiecki
2025-12-01 00:41:11,396 [INF