# SETTING ENVIRONMENT


In [None]:
"""# mount the colab with google drive
from google.colab import drive
drive.mount('/content/drive')"""

In [None]:
# Set the working folder (current working directory) for the notebook.
import os

# Local project folder; the config path and the output path are built from it.
cwd = "/Users/yusufpradana/Library/CloudStorage/OneDrive-Personal/Pekerjaan BMN/05. 2025/98_monitoring_berita/monitoring-berita"
# Alternative location for a Google Colab run with Drive mounted:
# cwd = '/content/drive/MyDrive/Monitoring Berita'
os.chdir(cwd)

# MAIN CODE

## 1

In [None]:
# # Multi-source News Article Scraping (Flexible)
# - Reads the list of URLs from the input file (a CSV)
# - Extracts clean article text (without ads/related links)
# - Robust error handling, retry, timeout
# - Site-specific extractors + readability/trafilatura fallbacks + generic heuristic
# - Output dataframe: url_berita, artikel_berita, source_domain, status, error

# %%
!pip -q install beautifulsoup4 lxml html5lib requests-html readability-lxml trafilatura tqdm pandas openpyxl regex > /dev/null

# --- Sel 1 (tambahkan paket) ---
!pip -q install cloudscraper httpx > /dev/null


# %%
import re
import regex as reg
import sys
import time
import json
import logging
import warnings
from dataclasses import dataclass
from typing import Optional, List, Tuple, Callable, Dict

import pandas as pd
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
from urllib.parse import urlparse, urljoin
from tqdm.auto import tqdm

# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Optional imports (fallback parsers). Each flag records whether the package
# could be imported, so the extraction layers can skip a missing parser.
try:
    from readability import Document
    HAS_READABILITY = True
except Exception:
    HAS_READABILITY = False

try:
    import trafilatura
    HAS_TRAFILATURA = True
except Exception:
    HAS_TRAFILATURA = False

# Logging: INFO level, "timestamp | level | message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger("news-scraper")


## 2

In [None]:
# %%
# ======== EASY-TO-CHANGE PARAMETERS ========

# Path of the JSON config file
from pathlib import Path

# CONFIG_PATH = Path("/content/drive/MyDrive/Monitoring Berita/config.json")

CONFIG_PATH = Path(f"{cwd}/config.json")

# Load the whole config once; later settings are read from this dict.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            cfg = json.load(f)

# INPUT_EXCEL_PATH = cfg["labelled_data_xlsx"]
# The input CSV is the path stored in the config under "last_output_path".
INPUT_CSV_PATH = cfg["last_output_path"]
# Name of the input column that holds the article URLs.
INPUT_URL_COLUMN = "url_berita"

# Limit the number of URLs while testing (None means all)
MAX_URLS: Optional[int] = None

# Timeout & retry
# REQUEST_TIMEOUT is passed as the `timeout` of each HTTP request.
REQUEST_TIMEOUT = 20
# Default number of attempts made by fetch() for one URL.
MAX_RETRIES = 2
# Base delay between attempts; it is multiplied by the attempt number.
RETRY_SLEEP = 1.5

# Browser-like User-Agent so requests are not blocked too easily
DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
    )
}

# Phrase patterns to delete from article text (ads / clickbait / related links).
# Most patterns end in ".*", which removes everything from the trigger phrase
# to the end of that line (the dot does not match a newline).
# NOTE: there is deliberately no "\b" right after a trailing colon. A word
# boundary there exists only when a word character follows immediately, so
# "Editor: Name" or "Video: Title" (colon followed by a space) never matched.
CLEANUP_PATTERNS = [
    r"\bBaca juga\b.*", r"\bSimak juga\b.*", r"\bADVERTISEMENT\b.*",
    r"\bIklan\b.*", r"\bIklan Layanan\b.*", r"\bInfografis\b.*",
    r"\bTonton juga\b.*", r"\bVideo:.*", r"\b[Gg]rafis\b.*",
    r"\b[Gg]allery\b.*", r"\bArtikel ini telah\b.*", r"\bEditor:.*$",
    r"^\s*—\s*$", r"^\s*-\s*$", r"\b[ \t]*[•\-\*]\s*$"
]

