### KT OTT 구독 상품 크롤러 (KT OTT Subscription Crawler)

KT 구독 상품 페이지에서 다양한 OTT 구독 상품(티빙, 디즈니+, 유튜브 프리미엄 등)의 정보를 수집합니다. 상품별로 상이한 HTML 구조를 자동으로 감지하여 표준화된 JSON 포맷으로 데이터를 추출합니다.

**주요 기능:**
- 상품 기본 정보, 제휴 서비스(OTT, 카페 등) 자동 식별
- 복잡한 테이블(rowspan, colspan) 구조 및 레이아웃(Row/Column-based) 자동 대응
- 요금제별 가격(정상가, KT구독가, 프로모션가) 및 혜택 상세 파싱
- JSON 파일 저장

In [None]:
# Mount Google Drive in the Colab runtime; the crawl results are later written
# to a directory under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import re
import json
import os
from datetime import datetime
import requests
from bs4 import BeautifulSoup

def clean_text(text):
    """Collapse every whitespace run in *text* to one space and trim both ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if text:
        collapsed = re.sub(r'\s+', ' ', text)
        return collapsed.strip()
    return ""

def parse_price_str(text):
    """Return the integer spelled by the digits found in *text*.

    Every non-digit character (thousands separators, currency suffixes,
    labels) is discarded. Falsy input or text without digits yields 0.
    """
    digits_only = re.sub(r'[^\d]', '', text) if text else ""
    if not digits_only:
        return 0
    return int(digits_only)

def generate_plan_id_and_key(product_num, plan_title, main_service_name=None):
    """Derive a plan identifier and a base plan key from a plan title.

    Args:
        product_num: Product number embedded in the identifier as ``KT_<num>``.
        plan_title: Plan title that is scanned for service, tier and partner
            keywords.
        main_service_name: Service code used when the title itself names no
            known service.

    Returns:
        A ``(plan_id, base_plan_key)`` tuple. ``plan_id`` is
        ``KT_<num>_<SERVICE>_<TIER>`` with ``_<PARTNER>`` appended when a
        partner keyword is present; ``base_plan_key`` is
        ``<service lower-cased>_<tier lower-cased>``.
    """
    upper_title = plan_title.upper()

    # 1) Main service: the first service whose Korean or English name appears.
    service_markers = (("티빙", "TVING"), ("디즈니", "DISNEY"), ("유튜브", "YOUTUBE"))
    service_code = next(
        (code for korean, code in service_markers
         if korean in plan_title or code in upper_title),
        "UNKNOWN",
    )
    if service_code == "UNKNOWN" and main_service_name:
        service_code = main_service_name

    # 2) Tier: keywords are listed longest first so that a compound such as
    #    "광고형 스탠다드" wins over the shorter keywords it contains.
    tier_markers = (
        ("광고형 스탠다드", "AD_STANDARD"),
        ("광고형스탠다드", "AD_STANDARD"),
        ("프리미엄", "PREMIUM"),
        ("스탠다드", "STANDARD"),
        ("광고형", "AD_STANDARD"),
        ("베이직", "BASIC"),
        ("초이스", "CHOICE"),
        ("라이트", "LITE"),
    )
    tier_code = next(
        (code for korean, code in tier_markers if korean in plan_title),
        "UNKNOWN",
    )
    base_suffix = tier_code.lower()

    # 3) Partner: the first matching partner keyword, if any.
    partner_checks = (
        ("MGC", "메가" in plan_title or "MGC" in upper_title),
        ("STARBUCKS", "스타벅스" in plan_title),
        ("LIFE", "생활구독" in plan_title),
    )
    partner_code = None
    for code, matched in partner_checks:
        if matched:
            partner_code = code
            break

    id_parts = [f"KT_{product_num}", service_code, tier_code]
    if partner_code:
        id_parts.append(partner_code)

    plan_id = "_".join(id_parts)
    base_plan_key = f"{service_code.lower()}_{base_suffix}"
    return plan_id, base_plan_key

In [None]:
# 구독 상품 정보 추출
def flatten_table_headers(table):
    """Flatten a (possibly multi-row) ``<thead>`` into one label per column.

    Header cells are laid out on a grid honouring ``colspan`` and ``rowspan``.
    For each column the distinct, non-empty labels found from top to bottom
    are joined with ``" > "`` and parenthesised notes are removed.

    Args:
        table: A BeautifulSoup tag containing the table.

    Returns:
        A list of header labels, one per column; an empty list when the table
        has no ``<thead>`` or the ``<thead>`` has no rows.
    """
    thead = table.select_one("thead")
    if thead is None:
        return []

    header_rows = [tr.select("th") for tr in thead.select("tr")]
    if not header_rows:
        # Without rows there is no grid to read a first row from.
        return []

    def _span(cell, attr):
        """Read a colspan/rowspan attribute; missing or malformed values count as 1."""
        try:
            return max(int(cell.get(attr, 1)), 1)
        except (TypeError, ValueError):
            return 1

    grid_h = len(header_rows)
    # No row can reach further right than the sum of every colspan in the
    # header, so this width never truncates a wide table.
    grid_w = sum(_span(th, "colspan") for ths in header_rows for th in ths)

    # None marks a free slot. An empty string is a legitimate (blank) header
    # cell and must still occupy its position, otherwise a blank cell with a
    # rowspan would let later rows shift into its column.
    grid = [[None] * grid_w for _ in range(grid_h)]

    for r, ths in enumerate(header_rows):
        c = 0
        for th in ths:
            # Skip slots already claimed by a rowspan from an earlier row.
            while c < grid_w and grid[r][c] is not None:
                c += 1

            txt = clean_text(th.get_text())
            colspan = _span(th, "colspan")
            rowspan = _span(th, "rowspan")

            for i in range(rowspan):
                for j in range(colspan):
                    if r + i < grid_h and c + j < grid_w:
                        grid[r + i][c + j] = txt
            c += colspan

    # The table width is the run of occupied slots at the start of row 0.
    valid_cols = 0
    for slot in grid[0]:
        if slot is None:
            break
        valid_cols += 1

    flat_headers = []
    for c in range(valid_cols):
        parts = []
        for r in range(grid_h):
            val = grid[r][c]
            # Drop blanks and labels repeated vertically by a rowspan.
            if val and (not parts or parts[-1] != val):
                parts.append(val)

        full_name = " > ".join(parts)
        full_name = re.sub(r'\s*\(.*?\)', '', full_name)
        flat_headers.append(full_name.strip())

    return flat_headers

def extract_prices_from_cell(cell, cell_text):
    """Extract the regular, KT-subscription and promotional prices of a cell.

    Prices are read from ``aria-label`` attributes first and from the visible
    text as a fallback.

    Args:
        cell: BeautifulSoup tag of the price cell.
        cell_text: Text of the same cell, used for the regex fallbacks.

    Returns:
        ``{"regular": int, "kt": int, "promo": int}``; a price that could not
        be found is 0, except that ``kt`` falls back to ``regular`` when only
        the regular price is known.
    """
    def _aria_price(keyword):
        """Price parsed from the first element whose aria-label contains *keyword*, or None."""
        node = cell.select_one(f"[aria-label*='{keyword}']")
        if node is None:
            return None
        return parse_price_str(node['aria-label'])

    regular = _aria_price("정상가") or 0
    promo = _aria_price("프로모션가") or 0
    # A discount ("할인가") label, when present, takes precedence over the
    # promotion label.
    discount = _aria_price("할인가")
    if discount is not None:
        promo = discount

    kt = 0
    m = re.search(r"KT\s*구독\s*(판매)?가\s*([\d,]+)", cell_text)
    if m:
        kt = parse_price_str(m.group(2))

    if regular == 0:
        # Accept either an explicit "정상가" label or a price that opens the
        # cell text. The anchor must be an alternative of the label: placed
        # after an optional label it can only match at position 0, so a
        # labelled price would never be found.
        m = re.search(r"(?:정상가\s*|^\s*)([\d,]+)\s*원", cell_text)
        if m:
            regular = parse_price_str(m.group(1))

    if promo == 0:
        m = re.search(r"(프로모션가|할인가)\s*([\d,]+)", cell_text)
        if m:
            promo = parse_price_str(m.group(2))

    # Without a dedicated KT price the regular price is what the customer pays.
    if kt == 0 and regular > 0:
        kt = regular

    return {"regular": regular, "kt": kt, "promo": promo}

def _extract_column_based_plans(thead, tbody, product_num, main_service_name):
    """Parse a plan table whose columns are plans and whose rows are attributes."""
    plans = []
    for th in thead.select("tr:last-child th"):
        txt = clean_text(th.get_text())
        # The "구분" (category) corner cell and blank cells are not plans.
        if "구분" in txt or not txt:
            continue

        pid, bkey = generate_plan_id_and_key(product_num, txt, main_service_name)
        plans.append({
            "plan_id": pid, "plan_title": txt, "base_plan_key": bkey,
            "prices": {}, "benefits": []
        })

    current_category = ""
    for tr in tbody.select("tr"):
        ths = tr.select("th")
        tds = tr.select("td")

        th_texts = [clean_text(th.get_text()) for th in ths]
        attr_name = ""

        if len(ths) == 2:
            # Category cell followed by the attribute cell.
            current_category = th_texts[0]
            attr_name = f"{current_category} > {th_texts[1]}"
        elif len(ths) == 1:
            if ths[0].get("colspan") == "2":
                # A cell spanning both label columns stands on its own.
                attr_name = th_texts[0]
            else:
                if current_category:
                    attr_name = f"{current_category} > {th_texts[0]}"
                else:
                    attr_name = th_texts[0]
                if ths[0].has_attr("rowspan"):
                    current_category = th_texts[0]

        attr_name = re.sub(r'\s*\(.*?\)', '', attr_name).strip()

        # Only rows that provide exactly one value per plan can be mapped.
        if len(tds) != len(plans):
            continue

        is_price_row = "월정액" in attr_name
        for plan, td in zip(plans, tds):
            val = clean_text(td.get_text())
            if is_price_row:
                p_data = extract_prices_from_cell(td, val)
                plan["prices"] = {**p_data, "currency": "KRW", "billing_period": "monthly"}
            else:
                plan["benefits"].append({"name": attr_name, "detail": val})

    return plans


def _extract_row_based_plans(table, tbody, product_num, main_service_name):
    """Parse a plan table whose rows are plans and whose columns are attributes."""
    headers = flatten_table_headers(table)
    if not headers:
        # Without headers the columns cannot be mapped to anything.
        return []

    # The price column is the first one mentioning "월정액"; the second
    # column is assumed otherwise. It is the same for every row.
    price_idx = next((i for i, h in enumerate(headers) if "월정액" in h), 1)

    plans = []
    rowspan_map = {}  # column index -> cell carried down by a rowspan

    for tr in tbody.select("tr"):
        cells = tr.select("th, td")
        row = []
        ptr = 0

        for col_idx in range(len(headers)):
            carried = rowspan_map.get(col_idx)
            if carried is not None and carried['count'] > 0:
                # This column is still covered by a cell from an earlier row.
                carried['count'] -= 1
                row.append(carried['cell'])
            elif ptr < len(cells):
                cell = cells[ptr]
                ptr += 1
                if cell.has_attr("rowspan"):
                    try:
                        rs = int(cell['rowspan'])
                    except (TypeError, ValueError):
                        rs = 1  # malformed rowspan: treat as a single row
                    if rs > 1:
                        rowspan_map[col_idx] = {'count': rs - 1, 'cell': cell}
                row.append(cell)
            else:
                row.append(None)

        name_cell = row[0]
        if name_cell is None:
            continue
        plan_name = clean_text(name_cell.get_text())

        # A short row, or a table with a single column, has no price cell;
        # keep the plan and report zero prices instead of failing.
        price_cell = row[price_idx] if price_idx < len(row) else None
        if price_cell is not None:
            price_data = extract_prices_from_cell(price_cell, clean_text(price_cell.get_text()))
        else:
            price_data = {"regular": 0, "kt": 0, "promo": 0}

        benefits = []
        for i, cell in enumerate(row):
            if i == 0 or i == price_idx or cell is None:
                continue

            items = cell.select("li")
            if items:
                detail_txt = "\n".join(clean_text(li.get_text()) for li in items)
            else:
                detail_txt = clean_text(cell.get_text())

            if detail_txt:
                benefits.append({"name": headers[i], "detail": detail_txt})

        pid, bkey = generate_plan_id_and_key(product_num, plan_name, main_service_name)

        plans.append({
            "plan_id": pid, "plan_title": plan_name, "base_plan_key": bkey,
            "prices": {**price_data, "currency": "KRW", "billing_period": "monthly"},
            "benefits": benefits
        })

    return plans


def extract_plans_universal(soup, product_num, main_service_name=None):
    """Extract the subscription plans from the ``.pduct-tbl-plan`` table.

    Two layouts are handled. When the header names plan tiers and does not
    mention "월정액", each column is a plan and each body row an attribute.
    Otherwise each body row is a plan and the header names the attributes.

    Args:
        soup: Parsed page.
        product_num: Product number passed on to ``generate_plan_id_and_key``.
        main_service_name: Service code used for titles that name no service.

    Returns:
        A list of dicts with the keys ``plan_id``, ``plan_title``,
        ``base_plan_key``, ``prices`` and ``benefits``; an empty list when the
        table, its ``<thead>`` or its ``<tbody>`` is missing.
    """
    table = soup.select_one(".pduct-tbl-plan")
    if table is None:
        return []

    thead = table.select_one("thead")
    tbody = table.select_one("tbody")
    if thead is None or tbody is None:
        return []

    header_row_text = clean_text(thead.get_text())
    tier_keywords = ["광고형", "베이직", "스탠다드", "프리미엄", "초이스"]

    is_column_based = (
        any(k in header_row_text for k in tier_keywords)
        and "월정액" not in header_row_text
    )

    if is_column_based:
        return _extract_column_based_plans(thead, tbody, product_num, main_service_name)
    return _extract_row_based_plans(table, tbody, product_num, main_service_name)

In [None]:
# 메타데이터 추출
def extract_services_from_page(soup, title_text):
    """List the known services mentioned in the page title or page text.

    The title and the page text are upper-cased and searched for the English
    and Korean name of each known service.

    Returns:
        A list of ``{"service_id", "name", "category"}`` dicts in catalogue
        order. OTT services are named by their English name, all others by
        ``"<Korean name> <English name>"``.
    """
    haystack = " ".join((title_text, soup.get_text())).upper()

    catalog = (
        ("TVING", "티빙", "OTT_TVING", "OTT"),
        ("DISNEY", "디즈니", "OTT_DISNEY", "OTT"),
        ("YOUTUBE", "유튜브", "OTT_YOUTUBE", "OTT"),
        ("MGC", "메가", "COFFEE_MGC", "COFFEE"),
        ("STARBUCKS", "스타벅스", "COFFEE_STARBUCKS", "COFFEE"),
        ("LIFE", "생활구독", "LIFE_SUBSCRIBE", "LIFE"),
    )

    found = []
    seen_ids = set()
    for english, korean, service_id, category in catalog:
        if service_id in seen_ids:
            continue
        if english not in haystack and korean not in haystack:
            continue

        seen_ids.add(service_id)
        if category == "OTT":
            display_name = english
        else:
            display_name = korean + " " + english
        found.append({"service_id": service_id, "name": display_name, "category": category})

    return found

def parse_promotion(soup):
    """Build the promotion record for a page.

    The status is always ``"ACTIVE"``. When the page text contains a date
    range of the form ``<date> ~ <date>`` (two-digit year, month and day
    separated by dots or by 년/월), the matched text is returned under
    ``join_period_text``.
    """
    period = re.search(
        r"(\d{2}[\.년]\s*\d{1,2}[\.월]\s*\d{1,2})[일]?\s*~\s*['’]?(\d{2}[\.년]\s*\d{1,2}[\.월]\s*\d{1,2})",
        soup.get_text(),
    )
    if period is None:
        return {"status": "ACTIVE"}
    return {"status": "ACTIVE", "join_period_text": period.group(0)}

def extract_policy(soup):
    """Join the distinct policy/notice texts of the page into one string.

    Texts are taken from the notice-list items, accordion items and summary
    spans, cleaned, de-duplicated in order of first appearance and joined
    with single spaces.
    """
    nodes = soup.select(".pduct-noted-list li, .accordion-contents li, .summary-info span")
    cleaned = (clean_text(node.get_text()) for node in nodes)
    # dict.fromkeys keeps the first occurrence of each text, in order.
    distinct = dict.fromkeys(text for text in cleaned if text)
    return " ".join(distinct)

In [None]:
# 메인함수 실행
def build_raw_json(url: str, timeout: float = 10.0):
    """Fetch a KT product page and assemble its raw JSON-serialisable record.

    Args:
        url: Product detail page URL. The product number is read from its
            ``/prodetail/<number>/`` segment ("0000" when absent).
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the crawl indefinitely.

    Returns:
        A dict with the keys ``provider``, ``product``, ``services``,
        ``subscribe_options``, ``promotion`` and ``policy``; ``None`` when
        fetching or parsing fails (the error is printed).
    """
    try:
        response = requests.get(
            url, headers={"User-Agent": "Mozilla/5.0"}, timeout=timeout
        )
        # Do not parse an HTTP error page as if it were a product.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        m = re.search(r"/prodetail/(\d+)/", url)
        pid = m.group(1) if m else "0000"

        # <title>.string is None for an empty or multi-node title; fall back
        # to the default so the string handling below never receives None.
        title = str((soup.title.string if soup.title else None) or "KT Product")

        services = extract_services_from_page(soup, title)

        # The first OTT service found names plans whose title omits it.
        main_service_name = next(
            (svc['service_id'].replace("OTT_", "")
             for svc in services if svc['category'] == 'OTT'),
            None,
        )

        plans = extract_plans_universal(soup, pid, main_service_name)

        return {
            "provider": {"name": "KT", "provider_id": "KT"},
            "product": {
                "product_num": pid,
                "product_id": f"KT_{pid}",
                "name": title,
                "url": url,
                "fetched_at": datetime.now().isoformat()
            },
            "services": services,
            "subscribe_options": plans,
            "promotion": parse_promotion(soup),
            "policy": extract_policy(soup)
        }
    except Exception as e:
        # Top-level boundary of a best-effort crawl: report and skip the URL.
        print(f"Error {url}: {e}")
        return None

In [None]:
# Product detail pages to crawl.
urls = [
    "https://m.product.kt.com/static/prodetail/1665/mobile/detail_view/m_ott_pop_tving_mgc_coffee.html",
    "https://m.product.kt.com/static/prodetail/1610/mobile/detail_view/m_ott_pop_disney_starbucks.html",
    "https://m.product.kt.com/static/prodetail/1667/mobile/detail_view/m_ott_pop_disney_life_subscribe.html",
    "https://m.product.kt.com/static/prodetail/1599/mobile/detail_view/m_ott_pop_youtube_starbucks.html",
    "https://m.product.kt.com/static/prodetail/1579/mobile/detail_view/m_ott_pop_tving_starbucks.html",
    "https://m.product.kt.com/static/prodetail/1668/mobile/detail_view/m_ott_pop_disney_mgc_coffee.html",
    "https://m.product.kt.com/static/prodetail/1580/mobile/detail_view/m_ott_pop_tving.html"
]

# Output directory on the mounted Drive; created on first run.
out_dir = "/content/drive/MyDrive/project_Demo/start_final/crawling_result"
os.makedirs(out_dir, exist_ok=True)

for url in urls:
    data = build_raw_json(url)
    if not data:
        # build_raw_json already reported the failure; move on to the next URL.
        continue

    # One file per product, named after its product number.
    pid = data["product"]["product_num"]
    file_path = f"{out_dir}/raw_{pid}.json"

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"Saved to Drive: {file_path}")

Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1665.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1610.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1667.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1599.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1579.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1668.json
Saved to Drive: /content/drive/MyDrive/project_Demo/start_final/crawling_result/raw_1580.json
