In [None]:
# Library
import tensorflow as tf
import torch

from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split, GridSearchCV

import pandas as pd
import numpy as np
import random
import time
import datetime
from konlpy.tag import Komoran

In [None]:
# Komoran 형태소 분석기 초기화
komoran = Komoran()

# GPU 확인
n_devices = torch.cuda.device_count()
print(n_devices)

for i in range(n_devices):
    print(torch.cuda.get_device_name(i))

In [None]:
# 데이터프레임 로드
news_data = pd.read_excel('/content/drive/MyDrive/data/filtered_samsung_news_with_outcome.xlsx')

# Outcome 레이블 변경
news_data.loc[(news_data['Outcome'] == '호재'), 'Outcome'] = 0
news_data.loc[(news_data['Outcome'] == '악재'), 'Outcome'] = 1

# 데이터셋 나누기
news_data_shuffled = news_data.sample(frac=1).reset_index(drop=True)
train = news_data_shuffled[:10000]
test = news_data_shuffled[10000:]

In [None]:
# 형태소 분석기를 이용한 전처리
def preprocess_text(text):
    tokens = komoran.morphs(text)  # 형태소 분석기 사용
    return " ".join(tokens)

In [None]:
# Train 데이터 전처리
sentences_train = ["[CLS] " + preprocess_text(str(s)) + " [SEP]" for s in train.content]

# label 추출
labels = train['Outcome'].values

In [None]:
# 토크나이저 초기화 및 토큰화
tokenizer = BertTokenizer.from_pretrained('beomi/kcbert-base', do_lower_case=False)
tokenized_texts = [tokenizer.tokenize(s) for s in sentences_train]

In [None]:
# 시퀀스 길이 설정 및 패딩
MAX_LEN = 128
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

In [None]:
# 어텐션 마스크 생성
attention_masks = [[float(i>0) for i in seq] for seq in input_ids]

In [None]:
# 데이터셋 분할
train_inputs, validation_inputs, train_labels, validation_labels = train_test_split(input_ids, labels, random_state=2000, test_size=0.1)
train_masks, validation_masks, _, _ = train_test_split(attention_masks, input_ids, random_state=2000, test_size=0.1)

In [None]:
# 텐서 변환
train_inputs = torch.tensor(train_inputs)
train_labels = torch.tensor(train_labels.astype(int))
train_masks = torch.tensor(train_masks)
validation_inputs = torch.tensor(validation_inputs)
validation_labels = torch.tensor(validation_labels.astype(int))
validation_masks = torch.tensor(validation_masks)

In [None]:
# 배치 사이즈 설정 및 데이터 로더 생성
batch_size = 32
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

In [None]:
# GridSearchCV를 사용하기 위해 파라미터 설정
param_grid = {
    'epochs': [3, 4],  # 에포크 수
    'learning_rate': [2e-5, 3e-5],  # 학습률
}

In [None]:
# 모델과 옵티마이저 설정
def create_model(learning_rate=2e-5, num_labels=2):
    model = BertForSequenceClassification.from_pretrained('beomi/kcbert-base', num_labels=num_labels)
    optimizer = AdamW(model.parameters(), lr=learning_rate, eps=1e-8)
    return model, optimizer

In [None]:
# 수동 GridSearch
best_accuracy = 0
best_params = None
for epochs in param_grid['epochs']:
    for learning_rate in param_grid['learning_rate']:
        model, optimizer = create_model(learning_rate=learning_rate)
        model.cuda()

        total_steps = len(train_dataloader) * epochs
        scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

        # 학습 루프
        for epoch_i in range(epochs):
            model.train()
            total_loss = 0
            for step, batch in enumerate(train_dataloader):
                batch = tuple(t.to(device) for t in batch)
                b_input_ids, b_input_mask, b_labels = batch

                outputs = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels)
                loss = outputs.loss
                total_loss += loss.item()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                scheduler.step()
                model.zero_grad()

            # 검증
            model.eval()
            eval_accuracy = 0
            for batch in validation_dataloader:
                batch = tuple(t.to(device) for t in batch)
                b_input_ids, b_input_mask, b_labels = batch
                with torch.no_grad():
                    outputs = model(b_input_ids, attention_mask=b_input_mask)
                logits = outputs.logits
                logits = logits.detach().cpu().numpy()
                label_ids = b_labels.to('cpu').numpy()
                tmp_eval_accuracy = flat_accuracy(logits, label_ids)
                eval_accuracy += tmp_eval_accuracy

            avg_eval_accuracy = eval_accuracy / len(validation_dataloader)

            # 최적의 성능을 기록
            if avg_eval_accuracy > best_accuracy:
                best_accuracy = avg_eval_accuracy
                best_params = {'epochs': epochs, 'learning_rate': learning_rate}