# CSS selectors for non-content elements that must be removed before extraction.
# NOTE: "[class*=x]" / "[id*=x]" are substring matches, so e.g. "[class*=ads]"
# also matches any class name that merely contains "ads".
JUNK_SELECTORS = [
    "[class*=iklan]", "[class*=ads]", "[id*=ads]", "[id*=banner]", "[class*=related]",
    "[class*=tag]", "[class*=breadcrumb]", "script", "style", "noscript", "iframe",
    "[class*=share]", "[class*=social]", "[class*=recommend]", "[class*=promo]",
    "[class*=copyright]", "[class*=author]", "[class*=metadata]"
]


## 3

In [None]:
# %%
# Module-level HTTP session that sends DEFAULT_HEADERS with every request.
session = requests.Session()
session.headers.update(DEFAULT_HEADERS)

def fetch(url: str, timeout: int = REQUEST_TIMEOUT, max_retries: int = MAX_RETRIES) -> Optional[requests.Response]:
    """
    Fetch a URL with a simple retry loop and a per-request timeout.

    Args:
        url: Address to download.
        timeout: Timeout passed to each request.
        max_retries: Total number of attempts.

    Returns:
        The first response with a 2xx status, or None when every attempt
        failed (non-2xx status or an exception). Failures are logged, not raised.
    """
    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = session.get(url, timeout=timeout, allow_redirects=True)
        except Exception as e:
            last_err = e
            logger.warning(f"Attempt {attempt} failed for {url}: {e}")
        else:
            if 200 <= resp.status_code < 300:
                return resp
            last_err = RuntimeError(f"HTTP {resp.status_code}")
            logger.warning(f"Attempt {attempt}: {url} -> {resp.status_code}")
        # Back off (linearly growing) only when another attempt follows;
        # sleeping after the final failure would just delay the caller.
        if attempt < max_retries:
            time.sleep(RETRY_SLEEP * attempt)
    logger.error(f"Failed to fetch {url}: {last_err}")
    return None

def normalize_whitespace(text: str) -> str:
    """Collapse space/tab runs to one space and 3+ newlines to two, then strip the ends."""
    single_spaced = reg.sub(r"[ \t]+", " ", text)
    return reg.sub(r"\n{3,}", "\n\n", single_spaced).strip()

def remove_cleanup_patterns(text: str, patterns: List[str]) -> str:
    """
    Delete every match of the given regex patterns from the text and tidy the result.

    Patterns are applied case-insensitively and in multi-line mode, so "^" and
    "$" anchor at each line. Without multi-line mode, line-level patterns such
    as a lone "-" separator only matched when the whole text was that line.

    Args:
        text: Article text, one paragraph per line.
        patterns: Regular expressions whose matches are removed.

    Returns:
        The cleaned text with blank lines dropped, every line stripped and
        whitespace normalised.
    """
    out = text
    for pat in patterns:
        out = re.sub(pat, "", out, flags=re.IGNORECASE | re.MULTILINE)
    # Drop the empty lines left behind by the removals.
    stripped_lines = (ln.strip() for ln in out.splitlines())
    out = "\n".join(ln for ln in stripped_lines if ln)
    return normalize_whitespace(out)


In [None]:
# --- Sel 3 (ganti sesi & fetch) ---
import random
import httpx
import cloudscraper
from urllib.parse import urlparse

# Headers that look like a real desktop browser request
BROWSER_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    # Indonesian first, English as a fallback.
    "Accept-Language": "id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}

def make_session():
    """
    Build the HTTP session used for scraping.

    A cloudscraper session is preferred; if creating or configuring it raises,
    a plain requests.Session is returned instead. Either way the session
    carries BROWSER_HEADERS.
    """
    try:
        # cloudscraper usually gets past the 403 of sites that use anti-bot checks
        scraper = cloudscraper.create_scraper(
            browser=dict(browser="chrome", platform="windows", mobile=False)
        )
        scraper.headers.update(BROWSER_HEADERS)
    except Exception:
        plain = requests.Session()
        plain.headers.update(BROWSER_HEADERS)
        return plain
    return scraper

