In [1]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from bs4 import BeautifulSoup
import pandas as pd
import pprint
import time
import csv
import re

# 크롬 드라이버 자동 업데이트
from webdriver_manager.chrome import ChromeDriverManager

In [2]:
class RetriesExceededError(Exception):
    """Signals that an element lookup gave up after exhausting its retry budget."""

    def __init__(self, message):
        # A message is mandatory; it becomes the exception's only argument.
        Exception.__init__(self, message)

        
def find_element_with_retry(driver_, by, value, waiting_sec=2.5, max_retries=10):
    """Wait for an element to be present in the DOM, retrying a bounded number of times.

    Args:
        driver_: WebDriver instance to search with.
        by: Locator strategy (e.g. ``By.CSS_SELECTOR``).
        value: Locator value matching ``by``.
        waiting_sec: Timeout in seconds for each individual wait attempt.
        max_retries: Maximum number of wait attempts before giving up.

    Returns:
        The located element.

    Raises:
        RetriesExceededError: If the element could not be located within
            ``max_retries`` attempts of ``waiting_sec`` seconds each.
    """
    # Imported locally because it is not among the notebook's top-level imports.
    from selenium.common.exceptions import TimeoutException

    locator = (by, value)
    for _ in range(max_retries):
        try:
            return WebDriverWait(driver_, waiting_sec).until(EC.presence_of_element_located(locator))
        except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
            # WebDriverWait.until reports "element never appeared" as a
            # TimeoutException; without catching it the first timeout would
            # escape and no retry would ever happen.
            continue

    raise RetriesExceededError(f'{value}를 찾을 수 없음\n재시도 횟수 초과: {waiting_sec}초로 {max_retries}회 시도')

In [3]:
# Run-wide settings
save_file = True   # whether to write the result files
waiting_sec = 2.5

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"

chrome_options = Options()
chrome_options.add_argument('user-agent=' + user_agent)
# Keep the browser from closing
chrome_options.add_experimental_option("detach", True)
# Get rid of unnecessary error messages
chrome_options.add_experimental_option("excludeSwitches", ["enable-logging"])

# The chromedriver binary path comes from webdriver_manager's install() call
# rather than a hard-coded local path.
driver_path = ChromeDriverManager().install()
service = Service(executable_path=driver_path)
driver = webdriver.Chrome(options=chrome_options, service=service)

In [4]:
# Read the seq file
category = 112751  # mainboard

seq_file = './seq_{}.csv'.format(category)
seq_df = pd.read_csv(filepath_or_buffer=seq_file)
print(seq_df)

           seq
0     20324882
1      7452181
2     16284179
3     16083650
4     18652871
...        ...
1335  13016483
1336   6732955
1337   6145461
1338   5991461
1339   4465818

[1340 rows x 1 columns]


In [5]:
# Column order of the main product table; the order below is the CSV column order.
header = (
    # listing / identity
    ["name", "price", "link", "company", "product_seq", "image"]
    # platform
    + ["cpu_socket", "chipset", "form_factor"]
    # memory
    + ["memory_type", "memory_number", "memory_capacity", "xmp", "expo"]
    # storage
    + ["sata3_number", "m2_number", "m2_interface", "m2_formfactor"]
    # expansion / networking / power
    + ["pcie_version", "vga_connection", "wireless_lan", "wired_lan_speed", "phase"]
    # connectors and features
    + ["graphic_output", "back_panel", "io_header", "feature"]
    # bookkeeping
    + ["reg_date", "bookmark"]
)

# Column order of the per-slot PCI table.
header_pci = ["product_seq", "pci_type", "pci_number"]

In [6]:
# Label tables. For the *_bitmask lists the position of a label is its bit
# index (label i sets bit 1 << i), so the order must not change.

# PCIe version labels
pv_bitmask = [
    'PCI', 'PCIe', 'PCIe3.0', 'PCIe4.0', 'PCIe5.0',
]

# M.2 interface labels
m2i_bitmask = [
    'SATA', 'NVMe', 'PCIe', 'PCIe4.0', 'PCIe5.0',
]

# M.2 form-factor labels
m2f_bitmask = [
    '2230', '2242', '2260', '2280',
    '2580', '22110', '25110',
]

# Wireless connectivity labels
wl_bitmask = [
    '무선 LAN',
    '블루투스',
    'M.2 Key-E(모듈별매)',
]

# Graphics output labels
go_bitmask = [
    'Type-C', 'HDMI', 'DP', 'DVI', 'D-SUB',
]

# Back-panel port labels (plain list of labels, not a bitmask table)
back_panels = [
    'USB4', 'USB 3.2', 'USB 3.1', 'USB 3.0', 'USB 2.0',
    '썬더볼트4', '썬더볼트3',
    'RJ-45', 'S/PDIF', 'Type-C(오디오)', '오디오잭', 'PS/2',
    'e-SATA', '시리얼포트', '패러럴포트',
    'BIOS 플래시백', '클리어CMOS',
]