In [None]:
# Best parameter and Accuracy
print(f"Best Parameters: {best_params}")
print(f"Best Validation Accuracy: {best_accuracy:.4f}")

In [None]:
# 최적 파라미터로 최종 모델 학습
model, optimizer = create_model(learning_rate=best_params['learning_rate'])
model.cuda()

In [None]:
# 위와 동일하게 Test data도 preprocessing
# [CLS] + 문장 + [SEP]
sentences = ["[CLS] " + str(sentence) + " [SEP]" for sentence in test.content]

# 라벨 데이터
labels = test['Outcome'].values

# Word 토크나이저 토큰화
tokenizer = BertTokenizer.from_pretrained('beomi/kcbert-base', do_lower_case=False)
tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]

# 시퀀스 설정 및 정수 인덱스 변환 & 패딩
MAX_LEN = 128
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

# 어텐션 마스크
attention_masks = []
for seq in input_ids:
    seq_mask = [float(i>0) for i in seq]
    attention_masks.append(seq_mask)

# 파이토치 텐서로 변환
test_inputs = torch.tensor(input_ids)
test_labels = torch.tensor(labels.astype(int))
test_masks = torch.tensor(attention_masks)

# 배치 사이즈 설정 및 데이터 설정
batch_size = 32
test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

In [None]:
# Use GPU or CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')

In [None]:
# Load Kc-Bert Model
model = BertForSequenceClassification.from_pretrained('beomi/kcbert-base', num_labels=2)
model.cuda()

In [None]:
# Optimizer
optimizer = AdamW(model.parameters(),
                  lr = 2e-5, # 학습률(learning rate)
                  eps = 1e-8
                )

# Epochs
epochs = 4

# Total train step
total_steps = len(train_dataloader) * epochs

# Create Scheduler
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps= 0,
                                            num_training_steps = total_steps)

In [None]:
# Learning Model
# Accuracy (정확도)
def flat_accuracy(preds, labels) :
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels.flat) / len(labels_flat)

# Time
def format_time(elapsed) :
    # 반올림
    elapsed_rounded = int(round((elapsed)))
    # hh:mm:ss
    return str(datetime.timedelta(seconds=elapsed_rounded))

In [None]:
# Set Random Seed
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Gradient (기울기)
model.zero_grad()

# Training
for epoch_i in range(0, epochs) :
    print()
    print(f' Epoch {epoch_i+1} / {epochs}')

    t0 = time.time()
    total_loss = 0
    model.train()

    # DataLoader에서 batch만큼 반복
    for step, batch in enumerate(train_dataloader) :
        if step % 500 == 0 and not step == 0 :
            elapsed = format_time(time.time() - 10)

        batch = tuple(t.to(device) for t in batch)

        # 데이터 추출
        b_input_ids, b_input_mask, b_labels = batch

        # Forward
        outputs = model(b_input_ids,
                        token_type_ids = None,
                        attention_mask = b_input_mask,
                        labels=b_labels)

        loss = outputs[0]
        total_loss += loss.item()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        model.zero_grad()

    avg_train_loss = total_loss / len(train_dataloader)

    #시작 시간 설정
    t0 = time.time()

    # 평가모드로 변경
    model.eval()

    # 변수 초기화
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0

    # 데이터로더에서 배치만큼 반복하여 가져옴
    for batch in validation_dataloader:
        # 배치를 GPU에 넣음
        batch = tuple(t.to(device) for t in batch)

        # 배치에서 데이터 추출
        b_input_ids, b_input_mask, b_labels = batch

        # 그래디언트 계산 안함
        with torch.no_grad():
            # Forward 수행
            outputs = model(b_input_ids,
                            token_type_ids=None,
                            attention_mask=b_input_mask)

        # 로스 구함
        logits = outputs[0]

        # CPU로 데이터 이동
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # 출력 로짓과 라벨을 비교하여 정확도 계산
        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1

    print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
    print("  Validation took: {:}".format(format_time(time.time() - t0)))

print("")
print("Training complete!")

In [None]:
# 평가
#시작 시간 설정
t0 = time.time()

