In [None]:
%pip install beautifulsoup4 requests pandas transformers torch --user

In [34]:
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import re
import pandas as pd
import csv
from collections import Counter
import torch
from transformers import BertTokenizer, BertModel
import json
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import time

In [None]:
# Extract KOSPI / KOSDAQ stock names and short codes
def extract_stocks_name(output_dir='../data/raw_data'):
    """Export the stock name / short code lists of KOSPI and KOSDAQ to CSV.

    Reads ``kospi_code.xlsx`` and ``kosdaq_code.xlsx`` from the working directory,
    keeps the rows whose group-code column equals ``'ST'``, renames the name column
    to ``'종목명'``, left-pads the short code to six digits and writes
    ``kospi_stocks.csv`` / ``kosdaq_stocks.csv``.

    Args:
        output_dir: Directory the CSV files are written to. It defaults to
            ``../data/raw_data`` because that is where ``load_stock_names`` reads
            the same two files from; it is created when missing.
    """
    import os  # local import: the notebook's import cell does not provide ``os``

    # (excel file, group-code column, name column, output file, extra to_csv options)
    sources = [
        ('kospi_code.xlsx', '그룹코드', '한글명', 'kospi_stocks.csv', {'quoting': csv.QUOTE_ALL}),
        ('kosdaq_code.xlsx', '증권그룹구분코드', '한글종목명', 'kosdaq_stocks.csv', {}),
    ]

    os.makedirs(output_dir, exist_ok=True)

    for excel_file, group_col, name_col, out_file, csv_options in sources:
        # Read the code as text so leading zeros survive.
        listing = pd.read_excel(excel_file, dtype={'단축코드': str})

        stocks = (
            listing.loc[listing[group_col] == 'ST', [name_col, '단축코드']]
            .rename(columns={name_col: '종목명'})
        )
        # The .str accessor passes missing codes through instead of raising.
        stocks['단축코드'] = stocks['단축코드'].str.zfill(6)

        stocks.to_csv(
            os.path.join(output_dir, out_file),
            index=False,
            encoding='utf-8-sig',
            **csv_options,
        )

    print("CSV 파일이 성공적으로 저장되었습니다: kospi_stocks.csv, kosdaq_stocks.csv")
    
# Run the export: reads the two Excel code files and writes the stock-name CSV files.
extract_stocks_name()
    

In [52]:
def get_clien_posts(base_url):
    """Collect the posts written yesterday on a Clien board.

    List pages are requested one after another (``po=0, 1, ...``). Every post whose
    timestamp falls on the previous calendar day is opened and its body text is
    extracted. Crawling stops at the first post older than yesterday, so the list
    is assumed to be ordered newest-first.

    Args:
        base_url: Board URL without a query string.

    Returns:
        A list of dicts with the keys ``'title'``, ``'date'`` (``%Y-%m-%d %H:%M``),
        ``'text'`` and ``'link'``.
    """
    # Seconds. Without a timeout a stalled connection blocks the crawl forever.
    request_timeout = 10

    posts = []
    headers = {'User-Agent': 'Mozilla/5.0'}

    # Reset the microseconds too: otherwise a post stamped exactly 00:00:00 compares
    # as older than the window start and ends the crawl early.
    yesterday = datetime.now() - timedelta(days=1)
    date_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    date_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)

    page = 0
    while True:
        url = f"{base_url}?&od=T31&category=0&po={page}"
        try:
            response = requests.get(url, headers=headers, timeout=request_timeout)
        except requests.RequestException as e:
            print(f"Failed to fetch page {page}: {e}")
            break

        if response.status_code != 200:
            print(f"Failed to fetch page {page}: {response.status_code}")
            break

        soup = BeautifulSoup(response.text, 'html.parser')
        articles = soup.select('div.list_item.symph_row')

        if not articles:
            print("[Clien] No more articles found.")
            break

        for article in articles:
            # Title, timestamp and link of the list entry
            title_element = article.select_one('span.subject_fixed')
            date_element = article.select_one('span.timestamp')
            link_element = article.select_one('a.list_subject')

            if not title_element or not date_element or not link_element:
                continue

            title = title_element.get_text(strip=True)
            date_str = date_element.get_text(strip=True)
            link = f"https://www.clien.net{link_element['href']}"

            # One malformed timestamp must not abort the whole crawl.
            try:
                post_date = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
            except ValueError as e:
                print(f"[Clien] Date parsing failed for {date_str}: {e}")
                continue

            # Older than the window: everything after it is older as well.
            if post_date < date_start:
                print("[Clien] No more posts from yesterday.")
                return posts

            # Newer than the window (today's posts): skip and keep scanning.
            if post_date > date_end:
                continue

            # Fetch the post page for its body text
            try:
                post_response = requests.get(link, headers=headers, timeout=request_timeout)
            except requests.RequestException:
                print(f"[Clien] Failed to fetch post: {link}")
                continue

            if post_response.status_code != 200:
                print(f"[Clien] Failed to fetch post: {link}")
                continue

            post_soup = BeautifulSoup(post_response.text, 'html.parser')
            content_element = post_soup.select_one('div.post_article')
            content = content_element.get_text(strip=True) if content_element else "No content"

            posts.append({
                'title': title,
                'date': post_date.strftime("%Y-%m-%d %H:%M"),
                'text': content,
                'link': link
            })
        page += 1

    return posts


