# TUHH Institutes Scraper

Scrape all institute entries from the TUHH overview page and export a CSV with columns: `name`, `id`, `url`.

- Source: https://www.tuhh.de/tuhh/dekanate/institute-im-ueberblick
- The institute name is the text before the parentheses.
- The institute ID is the letter/number code in parentheses, e.g., `(E-11)` or `(M-EXK6)`.
- The website URL is the link embedded in each box.


In [21]:
# Imports
import re
import time
import random
import logging
import asyncio
import threading
from queue import Queue
from typing import List, Dict

import requests
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import urljoin

from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# Playwright is an optional dependency used to render JS-loaded content.
# HAS_PLAYWRIGHT records whether the import succeeded so later cells can
# decide whether browser rendering is possible.
HAS_PLAYWRIGHT = False
try:
    from playwright.async_api import async_playwright  # type: ignore
except Exception:
    pass
else:
    HAS_PLAYWRIGHT = True

# nest_asyncio is optional too: when present it patches asyncio so that an
# event loop can be re-entered from inside a notebook's running loop.
try:
    import nest_asyncio  # type: ignore
except Exception:
    pass
else:
    try:
        nest_asyncio.apply()
    except Exception:
        pass

# Report which optional feature set is active.
print("✅ Imports ready (Playwright:", ("not available", "available")[HAS_PLAYWRIGHT], ")")

✅ Imports ready (Playwright: available )


In [27]:
# Scraper implementation
class TUHHInstitutesScraper:
    def __init__(self, base_url: str = "https://www.tuhh.de", min_delay: float = 0.2, max_delay: float = 0.6, retry_attempts: int = 3, use_playwright: bool = True):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        retry = Retry(total=retry_attempts, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "HEAD", "OPTIONS"], backoff_factor=0.8)
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.use_playwright = use_playwright and HAS_PLAYWRIGHT
        self.logger = logging.getLogger("tuhh-scraper")
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def _clean_text(self, s: str) -> str:
        return re.sub(r"\s+", " ", s or "").strip()

    def _extract_id_from_text(self, text: str) -> str | None:
        # Return the LAST matching ID in the string to avoid picking IDs from sibling tiles
        matches = re.findall(r"\(([A-Za-zÄÖÜ][A-Za-zÄÖÜ\-]*-[A-Za-z0-9\-]+)\)", text or "")
        return matches[-1].strip() if matches else None

    def _make_full_url(self, href: str) -> str:
        return href if href.startswith('http') else urljoin(self.base_url, href)

    def _valid_url(self, u: str) -> bool:
        return bool(u and u.startswith(('http://','https://')) and len(u) > len('http://'))

    async def _best_anchor_in_container(self, frame, a):
        # Prefer a heading anchor within the same tile/card
        try:
            href = await a.get_attribute('href')
            text = self._clean_text(await a.inner_text())
            if text and self._extract_id_from_text(text):
                return a  # this anchor already carries the label and id
            # else look for heading anchors
            heading_anchor = await a.evaluate_handle('''(el) => {
                const c = el.closest('div,li,section,article');
                if(!c) return null;
                return c.querySelector('h2 a, h3 a, h4 a, strong a, a');
            }''')
            if heading_anchor:
                return heading_anchor.as_element()
        except Exception:
            pass
        return a

    async def _collect_from_frame(self, frame) -> list[dict]:
        rows: list[dict] = []
        try:
            anchors = await frame.query_selector_all("main a[href], a[href]")
            seen = set()
            for a in anchors:
                try:
                    a = await self._best_anchor_in_container(frame, a)
                    href = await a.get_attribute('href')
                    if not href or href.startswith('#') or href.startswith('mailto:'):
                        continue
                    full = self._make_full_url(href)
                    if full in seen:
                        continue
                    text = self._clean_text(await a.inner_text())
                    # Pull container text for fallback
                    container_text = text
                    try:
                        container_text = self._clean_text(await a.evaluate('(el) => el.closest("div,li,section,article")?.innerText || el.parentElement?.innerText || el.innerText'))
                    except Exception:
                        pass
                    # Prefer ID from anchor text; fallback to container
                    inst_id = self._extract_id_from_text(text) or self._extract_id_from_text(container_text)
                    if not inst_id:
                        continue
                    # Derive name from the most specific text (anchor or heading in container)
                    label_text = text
                    if not label_text or len(label_text) < 3 or inst_id not in label_text:
                        try:
                            label_text = self._clean_text(await a.evaluate('(el) => { const c = el.closest("div,li,section,article"); if(!c) return ""; const h = c.querySelector("h2,h3,h4,strong"); return h ? h.innerText : el.innerText }'))
                        except Exception:
                            label_text = container_text
                    name_text = self._clean_text(label_text.replace(f"({inst_id})", ""))
                    if len(name_text) < 2:
                        continue
                    if not self._valid_url(full):
                        continue
                    rows.append({"name": name_text, "id": inst_id, "url": full})
                    seen.add(full)
                except Exception:
                    continue
        except Exception:
            pass
        return rows

    async def _scrape_async(self, url: str) -> list[dict]:
        rows: list[dict] = []
        try:
            async with async_playwright() as p:  # type: ignore
                browser = await p.chromium.launch()
                ctx = await browser.new_context()
                page = await ctx.new_page()
                await page.goto(url, wait_until='domcontentloaded', timeout=45000)
                # Accept cookies if present
                for selector in [
                    'button:has-text("AKZEPTIEREN")',
                    'button:has-text("Accept")',
                    "#usercentrics-root button[aria-label='Akzeptieren']",
                    "#usercentrics-root button[aria-label='Accept']",
                    "#uc-btn-accept-all",
                ]:
                    try:
                        await page.locator(selector).click(timeout=1500)
                        break
                    except Exception:
                        pass
                # Try to open/activate the alphabetical section/tab
                for tab_text in ["alphabetisch", "alphabetic", "Alphabetisch", "Alphabetical"]:
                    try:
                        await page.locator(f"text={tab_text}").first.click(timeout=2000)
                        break
                    except Exception:
                        pass
                try:
                    await page.wait_for_selector("main", timeout=15000)
                except Exception:
                    pass
                # Scroll to trigger any lazy content
                for _ in range(3):
                    try:
                        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                        await page.wait_for_timeout(300)
                    except Exception:
                        break
                # Collect from page and frames
                all_frames = [page] + page.frames
                collected: list[dict] = []
                for fr in all_frames:
                    try:
                        part = await self._collect_from_frame(fr)
                        collected.extend(part)
                    except Exception:
                        continue
                # Unique by (name,id,url)
                seen = set()
                for r in collected:
                    key = (r['name'], r['id'], r['url'])
                    if key in seen:
                        continue
                    seen.add(key)
                    rows.append(r)
                await page.close(); await ctx.close(); await browser.close()
        except Exception as e:
            self.logger.warning(f"Async Playwright extraction failed: {e}")
        return rows

    def _scrape_via_playwright(self, url: str) -> list[dict]:
        if not self.use_playwright:
            return []
        try:
            return asyncio.get_event_loop().run_until_complete(self._scrape_async(url))
        except RuntimeError:
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(self._scrape_async(url))
            finally:
                loop.close()
        except Exception as e:
            self.logger.warning(f"Playwright extraction failed: {e}")
            return []

    def _request_soup(self, url: str) -> BeautifulSoup | None:
        try:
            time.sleep(random.uniform(self.min_delay, self.max_delay))
            headers = {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36",
                "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
            }
            r = self.session.get(url, headers=headers, timeout=25, allow_redirects=True)
            if r.status_code == 200 and r.text:
                return BeautifulSoup(r.text, "html.parser")
            self.logger.warning(f"Non-200 for {url}: {r.status_code}")
        except Exception as e:
            self.logger.warning(f"Request error for {url}: {e}")
        return None

    def get_page(self, url: str) -> BeautifulSoup | None:
        if self.use_playwright:
            html_rows = self._scrape_via_playwright(url)
            if len(html_rows) >= 10:
                self._playwright_rows_cache = html_rows
                return BeautifulSoup("<html><body><main id='pw-ok'></main></body></html>", 'html.parser')
        return self._request_soup(url)

    def parse_boxes(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
        cached = getattr(self, '_playwright_rows_cache', None)
        if isinstance(cached, list) and cached:
            return cached
        # Static fallback
        rows: List[Dict[str, str]] = []
        if not soup:
            return rows
        root = soup.find('main') or soup
        items = root.select('li, div, a')
        seen = set()
        for node in items:
            text = self._clean_text(node.get_text(' ', strip=True))
            inst_id = self._extract_id_from_text(text)
            if not inst_id:
                continue
            a = node if node.name == 'a' and node.get('href') else node.find('a', href=True)
            if not a:
                continue
            full = self._make_full_url(a['href'])
            if not self._valid_url(full) or full in seen:
                continue
            # Name from node text
            name_text = self._clean_text(text.replace(f"({inst_id})", ""))
            if len(name_text) < 2:
                name_text = self._clean_text(a.get_text(' ', strip=True))
                name_text = self._clean_text(name_text.replace(f"({inst_id})", ""))
            if len(name_text) < 2:
                continue
            rows.append({"name": name_text, "id": inst_id, "url": full})
            seen.add(full)
        return rows

    def scrape(self, overview_url: str) -> List[Dict[str, str]]:
        soup = self.get_page(overview_url)
        rows = self.parse_boxes(soup) if soup else []
        self.logger.info(f"Collected {len(rows)} rows from overview (via Playwright if available)")
        # Final unique by URL
        seen = set(); out = []
        for r in rows:
            if r['url'] in seen:
                continue
            seen.add(r['url']); out.append(r)
        return out

# Notebook feedback: confirms that the cell defining TUHHInstitutesScraper ran.
print("✅ TUHHInstitutesScraper updated: correct ID extraction (anchor-first, last-match) + URL validation")

✅ TUHHInstitutesScraper updated: correct ID extraction (anchor-first, last-match) + URL validation


In [28]:
# Scrape the overview page and write the result to a timestamped CSV
import os
from datetime import datetime

overview_url = "https://www.tuhh.de/tuhh/dekanate/institute-im-ueberblick"
scraper = TUHHInstitutesScraper()
rows = scraper.scrape(overview_url)
print(f"Found {len(rows)} institutes")

if not rows:
    print("No rows scraped")
else:
    # One row per institute URL, with a clean 0..n-1 index.
    df = pd.DataFrame(rows).drop_duplicates(subset=["url"]).reset_index(drop=True)
    # The timestamp keeps successive runs from overwriting each other.
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out = f"data/tuhh_institutes_{ts}.csv"
    # The output folder may not exist on a fresh checkout.
    os.makedirs("data", exist_ok=True)
    df.to_csv(out, index=False)
    print(f"Saved {len(df)} rows to {out}")

2025-09-28 17:05:19,298 - INFO - Collected 96 rows from overview (via Playwright if available)


Found 96 institutes
Saved 96 rows to data/tuhh_institutes_20250928_170519.csv


## How to run

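1. Install the dependencies: `requests`, `beautifulsoup4` and `pandas` are required; `playwright` (followed by `playwright install chromium`) and `nest_asyncio` are optional and enable browser rendering inside the notebook.
2. Run cell 2 (Imports) and cell 3 (Scraper implementation).
3. Run cell 4 to scrape and write `data/tuhh_institutes_YYYYMMDD_HHMMSS.csv`.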

The CSV will contain columns `name`, `id`, `url`.