## 최종 Subpage Crawling

- 재귀호출을 통한 subpage의 subpage 통합

In [27]:
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def create_session(retries=5, backoff_factor=1):
    """
    requests.Session을 생성하고, 재시도 및 백오프 정책을 설정합니다.
    """
    session = requests.Session()
    retry = Retry(
        total=retries,  # 최대 재시도 횟수
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,  # 재시도 간 대기 시간 증가율
        status_forcelist=[500, 502, 503, 504],  # 재시도할 HTTP 상태 코드
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

def crawl_subpages(base_url, session, visited=None, max_depth=10, current_depth=0):
    """
    주어진 URL에서 내부 링크를 재귀적으로 크롤링하여 모든 하위 사이트를 찾습니다.
    
    Parameters:
        - base_url: 크롤링 시작 URL
        - session: requests.Session 객체
        - visited: 이미 방문한 URL 집합
        - max_depth: 최대 크롤링 깊이
        - current_depth: 현재 크롤링 깊이
    """
    if visited is None:
        visited = set()

    if current_depth > max_depth:
        return visited

    try:
        response = session.get(base_url, timeout=5)  # timeout 설정
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            for link in soup.find_all("a", href=True):
                full_url = urljoin(base_url, link["href"])
                # 같은 도메인 내부 링크만 수집
                if base_url in full_url and full_url not in visited:
                    visited.add(full_url)
                    # 재귀적으로 하위 링크 탐색
                    crawl_subpages(full_url, session, visited, max_depth, current_depth + 1)
    except requests.exceptions.RequestException as e:
        print(f"Request error for URL {base_url}: {e}")
    
    return visited

# 메인 URL
base_url = "https://www.kead.or.kr/"

# 세션 생성
session = create_session()

# 하위 페이지 크롤링
subpages = crawl_subpages(base_url, session)

# 결과 출력
if subpages:
    print("Found Subpages:")
    print("\n".join(sorted(subpages)))
else:
    print("No subpages found.")


Found Subpages:
https://www.kead.or.kr/
https://www.kead.or.kr/#main_content
https://www.kead.or.kr/artcincr/cntntsPage.do?menuId=MENU0829
https://www.kead.or.kr/artcincr/cntntsPage.do?menuId=MENU0829#main_content
https://www.kead.or.kr/asftmnplcy/cntntsPage.do?menuId=MENU0855
https://www.kead.or.kr/asftmnplcy/cntntsPage.do?menuId=MENU0855#main_content
https://www.kead.or.kr/atintrdbsns/cntntsPage.do?menuId=MENU0629
https://www.kead.or.kr/atintrdbsns/cntntsPage.do?menuId=MENU0629#main_content
https://www.kead.or.kr/avsbestprctc/cntntsPage.do?menuId=MENU0887
https://www.kead.or.kr/avsbestprctc/cntntsPage.do?menuId=MENU0887#main_content
https://www.kead.or.kr/avsexclrcmn/cntntsPage.do?menuId=MENU0886
https://www.kead.or.kr/avsexclrcmn/cntntsPage.do?menuId=MENU0886#main_content
https://www.kead.or.kr/avsintrd/cntntsPage.do?menuId=MENU0885
https://www.kead.or.kr/avsintrd/cntntsPage.do?menuId=MENU0885#main_content
https://www.kead.or.kr/basingodesc/cntntsPage.do?menuId=MENU1485
https://www.

### 같은 페이지지만 \#을 통해 Pageindex가 구분되어 중복된 경우를 발견하여 cleaning

In [None]:
import pandas as pd

series=pd.Series(list(subpages))
data = series.map(lambda x: x.split('#')[0])
data.drop_duplicates().reset_index(drop=True).to_json('/workspace/crawled_subpages.json',force_ascii=False,indent=4,index=False)
subpages = data.drop_duplicates().to_list()


## selenium driver가 제대로 설치 되었는지 확인

In [50]:
# vim test.py
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

driver.get("https://python.org")
print(driver.title)
driver.close()

Welcome to Python.org


## Onclick으로 접근가능한 페이지들 Crawling

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import time
from tqdm import tqdm

# ChromeDriver 설정
def setup_driver():
    chrome_options = Options()
    chrome_options.binary_location = "/usr/bin/google-chrome"  # Chrome 경로

    # Chrome 옵션 추가
    chrome_options.add_argument("--headless=new")  # 새로운 헤드리스 모드
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--remote-debugging-port=9222")
    chrome_options.add_argument("--disable-extensions")
    chrome_options.add_argument("--disable-application-cache")
    chrome_options.add_argument("--disable-infobars")
    chrome_options.add_argument("--disable-features=VizDisplayCompositor")
    chrome_options.add_argument("--single-process")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    return driver

# 트리거 요소 처리 함수
def process_triggers(driver, url, max_retries=3):
    extracted_urls = []
    driver.get(url)
    
    retries = 0
    while retries < max_retries:
        try:
            # 트리거 요소 찾기
            triggers = driver.find_elements(By.XPATH, '//a[@href="javascript:void(0);"][contains(@onclick, "fn_bbs")]')
            print(f"Found {len(triggers)} triggers.")
            
            for trigger in triggers:
                # 클릭 트리거
                driver.execute_script("arguments[0].click();", trigger)
                time.sleep(1)  # 페이지 로딩 대기
                
                # 현재 URL 저장
                current_url = driver.current_url
                if current_url not in extracted_urls:
                    extracted_urls.append(current_url)
                    print(f"URL added: {current_url}")
                
                # 이전 페이지로 돌아가기
                driver.back()
                time.sleep(1)
            
            break  # 성공적으로 실행되면 반복 종료
        
        except Exception as e:
            print(f"Error occurred: {e}. Retrying... ({retries+1}/{max_retries})")
            retries += 1
            time.sleep(2)  # 재시도 대기
            
    if retries == max_retries:
        print("Max retries reached. Exiting.")
    
    return extracted_urls

# 실행 코드
if __name__ == "__main__":
    import json

    with open('/workspace/crawled_subpages.json','rb') as f:
        data = json.load(f)
    subpages = list(data.values())
    
    for start_url in tqdm(subpages):
        
        driver = setup_driver()

        try:
            result_urls = process_triggers(driver, start_url)
            print(f"Extracted URLs: {result_urls}")
            
            # 결과 저장
            with open("/workspace/extracted_urls.txt", "w") as file:
                for url in result_urls:
                    file.write(url + "\n")
        finally:
            driver.quit()


## .do 파일 찾아서 다 저장하기

In [None]:
import os
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, unquote


def setup_session():
    """
    requests 세션 설정: 재시도와 타임아웃 기본값을 추가.
    """
    session = requests.Session()
    retries = Retry(
        total=5,  # 최대 5회 재시도
        backoff_factor=1,  # 재시도 간격 증가
        status_forcelist=[500, 502, 503, 504],  # 재시도할 HTTP 상태 코드
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def save_failed_url(failed_url, error_message):
    """
    실패한 URL과 에러 메시지를 파일에 저장.
    """
    failed_dir = "workspace"
    failed_file = os.path.join(failed_dir, "failed_files.txt")

    if not os.path.exists(failed_dir):
        os.makedirs(failed_dir)

    with open(failed_file, "a", encoding="utf-8") as file:
        file.write(f"{failed_url} - {error_message}\n")
    print(f"Error logged for: {failed_url}")


def get_file_name_from_response(response, file_url):
    """
    HTTP 응답에서 파일 이름 추출. 깨진 한글 파일명 문제를 해결하기 위해 적절한 디코딩 처리 포함.
    """
    file_name = None
    content_disposition = response.headers.get("Content-Disposition")

    if content_disposition:
        # 헤더에서 파일 이름 추출 (filename* 우선 처리)
        if "filename*" in content_disposition:
            # UTF-8로 인코딩된 파일 이름 추출
            file_name = content_disposition.split("filename*=")[1].split(";")[0].strip()
            if file_name.startswith("UTF-8''"):
                file_name = file_name.replace("UTF-8''", "")
                file_name = unquote(file_name)  # URL 디코딩
        elif "filename=" in content_disposition:
            # 일반 파일 이름 추출
            file_name = content_disposition.split("filename=")[1].split(";")[0].strip('"')
            file_name = unquote(file_name)

    if not file_name:
        # URL에서 기본 파일 이름 추출
        file_name = os.path.basename(urlparse(file_url).path)

    return file_name


def download_file(session, file_url, output_dir):
    """
    주어진 파일 URL을 다운로드하고 저장.
    """
    try:
        print(f"Downloading: {file_url}")
        response = session.get(file_url, timeout=10, stream=True)
        if response.status_code == 200:
            # 파일 이름 추출
            file_name = get_file_name_from_response(response, file_url)

            # 저장 디렉토리 생성
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
            
            # 파일 저장
            file_path = os.path.join(output_dir,'/workspace/files',file_name)
            with open(file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
            print(f"File saved: {file_path}")
        else:
            print(f"Failed to download file: {file_url} (status code: {response.status_code})")
            save_failed_url(file_url, f"HTTP {response.status_code}")
    except Exception as e:
        print(f"Error downloading {file_url}: {e}")
        save_failed_url(file_url, str(e))


def crawl_files(url, output_dir="workspace"):
    """
    주어진 URL에서 파일 링크를 파싱하고 다운로드.
    """
    session = setup_session()

    try:
        # 요청 및 페이지 파싱
        response = session.get(url, timeout=10)
        if response.status_code != 200:
            print(f"Failed to fetch the page: {url} (status code: {response.status_code})")
            save_failed_url(url, f"HTTP {response.status_code}")
            return

        soup = BeautifulSoup(response.text, "html.parser")
        file_tags = soup.find_all("a", href=True)  # 모든 링크 태그 검색

        for tag in file_tags:
            href = tag.get("href")
            if not href:
                continue

            # 절대 URL로 변환
            file_url = urljoin(url, href)

            # 특정 다운로드 경로 필터링 (예: downloadDirect.do 포함 여부)
            if "downloadDirect.do" in file_url:
                download_file(session, file_url, output_dir)

    except Exception as e:
        print(f"Error processing {url}: {e}")
        save_failed_url(url, str(e))
        

# 크롤링 실행
for url in subpages:
    crawl_files(url)

## Image Crawling(아직 필요없음)

In [11]:
import os
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse


def setup_session():
    """
    requests 세션 설정: 재시도와 타임아웃 기본값을 추가.
    """
    session = requests.Session()
    retries = Retry(
        total=5,  # 최대 5회 재시도
        backoff_factor=1,  # 재시도 간격 증가
        status_forcelist=[500, 502, 503, 504],  # 재시도할 HTTP 상태 코드
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def save_failed_url(failed_url, error_message):
    """
    실패한 URL과 에러 메시지를 파일에 저장.
    """
    failed_dir = "workspace"
    failed_file = os.path.join(failed_dir, "failed_images.txt")

    if not os.path.exists(failed_dir):
        os.makedirs(failed_dir)

    with open(failed_file, "a", encoding="utf-8") as file:
        file.write(f"{failed_url} - {error_message}\n")
    print(f"Error logged for: {failed_url}")


def crawl_images(url, output_dir="workspace"):
    """
    주어진 URL에서 이미지를 다운로드하여 로컬 디렉토리에 저장하며, 에러 발생 시 기록.
    """
    session = setup_session()

    try:
        # 요청 및 페이지 파싱
        response = session.get(url, timeout=10)
        if response.status_code != 200:
            print(f"Failed to fetch the page: {url} (status code: {response.status_code})")
            save_failed_url(url, f"HTTP {response.status_code}")
            return

        soup = BeautifulSoup(response.text, "html.parser")
        img_tags = soup.find_all("img")  # 이미지 태그 찾기

        for img_tag in img_tags:
            img_url = img_tag.get("src")  # 이미지 URL 추출
            if not img_url:
                continue

            # 절대 URL로 변환
            img_url = urljoin(url, img_url)

            # 이미지 다운로드
            try:
                print(f"Downloading: {img_url}")
                img_response = session.get(img_url, timeout=10)
                if img_response.status_code == 200:
                    # 파일 저장 경로 생성
                    parsed_url = urlparse(url)
                    img_dir = os.path.join(output_dir, parsed_url.netloc, os.path.dirname(urlparse(img_url).path.strip("/")))
                    
                    # 디렉토리 생성
                    if not os.path.exists(img_dir):
                        os.makedirs(img_dir)

                    # 파일 저장
                    img_name = os.path.basename(urlparse(img_url).path)
                    img_path = os.path.join(img_dir, img_name)
                    with open(img_path, "wb") as file:
                        file.write(img_response.content)
                    print(f"Image saved: {img_path}")
                else:
                    print(f"Failed to download image: {img_url} (status code: {img_response.status_code})")
                    save_failed_url(img_url, f"HTTP {img_response.status_code}")
            except Exception as e:
                print(f"Error downloading {img_url}: {e}")
                save_failed_url(img_url, str(e))

    except Exception as e:
        print(f"Error processing {url}: {e}")
        save_failed_url(url, str(e))



# 크롤링 실행
for url in subpages:
    crawl_images(url)


Downloading: https://www.kead.or.kr/assets/image/logo_kead.svg
Image saved: workspace/www.kead.or.kr/assets/image/logo_kead.svg
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=6C0FF029E5EA2F0FF592
Image saved: workspace/www.kead.or.kr/cmm/fms/downloadDirect.do
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=FF4435C9950115A44956
Image saved: workspace/www.kead.or.kr/cmm/fms/downloadDirect.do
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=A341EDE933715F09399E
Image saved: workspace/www.kead.or.kr/cmm/fms/downloadDirect.do
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=37F72646E8C37A838009
Image saved: workspace/www.kead.or.kr/cmm/fms/downloadDirect.do
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=084177A4EFE9687CBA43
Image saved: workspace/www.kead.or.kr/cmm/fms/downloadDirect.do
Downloading: https://www.kead.or.kr/cmm/fms/downloadDirect.do?key=5427B5CAE10B46D3A902
Image saved: workspace/www.kea

## Text Crawling Code

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from urllib.parse import urlparse
import json
import os

# HTTP Session 설정 (Retry 지원)
session = requests.Session()
retries = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504],
    allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("https://", adapter)
session.mount("http://", adapter)

def crawl_and_collect(urls, failed_log="/workspace/failed_sites.txt"):
    """
    주어진 URL 리스트에서 각각의 페이지 내용을 크롤링하여 수집하며,
    실패한 URL은 별도의 파일에 저장합니다.
    """
    crawled_data = {}
    failed_sites = []

    for url in urls:
        try:
            print(f"Crawling: {url}")
            response = session.get(url, timeout=10)  # 타임아웃 설정
            response.raise_for_status()  # HTTP 상태 코드 확인

            soup = BeautifulSoup(response.text, "html.parser")
            title = soup.title.string if soup.title else "No Title"
            
            # 이미지 URL을 추출하고, img 태그를 텍스트로 삽입
            image_urls = []
            for img in soup.find_all("img"):
                img_src = img.get("src")
                if img_src:
                    # 상대 경로를 절대 경로로 변환
                    full_img_url = img_src if urlparse(img_src).netloc else urlparse(url)._replace(path=img_src).geturl()
                    image_urls.append(full_img_url)
                    
                    # # 이미지 태그를 텍스트로 삽입 (HTML 유지)
                    # img_tag = f"<img src=\"{full_img_url}\">"
                    # img.insert_before(soup.new_string(f"\n{img_tag}\n"))
            
            # body_text: 텍스트를 추출하되, img 태그는 그대로 유지
            body_text = soup.get_text(separator="\n", strip=True)

            # URL별로 데이터 저장
            crawled_data[url] = {
                "title": title,
                "content": body_text,  # 처음 1000자만 저장
                "images": image_urls  # 이미지 URL 목록 추가
            }

        except requests.exceptions.RequestException as e:
            print(f"Request error crawling {url}: {e}")
            failed_sites.append(url)
        except Exception as e:
            print(f"Error crawling {url}: {e}")
            failed_sites.append(url)

    # 실패한 URL을 파일에 기록
    if failed_sites:
        os.makedirs(os.path.dirname(failed_log), exist_ok=True)
        with open(failed_log, "a", encoding="utf-8") as file:
            file.write("\n".join(failed_sites) + "\n")
        print(f"Failed sites have been logged to {failed_log}.")

    return crawled_data



# 크롤링 실행
crawled_results = crawl_and_collect(subpages)

# 결과를 JSON 파일로 저장
output_path = "/workspace/crawled_data.json"
with open(output_path, "w", encoding="utf-8") as file:
    json.dump(crawled_results, file, ensure_ascii=False, indent=4)

print(f"Crawling completed. Results saved to {output_path}.")


Crawling: https://www.kead.or.kr/campus/bbs/grdtnotice/bbsView.do?pageIndex=1&bbsCode=grdtnotice&bbsCnId=202862&bbsNm=%EA%B5%AC%EB%A1%9C%EB%94%94%EC%A7%80%ED%84%B8%ED%9B%88%EB%A0%A8%EC%84%BC%ED%84%B0+%EA%B3%B5%EC%A7%80%EC%82%AC%ED%95%AD&menuId=&adt1Code=&adt2Code=&adt1CodeArr=&searchCondition=sjcn&searchKeyword=&recordCountPerPage=10


In [None]:
from bs4 import BeautifulSoup
import requests

# 대상 URL 설정
url = "https://www.kead.or.kr/campus/spclztrnng/cntntsPage.do?menuId=MENU0964"

# 웹 페이지 가져오기
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')

# <article class="content_width">만 추출
article = soup
if article:
    
    satisfaction_headers = article.find_all('header')
    if satisfaction_headers:
        for satisfaction_header in satisfaction_headers:
            satisfaction_header.decompose()
    # <div class="quick_menu"> 제거
    quick_menu = article.find('div', class_='quick_menu')
    if quick_menu:
        quick_menu.decompose()

    # <footer class="satisfaction"> 제거
    satisfaction_footers = article.find_all('footer')
    if satisfaction_footers:
        for satisfaction_footer in satisfaction_footers:
            satisfaction_footer.decompose()

    all_menu_article = article.find('article', class_='all_menu')
    if all_menu_article:
        all_menu_article.decompose()
    # 결과 HTML 출력 (필요시 저장)
    print(article.get_text(separator="\n", strip=True))
else:
    print("<article class='content_width'> 요소를 찾을 수 없습니다.")


## Data Cleaning Pipeline

### crawling 된 title중복해결.

In [None]:
import json
from pathlib import Path
input_path = Path.cwd()/'workspace'/'crawled_data.json'
output_dir = Path.cwd()/'workspace'/'crawled_text'

with open(input_path,'rb') as f:
    dictionary = json.load(f)

def handle_duplicate_titles(data):
    title_counts = {}
    
    for key, value in data.items():
        original_title = value['title']
        
        # Check if the title has already been seen
        if original_title not in title_counts:
            title_counts[original_title] = 0
            value['title'] = original_title
        else:
            # Increment the count and create a new title
            title_counts[original_title] += 1
            new_title = f"{original_title}_{title_counts[original_title]}"
            value['title'] = new_title
    
    return data


dictionary = handle_duplicate_titles(dictionary)


with open(input_path,'w') as f:
    json.dump(dictionary,f,ensure_ascii=False,indent=4)