In [56]:
def get_fmkorea_posts(base_url):
    """Collect the posts written yesterday on an FMKorea board with headless Chrome.

    List pages are opened one after another until the first post dated yesterday is
    found. That post is opened, and the crawl then follows the post page's "next"
    button from post to post, storing each one, until a post older than yesterday
    shows up or no next button exists.

    Args:
        base_url: Board URL that already contains a query string
            (``&page=N`` is appended to it).

    Returns:
        A list of dicts with the keys ``'title'``, ``'date'`` (``%Y-%m-%d %H:%M``),
        ``'text'`` and ``'link'``.
    """
    posts = []

    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    # try/finally guarantees the browser process is closed even when a lookup,
    # click or date parse raises in the middle of the crawl.
    try:
        # The implicit wait applies to the whole session, so set it once.
        driver.implicitly_wait(60)

        today = datetime.now()
        yesterday = today - timedelta(days=1)
        # Reset the microseconds too: otherwise a post stamped exactly 00:00 compares
        # as older than the window start.
        date_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
        date_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)

        page = 1
        while True:
            driver.get(f"{base_url}&page={page}")

            articles = driver.find_elements(By.CSS_SELECTOR, '.bd_lst.bd_tb_lst.bd_tb .title a')
            stop_crawling = False

            if not articles:
                print("[FMKorea] No more articles found.")
                break

            for article in articles:
                # Skip the comment-count link that sits next to the title link.
                if article.get_attribute('title') == "댓글":
                    continue

                # Two levels up from the link; the original author notes this is the row (tr).
                parent_tr = article.find_element(By.XPATH, "./../..")
                date_str = parent_tr.find_element(By.CSS_SELECTOR, ".time").text.strip()

                try:
                    if ":" in date_str:  # time-only form (HH:MM)
                        post_time = datetime.strptime(date_str, "%H:%M").time()
                        post_date = datetime.combine(today.date(), post_time)
                        if post_date > today:  # a time later than now must belong to yesterday
                            post_date = datetime.combine(yesterday.date(), post_time)
                    else:
                        post_date = datetime.strptime(date_str, "%Y.%m.%d %H:%M")
                except ValueError as e:
                    print(f"[FMKorea] Date parsing failed for {date_str}: {e}")
                    continue

                if not (date_start <= post_date <= date_end):
                    continue

                # First post from yesterday found: switch from the list to the post pages.
                print("[FMKorea] Found first yesterday post, starting detailed crawls...")
                article.click()
                time.sleep(2)

                while True:
                    title = driver.find_element(By.CSS_SELECTOR, 'h1.np_18px').text.strip()
                    post_url = driver.current_url
                    time_str = driver.find_element(By.CSS_SELECTOR, '.date').text.strip()
                    date = datetime.strptime(time_str, "%Y.%m.%d %H:%M")

                    if date < date_start:
                        break

                    # find_elements returns an empty list where find_element would raise,
                    # which is what makes the "No content" fallback reachable.
                    content_elements = driver.find_elements(By.CSS_SELECTOR, '.rd_body article')
                    content = content_elements[0].text.strip() if content_elements else "No content"

                    posts.append({
                        'title': title,
                        'date': date.strftime("%Y-%m-%d %H:%M"),
                        'text': content,
                        'link': post_url
                    })

                    # Move on to the next post; stop when there is no next button.
                    next_buttons = driver.find_elements(
                        By.CSS_SELECTOR, 'div.prev_next_btns span.btn_pack.next.blockfmcopy'
                    )
                    if not next_buttons:
                        break
                    next_button = next_buttons[0]
                    driver.execute_script("arguments[0].scrollIntoView();", next_button)
                    next_button.click()
                    time.sleep(0.5)

                # The list elements are stale after navigating away, so the crawl ends here.
                stop_crawling = True
                break

            if stop_crawling:
                print("[FMKorea] Crawling stopped. All posts from yesterday have been fetched.")
                break

            page += 1
    finally:
        driver.quit()

    return posts