# 평가모드로 변경
model.eval()

# 변수 초기화
eval_loss, eval_accuracy = 0, 0
nb_eval_steps, nb_eval_examples = 0, 0

# 데이터로더에서 배치만큼 반복하여 가져옴
for step, batch in enumerate(test_dataloader):
    # 경과 정보 표시
    if step % 100 == 0 and not step == 0:
        elapsed = format_time(time.time() - t0)
        print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

    # 배치를 GPU에 넣음
    batch = tuple(t.to(device) for t in batch)

    # 배치에서 데이터 추출
    b_input_ids, b_input_mask, b_labels = batch

    # 그래디언트 계산 안함
    with torch.no_grad():
        # Forward 수행
        outputs = model(b_input_ids,
                        token_type_ids=None,
                        attention_mask=b_input_mask)

    # 로스 구함
    logits = outputs[0]

    # CPU로 데이터 이동
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    # 출력 로짓과 라벨을 비교하여 정확도 계산
    tmp_eval_accuracy = flat_accuracy(logits, label_ids)
    eval_accuracy += tmp_eval_accuracy
    nb_eval_steps += 1

print("")
print("Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
print("Test took: {:}".format(format_time(time.time() - t0)))

In [None]:
# 테스트
# 입력 데이터 변환
def convert_input_data(sentences):

    # BERT의 토크나이저로 문장을 토큰으로 분리
    tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]

    # 입력 토큰의 최대 시퀀스 길이
    MAX_LEN = 128

    # 토큰을 숫자 인덱스로 변환
    input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

    # 문장을 MAX_LEN 길이에 맞게 자르고, 모자란 부분을 패딩 0으로 채움
    input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

    # 어텐션 마스크 초기화
    attention_masks = []

    # 어텐션 마스크를 패딩이 아니면 1, 패딩이면 0으로 설정
    # 패딩 부분은 BERT 모델에서 어텐션을 수행하지 않아 속도 향상
    for seq in input_ids:
        seq_mask = [float(i>0) for i in seq]
        attention_masks.append(seq_mask)

    # 데이터를 파이토치의 텐서로 변환
    inputs = torch.tensor(input_ids)
    masks = torch.tensor(attention_masks)

    return inputs, masks

# 문장 테스트
def test_sentences(sentences):

    # 평가모드로 변경
    model.eval()

    # 문장을 입력 데이터로 변환
    inputs, masks = convert_input_data(sentences)

    # 데이터를 GPU에 넣음
    b_input_ids = inputs.to(device)
    b_input_mask = masks.to(device)

    # 그래디언트 계산 안함
    with torch.no_grad():
        # Forward 수행
        outputs = model(b_input_ids,
                        token_type_ids=None,
                        attention_mask=b_input_mask)

    # 로스 구함
    logits = outputs[0]

    # CPU로 데이터 이동
    logits = logits.detach().cpu().numpy()

    return logits