# Rebind the module-level session to the one built by make_session().
session = make_session()

def jitter_sleep(base=REQUEST_TIMEOUT * 0.05):
    """Sleep for `base` seconds plus a random extra below 0.8 seconds."""
    jitter = random.random() * 0.8
    time.sleep(base + jitter)

def as_amp(url: str) -> str:
    """
    Return the "/amp" form of an article URL (used for Tribunnews).

    The fragment, the query string and any trailing slash are dropped first:
    https://www.tribunnews.com/... -> https://www.tribunnews.com/.../amp

    A URL whose path already ends in "/amp" is returned without a second
    "/amp" suffix.
    """
    # Drop the fragment before the query, so a "?" inside a fragment is not
    # mistaken for the start of the query string.
    base = url.split("#", 1)[0].split("?", 1)[0].rstrip("/")
    if base.endswith("/amp"):
        return base
    return base + "/amp"

def fetch(url: str, timeout: int = REQUEST_TIMEOUT, max_retries: int = MAX_RETRIES) -> Optional[requests.Response]:
    """
    Fetch a URL with retries and full browser-like headers.

    Every request carries a Google Referer. For tribunnews.com URLs that
    answer 401/403, the AMP form of the page is tried within the same attempt.

    Args:
        url: Address to download.
        timeout: Timeout passed to each request.
        max_retries: Total number of attempts.

    Returns:
        The first response with a 2xx status, or None when every attempt
        failed. When the AMP fallback succeeds, the AMP response is returned
        with its own URL left intact, so callers can recognise it through
        "/amp" in resp.url instead of downloading the AMP page again.
    """
    last_err = None
    is_tribun = "tribunnews.com" in (urlparse(url).netloc or "")

    # The headers are the same for every attempt, so build them once.
    # A Referer helps with protective sites.
    headers = BROWSER_HEADERS.copy()
    headers["Referer"] = "https://www.google.com/"

    for attempt in range(1, max_retries + 1):
        try:
            resp = session.get(url, headers=headers, timeout=timeout, allow_redirects=True)
            if 200 <= resp.status_code < 300:
                return resp

            if is_tribun and resp.status_code in (401, 403):
                # Blocked on Tribun: try the AMP page right away.
                logger.warning(f"Attempt {attempt}: {url} -> {resp.status_code}. Trying AMP...")
                try:
                    resp2 = session.get(as_amp(url), headers=headers, timeout=timeout, allow_redirects=True)
                    if 200 <= resp2.status_code < 300:
                        return resp2
                    last_err = RuntimeError(f"AMP HTTP {resp2.status_code}")
                except Exception as e2:
                    last_err = e2
            else:
                last_err = RuntimeError(f"HTTP {resp.status_code}")
                logger.warning(f"Attempt {attempt}: {url} -> {resp.status_code}")
        except Exception as e:
            last_err = e
            logger.warning(f"Attempt {attempt} failed for {url}: {e}")

        # Short randomised pause so the scraper is not flagged as a bot; it is
        # skipped after the final attempt, where it would only delay the caller.
        if attempt < max_retries:
            jitter_sleep(0.6)
    logger.error(f"Failed to fetch {url}: {last_err}")
    return None


## 4

In [None]:
# %%
# CSS selectors tried, in this order, by generic_extract() to locate the
# element that holds the article body.
CANDIDATE_CONTENT_SELECTORS = [
    "article",
    "[class*=content]",
    "[class*=article]",
    "[class*=read__content]",
    "[class*=detail__body]",
    "[class*=entry-content]",
    "[id*=content]", "[id*=article]",
    "[itemprop='articleBody']",
]

def clean_soup(soup: BeautifulSoup) -> None:
    """Remove, in place, every element matched by one of JUNK_SELECTORS."""
    for selector in JUNK_SELECTORS:
        junk_nodes = soup.select(selector)
        for junk in junk_nodes:
            junk.decompose()