# Internal I/O header labels
# NOTE(review): '시스탬팬' may be a misspelling of '시스템팬'; if so this label can
# never match a scraped title — confirm against the site's spec labels.
ioh_bitmask = [
    '썬더볼트4 헤더', '썬더볼트3 헤더', 'USB4 헤더',
    'USB 3.1 헤더', 'USB 3.0 헤더', 'USB 2.0 헤더',
    'USB3.2 Type C 헤더', 'USB3.1 Type C 헤더', 'USB3.0 Type C 헤더',
    '시스탬팬 6핀 헤더',
    'RGB 12V 4핀 헤더', 'ARGB 5V 3핀 헤더', 'ARGB 6핀 헤더',
    'TPM 헤더',
]

# Feature labels
feat_bitmask = [
    '전원부 방열판', 'DrMOS', 'SPS(DrMOS)',
    'M.2 히트싱크', 'LED 라이트', '일체형IO실드',
    'UEFI', '인텔 TBMT 3.0 지원', 'AMD APU 지원',
]

# PCI slot type labels accepted for the per-slot table
pcis = [
    'PCIex16',
    'PCIex16(at x8)', 'PCIex16(at x4)', 'PCIex16(at x2)', 'PCIex16(at x1)',
    'PCIex8', 'PCIex8(at x4)',
    'PCIex4', 'PCIex1',
    'PCI', 'mini-PCIe(mPCIe)',
]

In [7]:
# Column-oriented accumulators: one independent empty list per column name.
mainboard = {column: [] for column in header}
mainboard_pci = {column: [] for column in header_pci}

In [8]:
# Progress bookkeeping for the crawl
start = time.time()          # start timestamp used for the elapsed-time report
total = len(seq_df.index)    # number of rows in the seq table
ntime = 0                    # number of products processed so far

In [9]:
# Crawl start
def _section_rows(soup_, section_title):
    """Yield the <tr> rows that follow the row holding the <th> titled `section_title`.

    Iteration stops at the first following row without a `th.tit` cell, or when
    there are no more sibling rows. Yields nothing if the section is absent.
    """
    section_th = soup_.find('th', string=section_title)
    header_row = section_th.find_parent('tr') if section_th is not None else None
    row = header_row.find_next_sibling() if header_row is not None else None
    # find_next_sibling() returns None after the last row, so guard against it.
    while row is not None and row.find_all('th', class_='tit'):
        yield row
        row = row.find_next_sibling()


def _section_titles(soup_, section_title):
    """Return the non-empty `th.tit` texts found in the rows of one section."""
    return [th.get_text()
            for row in _section_rows(soup_, section_title)
            for th in row.find_all('th', class_='tit')
            if th.get_text() != '']


def _bitmask(labels, present):
    """Return an int with bit i set for every labels[i] contained in `present`."""
    bits = 0
    for i, label in enumerate(labels):
        if label in present:
            bits |= 1 << i
    return bits


def _strip_count(text):
    """Drop the '개' counter suffix from a count string; None stays None."""
    return text.replace('개', '') if text is not None else None


def _parse_memory_capacity(text):
    """Normalise the memory-capacity text to a numeric string (TB values are multiplied by 1024)."""
    if text is None:
        return None
    text = re.sub('(최대|GB|내장| )', '', text)
    if 'TB' in text:
        text = str(float(text.replace('TB', '').replace('(LR-DIMM3DS)', '')) * 1024)
    return text


def _parse_wired_lan_speed(text):
    """Convert the wired-LAN speed text to a number scaled by 10 (so '기가비트' gives 10.0).

    Returns 0 when the product has no wired-LAN speed entry. When several
    comma-separated speeds are listed, only the first one is used.
    """
    if text is None:
        return 0
    if text == '100메가':
        text = '0.1'
    elif text == '기가비트':
        text = '1'
    text = text.replace('기가비트', '')
    if ',' in text:
        text = text[:text.index(',')]
    return float(text) * 10


def _parse_phase(text):
    """Sum a power-phase expression such as '16+1+2페이즈' into an int.

    The text comes from a scraped web page, so it is parsed explicitly instead
    of being passed to eval(). Anything that is not integers joined by '+'
    yields None.
    """
    if text is None:
        return None
    expr = text.replace('+페이즈', '').replace('페이즈', '')
    expr = re.sub(r'\s+', '', expr)
    if re.fullmatch(r'\d+(\+\d+)*', expr) is None:
        return None
    return sum(int(part) for part in expr.split('+'))


# The implicit wait is a session-wide driver setting; setting it once is enough.
driver.implicitly_wait(10)

