<h3>Imports</h3>

In [169]:
import os, time, threading, random, re, urllib.parse as urlparse, yaml, requests, pandas as pd
from collections import defaultdict
from urllib.robotparser import RobotFileParser
from bs4 import BeautifulSoup
from hashlib import sha1
import tldextract
from datetime import datetime
from readability import Document
import feedparser

<h1>1. Crawler / Scraper</h1>

Defining basic settings

In [170]:
# os.makedirs("configs", exist_ok=True)
# os.makedirs("storage/raw", exist_ok=True)
# os.makedirs("storage/clean", exist_ok=True)
# os.makedirs("logs", exist_ok=True)

# CONFIG = {
#     "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
#     "default_timeout_sec": 15,
#     "per_host_min_delay_sec": 2,
#     "max_retries": 3,
#     "sources": {
#         "reddit": {
#             "type": "reddit_html",
#             "subreddits": ["worldnews", "news"],
#             "limit_per_sub": 0
#         },
#         "news_sites": [
#             {
#                 "name": "reuters_world",
#                 "start_urls": ["https://www.reuters.com/world/"],
#                 "article_selector_hint": "a"
#             }
#         ]
#     }
# }

# with open("configs/crawl_config.yaml", "w", encoding="utf-8") as f:
#     yaml.safe_dump(CONFIG, f, allow_unicode=True)
# print("configs/crawl_config.yaml saved")


In [171]:
# Configuration cell: creates the working directories, defines the crawl
# settings, and persists them to configs/crawl_config.yaml (the run cell loads
# that file back as `cfg`).
import os, yaml

# Output/working directories; exist_ok=True makes the cell safe to re-run.
os.makedirs("configs", exist_ok=True)
os.makedirs("storage/raw", exist_ok=True)
os.makedirs("storage/clean", exist_ok=True)
os.makedirs("logs", exist_ok=True)

CONFIG = {
    # Sent as the User-Agent header and also used as the agent name for robots.txt checks.
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
    # Passed to fetch() as `timeout`.
    "default_timeout_sec": 15,
    # Passed to fetch() as `min_delay_sec` (minimum spacing between requests to one host).
    "per_host_min_delay_sec": 2,
    # Passed to fetch() as `max_retries`.
    "max_retries": 3,
    "sources": {
        "reddit": {
            "type": "reddit_html",
            # Each subreddit is crawled in turn by the run cell.
            "subreddits": [
                "worldnews",
                "news",
                "politics",
                "technology",
                "science",
                "environment",
                "economics"
            ],
            # Read by crawl_reddit_subreddit(); a falsy value means "no cap" on the HTML path.
            "limit_per_sub": 50
        },
        # NOTE(review): this entry is not read by the run cell shown in this
        # notebook section; presumably consumed by a later NPR-specific cell — confirm.
        "npr_news": {
            "type": "news_html",
            "start_urls": ["https://www.npr.org/sections/world/"],
            "article_selector_hint": "a",
            "include_patterns": ["/story/", "/202", "/sections/world/"],
            "limit_per_section": 50
        },
        # Each entry is passed to crawl_news_site(), which reads "name",
        # "start_urls" and "article_selector_hint".
        "news_sites": [
            {
                "name": "ap_world",
                "start_urls": ["https://apnews.com/hub/world-news"],
                "article_selector_hint": "a"
            }
        ]
        # Previous Reuters source, kept disabled for reference:
        # "news_sites": [
        #     {
        #         "name": "reuters_world",
        #         "start_urls": ["https://www.reuters.com/world/"],
        #         "article_selector_hint": "a"
        #     }
        # ]
    }
}

# Persist the settings so the run cell can reload them from disk.
with open("configs/crawl_config.yaml", "w", encoding="utf-8") as f:
    yaml.safe_dump(CONFIG, f, allow_unicode=True)
print("configs/crawl_config.yaml saved")


configs/crawl_config.yaml saved


Respect robots.txt and throttle the request rate per host

In [172]:
# Shared HTTP session reused by every request in this notebook (robots.txt
# lookups and page fetches), so the default headers below apply everywhere.
_session = requests.Session()
# _session.headers.update({"Accept-Language": "en;q=0.9"})
_session.headers.update({
    "Accept-Language": "en;q=0.9",
    "User-Agent": CONFIG["user_agent"],
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Connection": "keep-alive"
})