def join_block_text(container: Tag) -> str:
    """
    Join the text of the block elements (p, h2, h3, h4, li) under a container.

    Each block contributes one line. A block nested inside another collected
    block (for example a p inside an li, or an li of a nested list) is skipped,
    because the outer block's text already contains it; without this the same
    sentence appeared twice in the output.

    Args:
        container: Element (or whole soup) whose descendants are scanned.

    Returns:
        The block texts joined by newlines; blocks whose text is 2 characters
        or shorter are left out. Empty string when nothing qualifies.
    """
    block_tags = {"p", "h2", "h3", "h4", "li"}
    # id() of every block element visited so far; ids are stable here because
    # the parsed tree keeps all elements alive during the scan.
    seen_blocks = set()
    parts: List[str] = []
    for el in container.descendants:
        # Bare strings are ignored on purpose: their text is picked up through
        # the enclosing block element.
        if not isinstance(el, Tag) or el.name not in block_tags:
            continue
        nested = any(id(parent) in seen_blocks for parent in el.parents)
        seen_blocks.add(id(el))
        if nested:
            continue
        txt = el.get_text(separator=" ", strip=True)
        if txt and len(txt) > 2:
            parts.append(txt)
    return "\n".join(parts)

def generic_extract(html: str) -> str:
    """
    Heuristically extract article text from raw HTML.

    Junk elements are removed first. Then, in order:
      1) the first CANDIDATE_CONTENT_SELECTORS match yielding more than 300 characters;
      2) the longest non-empty text among the first 50 article/div/section elements;
      3) the block text of the whole body (or document).
    """
    soup = BeautifulSoup(html, "lxml")
    clean_soup(soup)

    # 1) look for an article tag or another content-body candidate
    for selector in CANDIDATE_CONTENT_SELECTORS:
        for candidate in soup.select(selector):
            extracted = join_block_text(candidate)
            # minimum size for something to count as article content
            if extracted and len(extracted) > 300:
                return extracted

    # 2) fallback: the longest text among a limited set of containers
    container_texts = (
        join_block_text(node)
        for node in soup.find_all(["article", "div", "section"], limit=50)
    )
    non_empty = [t for t in container_texts if t]
    if non_empty:
        return max(non_empty, key=len)

    # 3) last resort: the whole document
    return join_block_text(soup.body or soup)


## 5

In [None]:
# %%
def extract_detik(soup: BeautifulSoup) -> Optional[str]:
    """Return the article text of a detik.com page, or None if no container yields more than 200 characters."""
    clean_soup(soup)
    # Containers Detik commonly uses, tried in this order.
    selectors = (
        "article.detail__article",
        "div.detail__body-text",
        "[class*=detail__body]",
        "[class*=itp_bodycontent]",
    )
    texts = (
        join_block_text(node)
        for selector in selectors
        for node in soup.select(selector)
    )
    return next((t for t in texts if t and len(t) > 200), None)

def extract_cnbcindo(soup: BeautifulSoup) -> Optional[str]:
    """Return the article text of a cnbcindonesia.com page, or None if no container yields more than 200 characters."""
    clean_soup(soup)
    for selector in ("article", "div.detail_text", "[class*=detail__body]"):
        for node in soup.select(selector):
            body = join_block_text(node)
            if not body or len(body) <= 200:
                continue
            return body
    return None

def extract_kompas(soup: BeautifulSoup) -> Optional[str]:
    """Return the article text of a kompas.com page, or None if no container yields more than 200 characters."""
    clean_soup(soup)
    selectors = ("div.read__content", "article", "[class*=read__content]")
    candidates = (
        join_block_text(node)
        for selector in selectors
        for node in soup.select(selector)
    )
    for body in candidates:
        if body and len(body) > 200:
            return body
    return None