In [64]:
# Load the stopword list
def load_stopwords(file_path):
    """Return the non-blank lines of a UTF-8 text file, stripped, as a set."""
    words = set()
    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            word = raw_line.strip()
            if word:
                words.add(word)
    return words

# Load the stock-name CSV files
def load_stock_names():
    """Return every KOSPI and KOSDAQ stock name, longest name first.

    The '종목명' column of both CSV files is concatenated (KOSPI first), missing
    values are dropped and surrounding whitespace is stripped.
    """
    csv_paths = (
        '../data/raw_data/kospi_stocks.csv',   # KOSPI stock names
        '../data/raw_data/kosdaq_stocks.csv',  # KOSDAQ stock names
    )
    name_columns = [pd.read_csv(csv_path)['종목명'] for csv_path in csv_paths]

    cleaned_names = pd.concat(name_columns).dropna().str.strip()

    # Stable sort: names of equal length keep their file order.
    return sorted(cleaned_names.tolist(), key=len, reverse=True)

# Load the alias JSON (file that stores alternative stock names)
def load_stock_aliases(file_path):
    """Parse the UTF-8 JSON file at ``file_path`` and return the decoded object."""
    with open(file_path, 'r', encoding='utf-8') as alias_file:
        raw_json = alias_file.read()
    return json.loads(raw_json)

# Match official stock names and their aliases in a text
def match_stock_names_with_aliases(text, stock_names, stock_aliases):
    """Find the stocks mentioned in ``text`` and strip the mentions from it.

    Candidates are the official names in ``stock_names`` plus every key and alias
    of ``stock_aliases``. They are tried longest first, so a short name cannot
    match inside a longer one that is also present. Every candidate found is
    recorded once, under its canonical name, and all its occurrences are removed
    from the text.

    Args:
        text: Text to search.
        stock_names: Iterable of official stock names (may be empty or None).
        stock_aliases: Dict mapping a canonical stock name to a list of aliases.

    Returns:
        A tuple ``(matched_stocks, remaining_text)``: the canonical names of the
        matched candidates in match order, and the text without the matches.
    """
    # Surface form -> canonical stock name
    surface_to_main = {}
    for main_stock, aliases in stock_aliases.items():
        surface_to_main[main_stock] = main_stock
        for alias in aliases:
            surface_to_main[alias] = main_stock

    # Official names map to themselves unless the alias table already claims them.
    # Without this step, stocks that have no alias entry are never matched.
    for name in stock_names or []:
        surface_to_main.setdefault(name, name)

    # An empty string is "contained" in every text and would match everywhere.
    surface_to_main.pop('', None)

    matched_stocks = []
    # Longest first; ties are broken alphabetically so the result is deterministic.
    for surface in sorted(surface_to_main, key=lambda s: (-len(s), s)):
        if surface in text:
            matched_stocks.append(surface_to_main[surface])  # store the canonical name
            text = text.replace(surface, '')                 # remove the matched mention

    return matched_stocks, text

# Extract keywords with the KoBERT tokenizer
def extract_keywords_with_kobert(text, tokenizer, model, stopwords=None):
    """Tokenize ``text`` and return its keyword candidates.

    The text is cut into 512-character chunks and each chunk is tokenized.
    ``##``-prefixed continuation pieces are glued back onto the token they
    continue. A rebuilt word is kept when it is longer than one character, starts
    with a Hangul syllable or a Latin letter, and is not a stopword.

    Args:
        text: Pre-cleaned text.
        tokenizer: Tokenizer that is callable on a string and provides
            ``convert_ids_to_tokens``.
        model: Unused. Kept so existing callers keep working; the forward pass
            that used to run here produced an output nobody read.
        stopwords: Optional collection of words to drop. When omitted, the list is
            loaded from ``../data/stopwords.txt``.

    Returns:
        A list of keywords in order of appearance (duplicates are kept).
    """
    if stopwords is None:
        stopwords = load_stopwords('../data/stopwords.txt')

    keywords = []

    for start in range(0, len(text), 512):
        chunk = text[start:start + 512]

        # No padding: pad tokens were only needed by the removed model call.
        inputs = tokenizer(chunk, return_tensors='pt', truncation=True, max_length=512)
        tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

        # Rebuild whole words: a '##' piece continues the word in progress,
        # any other token closes it and starts a new one.
        words = []
        current_word = ''
        for token in tokens:
            if token.startswith('##'):
                current_word += token[2:]
            else:
                if current_word:
                    words.append(current_word)
                current_word = token
        if current_word:
            words.append(current_word)

        # Special tokens such as [CLS] or [PAD] start with '[' and fail the pattern.
        keywords.extend(
            word for word in words
            if len(word) > 1
            and re.match(r'[가-힣a-zA-Z]+', word)
            and word not in stopwords
        )

    return keywords