In [None]:
logits = test_sentences(['''
코스피 대장주 삼성전자가 개별 호재에도 불구, 시장 전반에 드리운 악재를 이기지 못하고 20일 만에 8만원선이 붕괴됐다. 이스라엘과 이란간 군사충돌, 이에 따른 안전자산 선호로 달러가 강세를 보이는데다 전일 미 연방준비제도(Fed) 파월 의장의 매파적 발언도 시장에 찬물을 끼얹는 분위기다.

17일 한국거래소에 따르면, 삼성전자 주가는 전 거래일 대비 1100원(-1.38%) 떨어진 7만8900원에 마감, 종가기준 3월 28일 8만800원을 기록해 이른바 ‘8만전자’입성 20일 만에 다시 7만원 대로 무너졌다. 그 여파로 코스피도 지난 2월 6일 종가 2576.20을 기록한 이후 2달여 만에 다시 2600선 아래로 후퇴했다.

지난해 부진을 면치 못했던 삼성전자는 최근 다시 수출이 기지개를 켜며 반등을 이어가는 분위기였다. 실적 개선과 더불어 현지시간 15일 미 상무부가 삼성전자에 대한 보조금으로 64억 달러(약 9조원) 지급을 결정하는 등 호재가 이어지는 상황이었다.

다만 이와는 별개로 현지시간 13일 이란이 이스라엘에 군사공격을 감행해 언제 반격이 나올지 모르는 분위기다. 여기에 미 3월 소비자물가지수가 예상을 뛰어넘자 기준금리 인하 시기가 하반기로 이연되는 흐름에 따른 강달러 기조로 원/달러 환율이 16일 1400원을 터치하기도 했다. 다만 17일 환율은 전 거래일 대비 7.70원 하락한 1386.80원을 기록했다.

여기에 기준금리 하락 기대에 선을 긋기라도 하듯 제롬 파월 미 연준 의장이 현지시간 16일 워싱턴DC에서 열린 한 포럼에서 “높은 인플레이션이 지속된다면 현재의 긴축적인 통화정책 수준을 필요한 만큼 길게 유지할 수 있다”며 매파적인 발언을 남겼다.

하나증권 김록호 연구위원은 향후 삼성전자의 전망을 묻는 질문에 “삼성전자가 개별 이슈로 등락을 보이는 것을 본 적이 없다”며, “삼성전자는 하나의 종목이 아닌 코스피 그 자체로 봐야 한다”고 강조했다.

김 연구위원은 “중동 이슈와 이에 따른 강달러, 여기에 파월 의장의 발언까지 코스피 전체가 영향을 받고 있다”며, “환율이 높다(원화 약세)는 것은 수출 의존도가 높은 한국 입장에서는 좋은 일이지만 외국인 입장에선 (주식)매도 욕구를 불러일으키는 일이어서 당분간 이러한 기조가 이어질 것”이라고 설명했다.

실제 외국인들은 지난 3월 19일부터 4월 12일까지 쉼없이 삼성전자 순매수를 기록하다 15일 724억원 순매도를 보였고, 삼성전자가 급락해 8만원까지 밀린 16일 1070억원 순매수하며 다시 방향을 트는 듯 했으나, 17일 1451억원 순매도하며 주가 하락을 주도했다.

하지만 이번 주가 조정을 투자 기회로 삼아야 한다는 목소리도 나온다.

한국투자증권 채민숙 연구원은 17일 ‘소나기는 그치기 마련’이라는 제목의 보고서를 통해 “반도체는 내수 대비 수출 비중이 절대적으로 높고 본사와 해외법인, 고객간 거래 시 모두 달러로 결제하기 때문에 환율 상승은 메모리 반도체 기업 실적에는 긍정적”이라며, “메모리반도체 특성 상 매출원가에서 고정비가 가장 큰 부분을 차지하고 (수입을 하는) 원재료비 비중은 상대적으로 낮기 때문에 환율 상승에 따른 재료비 증가분 이상으로 매출액과 영업이익이 늘어난다”고 설명했다.

이어 “일반적으로 원/달러 환율 상승기에 삼성전자와 SK하이닉스 주가는 약세였다”면서도 “환율 하나만으로 주가 방향성에 대한 판단을 내리기 어렵고, 1분기 호실적 발표와 2분기 이후 우호적인 업황 코멘트가 예상되는 실적 발표 시즌을 앞두고 있는 시점에서 환율이 단기적으로 오버슈팅한 것이라면 매크로 불확실성이 걷힐 때 섹터 내 긍정적 요인들이 더욱 부각될 수 있을 것”이라고 전망했다.

출처 : 스트레이트뉴스(https://www.straightnews.co.kr)
'''])
print(logits)

if np.argmax(logits) == 1 :
    print("악재")
elif np.argmax(logits) == 0 :
    print("호재")

In [None]:
# 시각화
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# 모델을 평가 모드로 전환
model.eval()

# 실제 레이블과 예측 레이블을 저장할 리스트
true_labels = []
pred_labels = []

# 테스트 데이터셋에 대해 예측 수행
for batch in test_dataloader:
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        outputs = model(b_input_ids, attention_mask=b_input_mask)
    
    logits = outputs.logits
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    # 예측된 레이블 저장
    pred_flat = np.argmax(logits, axis=1).flatten()
    labels_flat = label_ids.flatten()

    true_labels.extend(labels_flat)
    pred_labels.extend(pred_flat)

# 혼동 행렬 생성
conf_matrix = confusion_matrix(true_labels, pred_labels)

# 혼동 행렬 출력 및 시각화
print("Confusion Matrix:")
print(conf_matrix)

# 시각화
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=['0', '1'], yticklabels=['0', '1'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# 정밀도, 재현율, F1 점수를 포함한 분류 보고서 출력
report = classification_report(true_labels, pred_labels, target_names=['0', '1'])
print("Classification Report:")
print(report)