def extract_liputan6(soup: BeautifulSoup) -> Optional[str]:
    """
    Return the article text of a liputan6.com page, or None.

    For each selector, the texts of all matching elements are combined; the
    first combined text longer than 200 characters is returned.
    """
    clean_soup(soup)
    selectors = (
        "div.article-content-body__item-content",  # the individual paragraphs
        "div.article-content-body",
        "article",
    )
    for selector in selectors:
        matched = soup.select(selector)
        if not matched:
            continue
        # merge the paragraphs of all item-content elements
        pieces = [piece for piece in map(join_block_text, matched) if piece]
        combined = "\n".join(pieces)
        if combined and len(combined) > 200:
            return combined
    return None

def extract_tribun(soup: BeautifulSoup) -> Optional[str]:
    """
    Return the article text of a tribunnews.com page, or None.

    Known article containers are tried first; if none yields more than 200
    characters, all paragraphs that are direct children of a div are joined.
    """
    clean_soup(soup)
    for sel in [
        # Element carrying both classes. The previous form
        # "div.side-article txt-article" searched for a "txt-article" tag
        # inside the div and so could never match.
        "div.side-article.txt-article",
        "div.txt-article",
        "div#articlebody",
        "div#article"
    ]:
        for n in soup.select(sel):
            txt = join_block_text(n)
            if txt and len(txt) > 200:
                return txt
    # Fallback: paragraphs directly under any div
    nodes = soup.select("div > p")
    if nodes:
        txt = "\n".join(n.get_text(" ", strip=True) for n in nodes)
        if len(txt) > 200:
            return txt
    return None

def extract_merdeka(soup: BeautifulSoup) -> Optional[str]:
    """Return the article text of a merdeka.com page, or None if no container yields more than 200 characters."""
    clean_soup(soup)
    containers = ("div.article-content", "div.kanal-content", "article")
    for selector in containers:
        long_texts = (
            body
            for body in map(join_block_text, soup.select(selector))
            if body and len(body) > 200
        )
        found = next(long_texts, None)
        if found is not None:
            return found
    return None

def extract_antaranews(soup: BeautifulSoup) -> Optional[str]:
    """Return the article text of an antaranews.com page, or None if no container yields more than 200 characters."""
    clean_soup(soup)
    result = None
    for selector in ("div.post-content", "div#content", "article"):
        for node in soup.select(selector):
            body = join_block_text(node)
            if body and len(body) > 200:
                result = body
                break
        if result is not None:
            break
    return result

# Maps a domain fragment to its site-specific extractor. Each extractor takes
# the parsed page and returns the article text, or None when nothing was found.
DOMAIN_EXTRACTORS: Dict[str, Callable[[BeautifulSoup], Optional[str]]] = {
    "detik.com": extract_detik,
    "cnbcindonesia.com": extract_cnbcindo,
    "kompas.com": extract_kompas,
    "liputan6.com": extract_liputan6,
    "tribunnews.com": extract_tribun,
    "merdeka.com": extract_merdeka,
    "antaranews.com": extract_antaranews,
}


In [None]:
def extract_tribun(soup: BeautifulSoup) -> Optional[str]:
    """
    Return the article text of a tribunnews.com desktop page, or None.

    Known containers (old and new desktop layouts) are tried first; if none
    yields more than 200 characters, all paragraphs found under an article or
    div element are joined instead.
    """
    clean_soup(soup)
    # Old/new desktop layouts
    containers = (
        "div.txt-article", "div#article",
        "div#articlebody", "div.side-article.txt-article",
        "article", "[class*=read__content]", "[class*=detail__body]",
    )
    for selector in containers:
        for node in soup.select(selector):
            body = join_block_text(node)
            if body and len(body) > 200:
                return body
    # Paragraph fallback
    paragraphs = soup.select("article p, div p")
    if not paragraphs:
        return None
    merged = "\n".join(p.get_text(" ", strip=True) for p in paragraphs)
    return merged if len(merged) > 200 else None