# ==== robots.txt handling and rate limit ====
# robots.txt URL -> parsed RobotFileParser, filled by get_robots_parser().
_robot_cache = {}
# host key -> earliest time.time() value at which the next request may be sent;
# defaultdict(float) makes unseen hosts start at 0.0 (i.e. no wait).
_host_next_time = defaultdict(float)
# Guards _host_next_time inside rate_limit().
_lock = threading.Lock()

def get_robots_parser(base_url, ua):
    """Return a ``RobotFileParser`` for the site that serves ``base_url``.

    The robots.txt file is downloaded with the shared session and the parsed
    result is cached per robots.txt URL in ``_robot_cache``.

    Args:
        base_url: Any URL on the target site; only its scheme and host are used.
        ua: Accepted for call-site symmetry with ``can_fetch``; not used here.

    Returns:
        A parser that is ready for ``can_fetch`` calls:
        * HTTP 200       -> rules parsed from the response body;
        * HTTP 5xx       -> disallow everything (the file exists but could not
          be read, so crawling must not proceed); this result is NOT cached so
          that a transient outage does not block the host for the whole session;
        * anything else  -> allow everything (no usable robots.txt).
    """
    parsed = urlparse.urlparse(base_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    cached = _robot_cache.get(robots_url)
    if cached is not None:
        return cached

    rp = RobotFileParser()
    try:
        resp = _session.get(robots_url, timeout=8)
        if resp.status_code == 200:
            rp.parse(resp.text.splitlines())
        elif 500 <= resp.status_code < 600:
            # Server-side failure: the rules are unknown, so assume complete
            # disallow instead of silently treating the site as unrestricted.
            rp.parse(["User-agent: *", "Disallow: /"])
            return rp
        else:
            # An empty rule set means every path is allowed.
            rp.parse([])
    except Exception:
        # Best effort kept from the original: a network failure is treated as
        # "no robots.txt".
        # NOTE(review): RFC 9309 recommends assuming disallow when robots.txt is
        # unreachable — confirm whether this path should be tightened too.
        rp.parse([])
    _robot_cache[robots_url] = rp
    return rp

def host_key(url):
    """Return the key used to group requests to the same site.

    The key is built from the ``domain`` and ``suffix`` parts that
    ``tldextract`` reports for the URL's network location, joined with a dot;
    empty parts are left out.
    """
    netloc = urlparse.urlparse(url).netloc
    parts = tldextract.extract(netloc)
    labels = (parts.domain, parts.suffix)
    return ".".join(label for label in labels if label)

def rate_limit(url, min_delay_sec):
    """Block until a request to ``url``'s host is allowed, spacing requests out.

    Each host (as identified by ``host_key``) gets consecutive time slots that
    are at least ``min_delay_sec`` seconds apart. The caller's slot is reserved
    while holding ``_lock``; the actual waiting happens after the lock has been
    released, so a wait for one host never delays requests to other hosts.

    Args:
        url: URL about to be requested.
        min_delay_sec: Minimum number of seconds between two requests to the
            same host.
    """
    hk = host_key(url)
    with _lock:
        now = time.time()
        # The caller's slot is the later of "now" and the host's next free time.
        slot = max(now, _host_next_time[hk])
        _host_next_time[hk] = slot + min_delay_sec
    wait = slot - now
    if wait > 0:
        time.sleep(wait)


Fetch function (with Retry & Exponential Backoff)

In [173]:
def fetch(url, user_agent, timeout=15, max_retries=3, min_delay_sec=2):
    """GET ``url`` politely and return the successful response.

    The request is first checked against the site's robots.txt and delayed by
    the per-host rate limiter. Up to ``max_retries + 1`` attempts are made.

    Args:
        url: Address to download.
        user_agent: Sent as the ``User-Agent`` header and used as the agent
            name for the robots.txt check.
        timeout: Per-request timeout in seconds.
        max_retries: Number of additional attempts after the first one.
        min_delay_sec: Minimum spacing between requests to the same host.

    Returns:
        The response object of the first attempt that answers with a 2xx status.

    Raises:
        PermissionError: robots.txt does not allow ``user_agent`` to fetch ``url``.
        TimeoutError: no attempt produced a 2xx response.
    """
    rp = get_robots_parser(url, user_agent)
    if not rp.can_fetch(user_agent, url):
        raise PermissionError(f"Blocked by robots.txt for {url}")

    rate_limit(url, min_delay_sec)

    headers = {"User-Agent": user_agent}
    backoff = 1.6

    for attempt in range(max_retries + 1):
        try:
            resp = _session.get(url, headers=headers, timeout=timeout, allow_redirects=True)
        except requests.RequestException:
            # Connection-level failure: back off exponentially before retrying.
            pause = (backoff ** attempt) + 0.4
        else:
            status = resp.status_code
            if 200 <= status < 300:
                return resp

            if status in (408, 429, 502, 503, 504):
                # Rate limiting and transient errors: exponential backoff with jitter.
                pause = (backoff ** attempt) + random.uniform(0, 0.5)
            elif 400 <= status < 500:
                # Access denied, legal restriction, not found, ...: a client
                # error will not change on retry, so give up immediately.
                break
            else:
                pause = 0.6

        # Sleeping after the final attempt would only delay the failure.
        if attempt < max_retries:
            time.sleep(pause)

    raise TimeoutError(f"Fetch failed after retries for {url}")


HTML Crawler for Reddit

In [174]:
# Extracts a vote count such as "57 upvotes" or "1.2k Upvotes": group 1 is the
# number, group 2 the optional thousands suffix. Case-insensitive, to agree with
# the case-insensitive aria-label lookup that feeds it in parse_reddit_listing.
UPVOTE_RE = re.compile(r"(\d+(?:\.\d+)?)([kK])?\s*upvote", re.IGNORECASE)

def to_int_k(v):
    """Convert a count such as ``"42"`` or ``"1.5k"`` to an ``int``.

    Args:
        v: Value to convert; it is stringified, stripped and lower-cased first.

    Returns:
        The integer value, with a trailing ``k`` meaning "times 1000" (rounded
        to the nearest integer), or ``None`` when ``v`` is ``None`` or cannot be
        interpreted as a number.
    """
    if v is None:
        return None
    s = str(v).strip().lower()
    if s.endswith("k"):
        try:
            # round() rather than int(): truncation would turn a product that
            # lands just below a whole number into the next lower integer.
            return round(float(s[:-1]) * 1000)
        except (ValueError, OverflowError):
            # ValueError: not a number / NaN; OverflowError: infinity.
            return None
    try:
        return int(s)
    except ValueError:
        return None

def parse_reddit_listing(html, base_url):
    """Turn the HTML of a Reddit listing page into a list of post records.

    Every post container that has both a headline (``h3``) and a link yields
    one dict. Author, timestamp, comment count and score are filled in when the
    corresponding element is present and left as ``None`` otherwise.

    Args:
        html: Markup of the listing page.
        base_url: URL the page was loaded from; relative links are resolved
            against it and its last path segment is stored as the subreddit.

    Returns:
        A list of record dicts, in page order.
    """
    soup = BeautifulSoup(html, "lxml")
    records = []
    containers = soup.select("[data-testid='post-container'], div[data-test-id='post-content']")
    for container in containers:
        heading = container.select_one("h3") or container.select_one("a[data-click-id='body'] h3")
        if not heading:
            continue

        anchor = (
            container.select_one("a[data-click-id='body'][href]")
            or container.select_one("a[href^='/r/']")
        )
        href = anchor.get("href") if anchor else None
        post_url = urlparse.urljoin(base_url, href) if href else None
        if not post_url:
            continue

        author_tag = (
            container.select_one("a[data-click-id='user']")
            or container.select_one("a[href^='/user/']")
        )
        time_tag = container.select_one("a[data-click-id='timestamp'] time") or container.find("time")
        comments_tag = (
            container.select_one("a[data-click-id='comments']")
            or container.find("a", string=re.compile("comment", re.I))
        )

        published_at = None
        if time_tag and time_tag.has_attr("datetime"):
            published_at = time_tag.get("datetime")

        comment_count = None
        if comments_tag:
            count_match = re.search(r"(\d+(?:\.\d+)?[kK]?)", comments_tag.get_text(" ", strip=True))
            if count_match:
                comment_count = to_int_k(count_match.group(1))

        # The score is read from the aria-label of the upvote control.
        score = None
        vote_tag = container.find(attrs={"aria-label": re.compile("upvote", re.I)})
        if vote_tag:
            vote_match = UPVOTE_RE.search(vote_tag.get("aria-label", ""))
            if vote_match:
                amount = float(vote_match.group(1))
                if vote_match.group(2):
                    amount *= 1000
                score = int(amount)

        records.append({
            "source_type": "reddit",
            "source_name": base_url,
            "subreddit": base_url.rstrip("/").split("/")[-1],
            "url": post_url,
            "canonical_url": post_url,
            "title": heading.get_text(strip=True),
            "text": None,
            "author": author_tag.get_text(strip=True) if author_tag else None,
            "published_at": published_at,
            "score": score,
            "comments": comment_count,
            "fetched_at": datetime.utcnow().isoformat(timespec="seconds")
        })
    return records


In [175]:
STRIP_PARAMS = {"utm_source","utm_medium","utm_campaign","utm_term","utm_content","ref","utm_name","gclid","fbclid"}

def normalize_url(u: str) -> str | None:
    if not u:
        return None
    p = urlparse.urlparse(u)
    if p.scheme not in ("http","https"):
        return None
    q = [(k,v) for k,v in urlparse.parse_qsl(p.query, keep_blank_values=True) if k not in STRIP_PARAMS]
    new_q = urlparse.urlencode(q)
    return urlparse.urlunparse((p.scheme, p.netloc, p.path, "", new_q, ""))

def dedupe_records(records):
    """Drop records that point at an already-seen URL, keeping first occurrences.

    The comparison key is the normalized ``canonical_url`` (or ``url``); when
    normalization yields nothing the raw ``url`` is used, and records without
    any key are discarded. Each kept record has its ``canonical_url``
    overwritten with that key (the input dicts are modified in place).

    Returns:
        A new list with the surviving records in their original order.
    """
    fingerprints = set()
    kept = []
    for record in records:
        raw_url = record.get("url")
        key = normalize_url(record.get("canonical_url") or raw_url) or raw_url
        if not key:
            continue
        digest = sha1(key.encode("utf-8")).hexdigest()
        if digest not in fingerprints:
            fingerprints.add(digest)
            record["canonical_url"] = key
            kept.append(record)
    return kept


In [176]:
def reddit_rss_url(sub: str) -> str:
    """Build the RSS feed URL of subreddit ``sub``."""
    # The stable RSS endpoint responds more reliably on www.reddit.com.
    feed_url = f"https://www.reddit.com/r/{sub}/.rss"
    return feed_url

def crawl_reddit_via_rss(sub, cfg, limit=None):
    """Collect post records for subreddit ``sub`` from its RSS feed.

    Args:
        sub: Subreddit name.
        cfg: Settings dict; reads ``user_agent``, ``default_timeout_sec``,
            ``max_retries`` and ``per_host_min_delay_sec``.
        limit: Maximum number of feed entries to use; a falsy value means 50.

    Returns:
        De-duplicated record dicts, or an empty list when robots.txt forbids
        fetching the feed. Score, comments and text are always ``None``.
    """
    feed_url = reddit_rss_url(sub)
    agent = cfg["user_agent"]

    # Check robots.txt for the RSS path itself.
    if not get_robots_parser(feed_url, agent).can_fetch(agent, feed_url):
        print(f"robots disallows RSS for {feed_url}")
        return []

    # Download through our own session so the configured User-Agent is sent.
    response = fetch(
        feed_url,
        agent,
        timeout=cfg["default_timeout_sec"],
        max_retries=cfg["max_retries"],
        min_delay_sec=cfg["per_host_min_delay_sec"],
    )

    print("rss status:", response.status_code, "bytes:", len(response.text))

    feed = feedparser.parse(response.text)
    print("rss entries:", len(feed.get("entries", [])))

    fetched_at = datetime.utcnow().isoformat(timespec="seconds")
    max_entries = limit or 50
    rows = []
    for entry in feed.get("entries", [])[:max_entries]:
        link = entry.get("link")
        rows.append({
            "source_type": "reddit",
            "source_name": feed_url,
            "subreddit": sub,
            "url": link,
            "canonical_url": normalize_url(link) if link else link,
            "title": entry.get("title"),
            "text": None,
            "author": entry.get("author") if "author" in entry else None,
            "published_at": entry.get("published") if "published" in entry else None,
            "score": None,
            "comments": None,
            "fetched_at": fetched_at
        })
    return dedupe_records(rows)


In [177]:
def crawl_reddit_subreddit(sub, cfg):
    """Collect post records for subreddit ``sub``.

    The old.reddit.com HTML listing is used when robots.txt permits it;
    otherwise the work is delegated to ``crawl_reddit_via_rss``.

    Args:
        sub: Subreddit name.
        cfg: Settings dict; reads ``user_agent``, the fetch settings and
            ``sources.reddit.limit_per_sub``.

    Returns:
        De-duplicated record dicts; on the HTML path the list is cut to
        ``limit_per_sub`` entries when that setting is truthy.
    """
    agent = cfg["user_agent"]
    per_sub_limit = cfg["sources"]["reddit"]["limit_per_sub"]
    listing_url = f"https://old.reddit.com/r/{sub}/"

    robots = get_robots_parser(listing_url, agent)
    if robots.can_fetch(agent, listing_url):
        # HTML is allowed: fetch and parse the listing page.
        page = fetch(
            listing_url,
            agent,
            timeout=cfg["default_timeout_sec"],
            max_retries=cfg["max_retries"],
            min_delay_sec=cfg["per_host_min_delay_sec"],
        )
        posts = dedupe_records(parse_reddit_listing(page.text, listing_url))
        if per_sub_limit:
            return posts[:per_sub_limit]
        return posts

    print(f"robots disallows HTML for {listing_url}, switching to RSS")
    return crawl_reddit_via_rss(sub, cfg, per_sub_limit)


In [178]:
# ==== Article text extraction ====
def extract_article_text(html):
    """Extract the readable body text of an article page.

    The main content is taken from readability's ``Document.summary()``. When
    that yields fewer than 200 characters, or when extraction raises, the text
    of the first 60 ``<p>`` elements of the full page is returned instead.
    """
    def _first_paragraphs():
        # Fallback: join the leading paragraphs of the whole document.
        page = BeautifulSoup(html, "lxml")
        chunks = [p.get_text(" ", strip=True) for p in page.select("p")]
        return "\n".join(chunks[:60])

    try:
        summary_html = Document(html).summary()
        body = BeautifulSoup(summary_html, "lxml").get_text("\n", strip=True)
        if len(body) >= 200:
            return body
        return _first_paragraphs()
    except Exception:
        return _first_paragraphs()

In [179]:
# ==== Link discovery for news sites ====
def parse_listing_find_links(html, base_url, selector_hint="a"):
    """Collect same-host links from a listing page.

    Args:
        html: Markup of the listing page.
        base_url: URL of the page; relative links are resolved against it and
            only links whose network location equals its own are kept.
        selector_hint: CSS selector choosing the candidate elements.

    Returns:
        Normalized URLs in first-seen order, without duplicates.
    """
    soup = BeautifulSoup(html, "lxml")
    site_host = urlparse.urlparse(base_url).netloc
    found = {}
    for anchor in soup.select(selector_hint):
        href = anchor.get("href")
        if not href:
            continue
        candidate = normalize_url(urlparse.urljoin(base_url, href))
        if not candidate:
            continue
        if urlparse.urlparse(candidate).netloc != site_host:
            continue
        if candidate.endswith("#"):
            candidate = candidate[:-1]
        # A dict keeps insertion order, so first occurrences win.
        found.setdefault(candidate, None)
    return list(found)

In [180]:
# ==== Consent page detection ====
def looks_like_consent_page(html: str) -> bool:
    """Heuristically decide whether ``html`` is a cookie/consent wall.

    The check is case-insensitive and looks for the markers ``consent``,
    ``gdpr``, ``privacy preferences`` and ``iab``. The ``iab`` marker only
    counts when it is not preceded by a letter, so ordinary words such as
    "reliable" or "variable" no longer trigger it, while identifiers such as
    ``__iabtcf`` still do.

    Args:
        html: Page markup; an empty or ``None`` value is reported as ``False``.

    Returns:
        ``True`` when any marker is present.
    """
    if not html:
        return False
    s = html.lower()
    # NOTE(review): "consent" alone may still match regular article pages that
    # merely embed a cookie banner — confirm the skip rate on real pages.
    if "consent" in s or "gdpr" in s or "privacy preferences" in s:
        return True
    return re.search(r"(?<![a-z])iab", s) is not None


In [181]:
# ==== Extract article meta (title, author, published_at, canonical) ====
def extract_article_meta(html: str, url: str):
    """Read title, author, publication time and canonical URL from a page.

    For each field a list of selectors is tried in order; an element's
    ``content`` attribute is preferred, otherwise its text (or, for the
    publication time, its ``datetime`` attribute). The canonical URL comes from
    ``<link rel="canonical">`` and falls back to ``url``; it is normalized
    before being returned.

    Returns:
        A ``(title, author, published_at, canonical)`` tuple.
    """
    soup = BeautifulSoup(html, "lxml")

    def first_value(selectors, read_fallback):
        # Walk the selectors until one of them produces a truthy value.
        value = None
        for selector in selectors:
            node = soup.select_one(selector)
            if node:
                value = node.get("content") if node.has_attr("content") else read_fallback(node)
            if value:
                break
        return value

    def node_text(node):
        return node.get_text(strip=True)

    title = first_value(
        ["meta[property='og:title']", "meta[name='twitter:title']", "title"],
        node_text,
    )
    author = first_value(
        ["meta[name='author']", "meta[property='article:author']", "a[rel='author']"],
        node_text,
    )
    published_at = first_value(
        ["meta[property='article:published_time']", "meta[name='pubdate']", "time[datetime]"],
        lambda node: node.get("datetime"),
    )

    canonical_link = soup.find("link", rel=lambda x: x and "canonical" in x.lower())
    if canonical_link and canonical_link.get("href"):
        canonical = urlparse.urljoin(url, canonical_link["href"])
    else:
        canonical = url
    return title, author, published_at, normalize_url(canonical)


In [182]:
# ==== Crawl news site ====
def crawl_news_site(entry, cfg, max_article_per_listing=30, min_text_len=300):
    """Crawl one news source: fetch its listing pages, then the linked articles.

    Args:
        entry: Source description; reads ``start_urls``, ``name`` and the
            optional ``article_selector_hint`` (defaults to ``"a"``).
        cfg: Settings dict; reads ``user_agent``, ``default_timeout_sec``,
            ``max_retries`` and ``per_host_min_delay_sec``.
        max_article_per_listing: Number of discovered links followed per
            listing page.
        min_text_len: Articles whose extracted text is shorter than this are
            dropped.

    Returns:
        A list of article record dicts, unique by canonical URL.

    Raises:
        Whatever ``fetch`` raises for a listing page (that call is not inside
        the per-article ``try``); errors on individual articles are printed and
        skipped.
    """
    ua = cfg["user_agent"]
    start_urls = entry["start_urls"]
    selector_hint = entry.get("article_selector_hint", "a")
    records = []
    # SHA-1 digests of canonical URLs already recorded, shared across all start URLs.
    seen = set()
    for su in start_urls:
        # Listing page: a failure here propagates to the caller.
        resp = fetch(su, ua,
                     timeout=cfg["default_timeout_sec"],
                     max_retries=cfg["max_retries"],
                     min_delay_sec=cfg["per_host_min_delay_sec"])
        links = parse_listing_find_links(resp.text, su, selector_hint)
        for lk in links[:max_article_per_listing]:
            try:
                art = fetch(lk, ua,
                            timeout=cfg["default_timeout_sec"],
                            max_retries=cfg["max_retries"],
                            min_delay_sec=cfg["per_host_min_delay_sec"])
                # Skip pages that look like a cookie/consent wall.
                if looks_like_consent_page(art.text):
                    print("skip consent page:", lk)
                    continue
                text = extract_article_text(art.text) or ""
                # Too little text: treated as not an article.
                if len(text) < min_text_len:
                    continue
                title, author, published_at, canonical = extract_article_meta(art.text, lk)
                canonical = canonical or normalize_url(lk)
                if not canonical:
                    continue
                # De-duplicate on the canonical URL.
                h = sha1(canonical.encode("utf-8")).hexdigest()
                if h in seen:
                    continue
                seen.add(h)
                records.append({
                    "source_type": "news",
                    "source_name": entry["name"],
                    "subreddit": None,
                    "url": lk,
                    "canonical_url": canonical,
                    "title": title,
                    "text": text,
                    "author": author,
                    "published_at": published_at,
                    "score": None,
                    "comments": None,
                    "fetched_at": datetime.utcnow().isoformat(timespec="seconds")
                })
            except Exception as e:
                # Best effort: one failing article must not stop the crawl.
                print("error on", lk, e)
    return records


CSV Schema

In [183]:
# ==== CSV Schema and helpers ====
# Column order of every CSV written by this notebook.
CSV_SCHEMA = [
    "source_type",
    "source_name",
    "subreddit",
    "url",
    "canonical_url",
    "title",
    "text",
    "author",
    "published_at",
    "score",
    "comments",
    "language",
    "token_count",
    "predicted_label",
    "label_scores",
    "fetched_at",
]

def to_dataframe(records):
    """Build a DataFrame from ``records`` with exactly the ``CSV_SCHEMA`` columns.

    Columns missing from the records are added and filled with ``None``;
    columns that are not part of the schema are dropped.
    """
    frame = pd.DataFrame(records)
    absent = [name for name in CSV_SCHEMA if name not in frame.columns]
    for name in absent:
        frame[name] = None
    return frame[CSV_SCHEMA]

def save_csv(df, path):
    """Write ``df`` to ``path`` as UTF-8 CSV without the index and report it."""
    df.to_csv(path, encoding="utf-8", index=False)
    print("saved", path)


Reading Samples

In [184]:
# ==== Run the crawlers ====
# Reload the settings that the configuration cell wrote to disk.
with open("configs/crawl_config.yaml", "r", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

# Records from every source are accumulated here before being written out.
all_records = []

# Reddit
# One subreddit failing is reported and does not stop the others.
for sub in cfg["sources"]["reddit"]["subreddits"]:
    try:
        recs = crawl_reddit_subreddit(sub, cfg)
        all_records.extend(recs)
        print(f"reddit {sub} records:", len(recs))
    except Exception as e:
        print("reddit error", sub, e)

# News
# Same best-effort handling per configured news site.
for site in cfg["sources"]["news_sites"]:
    try:
        recs = crawl_news_site(site, cfg)
        all_records.extend(recs)
        print(f"news {site['name']} records:", len(recs))
    except Exception as e:
        print("news error", site["name"], e)

# Write everything to a UTC-timestamped CSV under storage/raw and preview it.
df = to_dataframe(all_records)
ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
out_path = f"storage/raw/crawl_{ts}.csv"
save_csv(df, out_path)
df.head(10)

robots disallows HTML for https://old.reddit.com/r/worldnews/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/worldnews/.rss
reddit worldnews records: 0
robots disallows HTML for https://old.reddit.com/r/news/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/news/.rss
reddit news records: 0
robots disallows HTML for https://old.reddit.com/r/politics/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/politics/.rss
reddit politics records: 0
robots disallows HTML for https://old.reddit.com/r/technology/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/technology/.rss
reddit technology records: 0
robots disallows HTML for https://old.reddit.com/r/science/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/science/.rss
reddit science records: 0
robots disallows HTML for https://old.reddit.com/r/environment/, switching to RSS
robots disallows RSS for https://www.reddit.com/r/environment/.rss
reddit 

Unnamed: 0,source_type,source_name,subreddit,url,canonical_url,title,text,author,published_at,score,comments,language,token_count,predicted_label,label_scores,fetched_at


NPR news HTML Crawl

In [185]:
def parse_npr_listing(html, base_url):
    """Turn an NPR listing page into link records.

    Every anchor whose ``href`` contains ``/story/`` or ``/sections/world/``
    yields one record with the normalized URL and the anchor text as title;
    the remaining fields are ``None``.

    Args:
        html: Markup of the listing page.
        base_url: URL of the page; relative links are resolved against it.

    Returns:
        De-duplicated record dicts.
    """
    soup = BeautifulSoup(html, "lxml")
    rows = []
    for anchor in soup.select("a[href*='/story/'], a[href*='/sections/world/']"):
        href = anchor.get("href")
        if not href:
            continue
        link = normalize_url(urlparse.urljoin(base_url, href))
        if not link:
            continue
        rows.append({
            "source_type": "news",
            "source_name": "npr_world",
            "subreddit": None,
            "url": link,
            "canonical_url": link,
            "title": anchor.get_text(" ", strip=True) or None,
            "text": None,
            "author": None,
            "published_at": None,
            "score": None,
            "comments": None,
            "fetched_at": datetime.utcnow().isoformat(timespec="seconds")
        })
    return dedupe_records(rows)


CSV schema & saving

Initial run and output production