def _post_field_as_text(value):
    """Return a post field as text, mapping missing values (None / NaN) to ''."""
    if isinstance(value, str):
        return value
    # read_csv yields float NaN for empty cells; str(NaN) would inject the word 'nan'.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ''
    return str(value)


# Main pipeline function
def extract_stock_keywords(posts):
    """Attach a keyword-frequency dictionary to every post.

    For each post the title and body are joined, every character other than
    Hangul, Latin letters, digits and whitespace is removed, stock names and
    aliases are matched (and removed from the text), and the remaining text is
    run through the KoBERT-based keyword extraction.

    Args:
        posts: Iterable of dicts with the keys 'title', 'date', 'text', 'link'.

    Returns:
        A DataFrame with the columns 'title', 'date', 'text', 'link' and
        'keywords' (dict of keyword -> count, most frequent first). The columns
        are present even when ``posts`` is empty.
    """
    stock_names = load_stock_names()
    stock_aliases = load_stock_aliases('../data/raw_data/stock_alias.json')

    # Load the KoBERT tokenizer and model once for all posts
    tokenizer = BertTokenizer.from_pretrained('monologg/kobert')
    model = BertModel.from_pretrained('monologg/kobert')

    results = []

    for post in posts:
        text = _post_field_as_text(post['title']) + " " + _post_field_as_text(post['text'])
        text = re.sub(r'[^가-힣a-zA-Z0-9\s]', '', text)  # strip special characters

        # 1. Match stock names and aliases
        matched_stocks, text = match_stock_names_with_aliases(text, stock_names, stock_aliases)
        # 2. Extract keywords from what is left of the text
        kobert_keywords = extract_keywords_with_kobert(text, tokenizer, model)

        # most_common() sorts by count, descending, keeping first-seen order on ties
        keyword_counts_dict = dict(Counter(matched_stocks + kobert_keywords).most_common())

        results.append({
            'title': post['title'],
            'date': post['date'],
            'text': post['text'],
            'link': post['link'],
            'keywords': keyword_counts_dict
        })

    return pd.DataFrame(results, columns=['title', 'date', 'text', 'link', 'keywords'])
    

In [65]:
if __name__ == "__main__":
    # Re-run the keyword extraction from the saved crawl results, without crawling again.
    combined_df = pd.read_csv("../data/raw_data/community_crawling_data.csv")

    # Rebuild the list of post dicts that extract_stock_keywords expects.
    post_fields = ('title', 'date', 'text', 'link')
    posts = [
        {field: row[field] for field in post_fields}
        for _, row in combined_df.iterrows()
    ]

    raw_data_df = extract_stock_keywords(posts)
    raw_data_df.to_csv(
        '../data/raw_data/community_raw_data.csv',
        index=False,
        encoding='utf-8-sig',
    )

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'KoBertTokenizer'. 
The class this function is called from is 'BertTokenizer'.


In [None]:
if __name__ == "__main__":
    # Full pipeline: crawl both communities, save the raw crawl, extract keywords, save.
    clien_url = "https://www.clien.net/service/board/cm_stock"
    fm_url = "https://www.fmkorea.com/index.php?mid=stock&category=2997203870"

    clien_posts = get_clien_posts(clien_url)
    fm_posts = get_fmkorea_posts(fm_url)

    all_posts = clien_posts + fm_posts
    crawling_df = pd.DataFrame(all_posts)
    crawling_df.to_csv('../data/raw_data/community_crawling_data.csv', index=False, encoding='utf-8-sig')

    # One extraction pass over all posts: every call to extract_stock_keywords
    # loads the tokenizer, the model and the stock lists again, so calling it once
    # per site doubled that cost for the same rows in the same order.
    combined_df = extract_stock_keywords(all_posts)

    # Per-site views, kept under their previous names for anything that still reads them.
    clien_raw_data_df = combined_df.iloc[:len(clien_posts)]
    fm_raw_data_df = combined_df.iloc[len(clien_posts):].reset_index(drop=True)

    combined_df.to_csv('../data/raw_data/community_raw_data.csv', index=False, encoding='utf-8-sig')