def extract_tribun_amp(soup: BeautifulSoup) -> Optional[str]:
    """
    Return the article text of a tribunnews.com AMP page, or None.

    AMP containers are tried first; if none yields more than 200 characters,
    all paragraphs found under an article or div element are joined instead.
    """
    clean_soup(soup)
    # AMP markup is usually cleaner
    amp_containers = (
        "div.read__content", "div.read__content--body",
        "article", "[itemprop='articleBody']",
    )
    container_texts = (
        join_block_text(node)
        for selector in amp_containers
        for node in soup.select(selector)
    )
    for body in container_texts:
        if body and len(body) > 200:
            return body
    paragraphs = soup.select("article p, div p")
    if not paragraphs:
        return None
    merged = "\n".join(p.get_text(' ', strip=True) for p in paragraphs)
    return merged if len(merged) > 200 else None


In [None]:
# Point the Tribunnews entry at the extract_tribun definition currently bound
# (the redefinition above, once that cell has been run).
DOMAIN_EXTRACTORS.update({
    "tribunnews.com": extract_tribun,  # desktop
})


## 6

In [None]:
# %%
@dataclass
class ExtractResult:
    """Outcome of scraping one URL, as produced by extract_with_layers()."""
    # The URL that was requested.
    url: str
    # Lower-cased network location parsed from the URL.
    domain: str
    # Extracted article text, or None when nothing usable was extracted.
    text: Optional[str]
    # Outcome label, e.g. "ok_site_specific", "ok_generic", "fetch_failed".
    status: str
    # Error detail when one was recorded, else None.
    error: Optional[str]

def extract_with_layers(url: str) -> ExtractResult:
    """
    Download a URL and extract its article text, trying extractors in layers.

    Layers, in order (the first one yielding enough text wins):
      1) site-specific extractor (when the domain matches DOMAIN_EXTRACTORS)
      1b) Tribunnews AMP extractor (tribunnews.com URLs only)
      2) trafilatura (if available)
      3) readability-lxml (if available)
      4) generic heuristic

    Args:
        url: Article URL to scrape.

    Returns:
        An ExtractResult whose status names the layer that succeeded ("ok_*")
        or the failure ("fetch_failed", "empty_after_generic",
        "generic_failed"). Exceptions raised inside layers 1-3 are logged and
        the next layer is tried.
    """
    domain = urlparse(url).netloc.lower()

    # Fetch HTML
    resp = fetch(url)
    if resp is None:
        return ExtractResult(url, domain, None, "fetch_failed", "Request failed")

    html = resp.text

    # 1) Site-specific (desktop). This layer runs once only: it is
    #    deterministic for a given HTML, so repeating it after the AMP step
    #    could never produce a different outcome.
    try:
        for known, extractor in DOMAIN_EXTRACTORS.items():
            if known not in domain:
                continue
            text = extractor(BeautifulSoup(html, "lxml"))
            if text and len(text) > 150:
                text = remove_cleanup_patterns(text, CLEANUP_PATTERNS)
                return ExtractResult(url, domain, text, "ok_site_specific", None)
    except Exception as e:
        logger.warning(f"Site-specific extractor error for {url}: {e}")

    # 1b) For Tribun, try the AMP extractor when no content was obtained yet
    if "tribunnews.com" in domain:
        try:
            # The HTML already downloaded may be the AMP page or the desktop
            # page; if it does not look like AMP, download the AMP page.
            amp_html = None
            if "/amp" in resp.url or "amp" in (resp.headers.get("content-location", "") or "").lower():
                amp_html = html
            else:
                amp_resp = fetch(as_amp(url))
                if amp_resp is not None and 200 <= amp_resp.status_code < 300:
                    amp_html = amp_resp.text
            if amp_html:
                soup_amp = BeautifulSoup(amp_html, "lxml")
                text_amp = extract_tribun_amp(soup_amp)
                if text_amp and len(text_amp) > 150:
                    text_amp = remove_cleanup_patterns(text_amp, CLEANUP_PATTERNS)
                    return ExtractResult(url, domain, text_amp, "ok_tribun_amp", None)
        except Exception as e:
            logger.warning(f"Tribun AMP extractor error for {url}: {e}")

    # 2) Trafilatura, with a compatibility check for the timeout argument
    # NOTE(review): this downloads the page a second time through trafilatura
    # instead of reusing the HTML fetched above; confirm that is intended.
    if HAS_TRAFILATURA:
        try:
            downloaded = None
            try:
                import inspect
                if "timeout" in inspect.signature(trafilatura.fetch_url).parameters:
                    # this version accepts a timeout argument
                    downloaded = trafilatura.fetch_url(url, timeout=REQUEST_TIMEOUT)
                else:
                    # this version has no timeout argument
                    downloaded = trafilatura.fetch_url(url)
            except Exception:
                # safest fallback
                downloaded = trafilatura.fetch_url(url)

            if downloaded:
                t_text = trafilatura.extract(
                    downloaded,
                    include_comments=False,
                    include_tables=False,
                    favor_recall=True,   # slightly more permissive text capture
                )
                if t_text and len(t_text) > 150:
                    t_text = remove_cleanup_patterns(t_text, CLEANUP_PATTERNS)
                    return ExtractResult(url, domain, t_text, "ok_trafilatura", None)
        except Exception as e:
            logger.warning(f"Trafilatura extractor error for {url}: {e}")

    # 3) Readability
    if HAS_READABILITY:
        try:
            doc = Document(html)
            readable_html = doc.summary(html_partial=True)
            soup = BeautifulSoup(readable_html, "lxml")
            clean_soup(soup)
            r_text = join_block_text(soup)
            if r_text and len(r_text) > 150:
                r_text = remove_cleanup_patterns(r_text, CLEANUP_PATTERNS)
                return ExtractResult(url, domain, r_text, "ok_readability", None)
        except Exception as e:
            logger.warning(f"Readability extractor error for {url}: {e}")

    # 4) Generic heuristic
    try:
        g_text = generic_extract(html)
        g_text = remove_cleanup_patterns(g_text, CLEANUP_PATTERNS)
        if g_text and len(g_text) > 100:
            return ExtractResult(url, domain, g_text, "ok_generic", None)
        return ExtractResult(url, domain, None, "empty_after_generic", None)
    except Exception as e:
        return ExtractResult(url, domain, None, "generic_failed", str(e))