for seq in seq_df.seq[ntime:]:
    url = f'https://prod.danawa.com/info/?pcode={seq}&cate={category}'
    driver.get(url)

    time.sleep(waiting_sec)
    soup = BeautifulSoup(driver.page_source, 'html.parser')

    spec_tbl_tit = soup.select("#productDescriptionArea > div > div.prod_spec > table > tbody > tr > th.tit")
    spec_tbl_dsc = soup.select("#productDescriptionArea > div > div.prod_spec > table > tbody > tr > td.dsc")
    name = soup.select_one("#blog_content > div.summary_info > div.top_summary > h3 > span")
    price = soup.select_one("#blog_content > div.summary_info > div.detail_summary > div.summary_left > div.lowest_area > div.lowest_top > div.row.lowest_price > span.lwst_prc > a > em")
    image_url = soup.select_one("#baseImage")

    # Any of these elements may be missing from the page; store None rather than crash.
    name_text = name.get_text() if name is not None else None
    price_text = price.get_text() if price is not None else None
    image_src = image_url.get('src') if image_url is not None else None

    # Spec title -> cleaned description text for every title/description pair in the table.
    info = dict()
    info['제품명'] = name_text
    info['최저가'] = price_text
    info['링크'] = url
    info['이미지'] = image_src
    for tit, dsc in zip(spec_tbl_tit, spec_tbl_dsc):
        text = dsc.get_text().replace('\n', '').replace('\t', '')
        text = text.replace('(제조사 웹사이트 바로가기)', '')
        info[tit.get_text()] = text.strip()

    reg_date = info.get('등록년월')

    # Build the whole row first and commit it in one go at the end, so a
    # failure while parsing never leaves the column lists with unequal lengths.
    record = {
        'name': name_text,
        'price': price_text.replace(',', '') if price_text is not None else None,
        'link': url,
        'company': info.get('제조회사'),
        'product_seq': seq,
        'image': image_src,
        'cpu_socket': info.get('CPU 소켓'),
        'chipset': info.get('세부 칩셋'),
        'form_factor': info.get('폼팩터'),
        'memory_type': info.get('메모리 종류'),
        'memory_number': _strip_count(info.get('메모리 슬롯')),
        'memory_capacity': _parse_memory_capacity(info.get('메모리 용량')),
        'xmp': 1 if 'XMP' in info else 2 if 'XMP3.0' in info else None,
        'expo': 1 if 'EXPO' in info else None,
        'sata3_number': _strip_count(info.get('SATA3')),
        'm2_number': _strip_count(info.get('M.2')),
        'm2_interface': _bitmask(m2i_bitmask, _section_titles(soup, 'M.2 연결')),
        'm2_formfactor': _bitmask(m2f_bitmask, info),
        'pcie_version': _bitmask(pv_bitmask, _section_titles(soup, 'PCIe버전')),
        'vga_connection': info.get('VGA 연결'),
        'wireless_lan': _bitmask(wl_bitmask, info),
        'wired_lan_speed': _parse_wired_lan_speed(info.get('유선랜 속도')),
        'phase': _parse_phase(info.get('전원부')),
        'graphic_output': _bitmask(go_bitmask, info),
        'back_panel': ' / '.join(panel for panel in back_panels if panel in info),
        'io_header': _bitmask(ioh_bitmask, info),
        'feature': _bitmask(feat_bitmask, info),
        'reg_date': re.sub('(년| |월)', '', reg_date) if reg_date is not None else None,
        'bookmark': None,
    }

    # PCI slot rows: (slot type, count) for every recognised slot type.
    pci_rows = []
    for row in _section_rows(soup, 'PCIe슬롯'):
        slot_titles = row.find_all('th', class_='tit')
        slot_counts = row.find_all('td', class_='dsc')
        for tit, dsc in zip(slot_titles, slot_counts):
            if tit.get_text() in pcis:
                pci_rows.append((tit.get_text(), dsc.get_text().replace('개', '')))

    # Commit. When resuming after an interrupted run, drop anything stored at
    # or beyond the current position first (in place, no list copies).
    for column in mainboard.values():
        del column[ntime:]
    for key, value in record.items():
        mainboard[key].append(value)
    for pci_type, pci_number in pci_rows:
        mainboard_pci['product_seq'].append(seq)
        mainboard_pci['pci_type'].append(pci_type)
        mainboard_pci['pci_number'].append(pci_number)

    # Progress counter; also the resume position if this cell is run again.
    ntime += 1
    print(f'\r{ntime}/{total} ({ntime/total*100:.2f}%)', end='')

end = time.time()
print(f'\n걸린 시간: {end-start:.2f} sec')
# Only reached when the loop completes, so a failed run keeps the browser
# session available for a resumed run of this cell.
driver.quit()
# Crawl end

1340/1340 (100.00%)
걸린 시간: 6812.02 sec


In [10]:
# Save files
if save_file:
    # Date stamp (yymmdd) appended to both output file names
    today = time.strftime('%y%m%d')
    for stem, table in (('mainboard', mainboard), ('mainboard_pci', mainboard_pci)):
        pd.DataFrame(table).to_csv(f'{stem}{today}.csv', index=None)