## 7

In [None]:
# %%
# === Read the list of URLs from the input CSV ===
try:
    df_in = pd.read_csv(INPUT_CSV_PATH)
except Exception as e:
    raise RuntimeError(f"Gagal membaca CSV input: {INPUT_CSV_PATH} -> {e}") from e

if INPUT_URL_COLUMN not in df_in.columns:
    raise ValueError(f"Kolom '{INPUT_URL_COLUMN}' tidak ditemukan di file CSV.")

# Unique, stripped, non-empty URLs in first-seen order
urls = df_in[INPUT_URL_COLUMN].dropna().astype(str).str.strip().unique().tolist()
urls = [u for u in urls if u]
# Apply the test-run limit configured above (None means process everything)
if MAX_URLS is not None:
    urls = urls[:MAX_URLS]

logger.info(f"Total URL untuk diproses: {len(urls)}")

results: List[ExtractResult] = []
for url in tqdm(urls, desc="Scraping artikel"):
    try:
        res = extract_with_layers(url)
        results.append(res)
    except Exception as e:
        # One bad URL must not stop the run; record it and continue
        parsed = urlparse(url)
        results.append(ExtractResult(url, parsed.netloc.lower(), None, "fatal_error", str(e)))

# === Build the dataframe of scraping results ===
# Explicit columns keep the frame well-formed even when there are no results.
scraped_columns = ["url_berita", "source_domain", "artikel_berita", "status", "error"]
df_scraped = pd.DataFrame([{
    "url_berita": r.url,
    "source_domain": r.domain,
    "artikel_berita": r.text if r.text else "",
    "status": r.status,
    "error": r.error if r.error else "",
} for r in results], columns=scraped_columns)

# === Merge with df_in, adding the new columns ===
# The input may already contain these columns (for example when it is the
# output of an earlier run). Merging would then rename them with _x/_y
# suffixes and break the later df_output["artikel_berita"] lookups, so the
# stale copies are dropped first.
stale_cols = [c for c in scraped_columns if c != "url_berita" and c in df_in.columns]
if stale_cols:
    logger.warning(f"Kolom lama ditimpa hasil scraping baru: {stale_cols}")
# Merge on a stripped copy of the URL, because the scraped URLs were stripped
# above; the original URL column of df_in is kept untouched.
merge_key = "_url_key"
df_left = df_in.drop(columns=stale_cols)
df_left[merge_key] = df_left[INPUT_URL_COLUMN].map(
    lambda v: v if pd.isna(v) else str(v).strip()
)
df_output = df_left.merge(
    df_scraped.rename(columns={"url_berita": merge_key}),
    on=merge_key,
    how="left",
).drop(columns=merge_key)

# === Clean up artikel_berita ===
def final_cleanup(text: str) -> str:
    """Normalise whitespace and drop lines of 2 characters or fewer; non-string input gives ""."""
    if isinstance(text, str):
        normalized = normalize_whitespace(text)
        kept = filter(lambda ln: len(ln.strip()) > 2, normalized.splitlines())
        return "\n".join(kept).strip()
    return ""

# Clean every article; missing values (rows without a scrape result) become "".
df_output["artikel_berita"] = df_output["artikel_berita"].apply(final_cleanup)
# Flag articles shorter than 60 characters as empty.
df_output["is_empty"] = df_output["artikel_berita"].str.len().fillna(0).lt(60)

# === Summarise the results ===
# Count rows per status, including rows with no status.
summary = df_output["status"].value_counts(dropna=False)
logger.info(f"\nSummary status:\n{summary}")

df_output.head(5)

## 8

In [None]:
# %%
# Save as CSV for downstream integration
from pathlib import Path

# Timestamp that makes each output file name unique
DATE_TAG = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")

OUTPUT_CSV = f"{cwd}/hasil_baca_berita/hasil_scraping_artikel_{DATE_TAG}.csv"
# OUTPUT_XLSX = f"{cwd}/hasil_baca_berita/hasil_scraping_artikel_{DATE_TAG}.xlsx"

try:
    # to_csv() does not create missing folders, so create the output folder first
    Path(OUTPUT_CSV).parent.mkdir(parents=True, exist_ok=True)
    df_output.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
    # df_output.to_excel(OUTPUT_XLSX, index=False)
    logger.info(f"Tersimpan: {OUTPUT_CSV}")
except Exception as e:
    logger.error(f"Gagal menyimpan output: {e}")

df_output.tail(3)


In [None]:
# %%
import json
from pathlib import Path


def update_config(path: Path, new_values: dict):
    """
    Update only the given keys of a JSON config file, keeping the other keys.

    An existing file is read first. If it cannot be parsed, or its top-level
    value is not a JSON object, a warning is logged and the update starts from
    an empty config. The result is written to a temporary file next to the
    target and then moved over it, so a failed write (for example a value that
    cannot be serialised) leaves the existing config untouched.

    Args:
        path: Location of the config file.
        new_values: Keys and values to set.

    Errors while reading or writing are logged, not raised.
    """
    data = {}
    if path.exists():
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as e:
            logger.warning(f"Gagal membaca config lama: {e}")
        else:
            if isinstance(loaded, dict):
                data = loaded
            else:
                # e.g. a JSON list: there are no keys to preserve or update
                logger.warning(
                    f"Gagal membaca config lama: isi bukan objek JSON ({type(loaded).__name__})"
                )

    # update only the keys that were given
    data.update(new_values)

    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        tmp_path.replace(path)
        logger.info(f"Berhasil update config.json di {path}")
    except Exception as e:
        logger.error(f"Gagal menyimpan config.json: {e}")
        # Remove the partial temporary file, if one was created
        try:
            tmp_path.unlink()
        except OSError:
            pass

# Record the new output path in the config, but only when the CSV was really
# written; otherwise the next run would read a path that does not exist.
if Path(OUTPUT_CSV).exists():
    update_config(CONFIG_PATH, {
        "last_output_path": OUTPUT_CSV,
    })
else:
    logger.error(f"Output tidak ditemukan, config tidak diperbarui: {OUTPUT_CSV}")
