1. 엑셀 파일 처리: 엑셀 파일에서 데이터를 추출하고, JSON 파일로 저장합니다.
2. JSON 파일 검증: 변환된 JSON 파일에서 데이터를 샘플링하고, 필드 유효성을 확인합니다.
3. 데이터 전처리: 형태소 분석과 불용어 제거를 통해 데이터를 정제합니다.
4. 토큰화: 정제된 데이터를 BERT Tokenizer로 토큰화합니다.
5. 데이터셋 준비: 데이터셋을 정의하고, DataLoader로 데이터를 배치 단위로 나눕니다.
6. 데이터 확인: 배치된 데이터를 확인하고, 학습에 사용할 준비가 되었는지 검증합니다.

In [1]:
import os
import sys
import urllib.request
import pandas as pd
import requests

import warnings
# 경고 메시지 무시 설정
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")

1. 엑셀 파일 읽기 및 JSON 변환

* 다음부터는 이거 실행안해도됨 1번만 하면됨

In [2]:
import pandas as pd
import json
import glob
import os

# 엑셀 파일들이 들어 있는 최상위 폴더 경로
base_dir = r"/mnt/c/study/KISTI_AI/023.국회 회의록 기반 지식검색 데이터/3.개방데이터/1.데이터/Training/01.원천데이터"

# 모든 하위 폴더 내 엑셀 파일 경로 검색
excel_files = glob.glob(os.path.join(base_dir, '**', '*.xlsx'), recursive=True)

qa_data = []  # JSON으로 저장할 데이터 리스트

# 모든 엑셀 파일 읽기
for file_path in excel_files:
    try:
        # 엑셀 파일 읽기
        df = pd.read_excel(file_path)

        question_info = None  # 현재 질문 정보를 저장할 변수

        # 각 행을 순회하며 질문(Q)과 답변(A) 추출
        for idx, row in df.iterrows():
            if row['질의응답'] == 'Q':  # 질문일 때
                question_info = {
                    "question": row['발언내용'],
                    "회의번호": row['회의번호'],
                    "질의응답번호": row['질의응답번호'],
                    "회의구분": row['회의구분'],
                    "위원회": row['위원회'],
                    "회의일자": row['회의일자'],
                    "질문자": row['의원ID'],
                    "질문자_ISNI": row['ISNI']
                }
            elif row['질의응답'] == 'A' and question_info is not None:  # 답변일 때
                answer_info = {
                    "answer": row['발언내용'],
                    "답변자": row['의원ID'],
                    "답변자_ISNI": row['ISNI']
                }

                # 유효성 검사: 질문과 답변이 있는지 확인
                if not question_info['question'] or not answer_info['answer']:
                    print(f"파일 {file_path}의 {idx}번째 항목에서 질문 또는 답변이 누락되었습니다. 건너뜁니다.")
                    continue

                # 질문과 답변을 연결하여 하나의 JSON 객체로 만듦
                qa_data.append({**question_info, **answer_info})
                question_info = None  # 사용 후 질문 정보를 초기화

    except FileNotFoundError:
        print(f"파일을 찾을 수 없습니다: {file_path}")
    except pd.errors.EmptyDataError:
        print(f"빈 파일이므로 건너뜁니다: {file_path}")
    except Exception as e:
        print(f"파일 처리 중 오류 발생 ({file_path}): {e}")

# 라벨링 데이터 파일 저장 경로
save_dir = r"/mnt/c/study/KISTI_AI/023.국회 회의록 기반 지식검색 데이터/3.개방데이터/1.데이터/Training/02.라벨링데이터"
save_path = os.path.join(save_dir, '라벨링데이터.json')

# 디렉토리가 존재하지 않으면 생성
os.makedirs(save_dir, exist_ok=True)

# JSON 파일로 저장
try:
    with open(save_path, 'w', encoding='utf-8') as f:
        json.dump(qa_data, f, ensure_ascii=False, indent=4)
    print(f"모든 파일 처리가 완료되었습니다. JSON 파일이 저장되었습니다: {save_path}")
except Exception as e:
    print(f"JSON 파일 저장 중 오류 발생: {e}")


모든 파일 처리가 완료되었습니다. JSON 파일이 저장되었습니다: /mnt/c/study/KISTI_AI/023.국회 회의록 기반 지식검색 데이터/3.개방데이터/1.데이터/Training/02.라벨링데이터/라벨링데이터.json


2. 변환된 JSON 파일 검증

In [4]:
import json
import os

# 라벨링된 JSON 파일 경로
file_path = "/mnt/c/study/KISTI_AI/023.국회 회의록 기반 지식검색 데이터/3.개방데이터/1.데이터/Training/02.라벨링데이터/라벨링데이터.json"

# JSON 파일 로드 및 예외 처리
try:
    # 파일 존재 여부 확인
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"파일을 찾을 수 없습니다. 경로를 확인해주세요: {file_path}")

    # JSON 파일 읽기
    with open(file_path, 'r', encoding='utf-8') as f:
        qa_data = json.load(f)
    print(f"JSON 파일이 정상적으로 로드되었습니다. 총 {len(qa_data)}개의 질문-답변 쌍이 있습니다.")

except FileNotFoundError:
    print(f"파일을 찾을 수 없습니다. 경로를 확인해주세요: {file_path}")
except json.JSONDecodeError:
    print("JSON 파일을 읽는 중 오류가 발생했습니다. 파일 형식이 올바른지 확인해주세요.")
except Exception as e:
    print(f"오류 발생: {e}")


데이터 샘플 확인 (처음 5개 질문-답변 쌍):
샘플 1:
{
    "question": "우리 사장이나 간부들이 이것을 대단히 소홀하게 중요하지 않게 생각하는데 끝까지 내 이야기에 대한 답변이 나오지 않았습니다.  공역문제인데 아까 공역을 전부 예를 들었습니다마는 가령 미공군 매향리 사격장, 한ㆍ미합동훈련장 또 여주 공군사격장, 수도권 비행금지구역, 휴전선 비행금지구역 이렇게 공역이 되어 있는데 앞으로 허브공항으로서 많은 비행기가 오는 곳에 공역이 도처에 있어요. 과연 그 공역을 피해 갈 수 있을 것인가에 대한 어려움을 우리 건교부하고 미8군하고 국방부하고 협의해서 공역문제는 사전에 합의를 보아야 할 문제인데 이것에 대해서 사장은 어떤 생각을 가지고 있느냐고 물어 보았습니다. 답변을 해주십시오.",
    "회의번호": 30045,
    "질의응답번호": 1,
    "회의구분": "국정감사",
    "위원회": "건설교통위원회",
    "회의일자": "2000年10月19日(木)",
    "질문자": "3337",
    "질문자_ISNI": "0000000463657210",
    "answer": "서면으로 답변을 드리려고 했습니다마는 공역문제는 너무 중요한 문제입니다. 그리고 당장은 아니지만 2, 3년 앞을 내다보면 매우 심각한 문제입니다. 미국 정부에서도 그 심각성을 인정하고 있습니다. 그래서 제가 알기로는 건교부에서 국방부, 미 당국과도 협의를 이미 진행하고 있습니다.   그래서 공역을 변경하기로 일부 합의되어서 저희 공항개항과 더불어 현재의 공역보다도 많이 여유를 갖게 되어 있습니다. 그리고 근본적인 문제는 앞으로 한ㆍ미간 또는 국내에서 국방부와 협의가 계속 진행될 것으로 알고 있습니다.",
    "답변자": " ",
    "답변자_ISNI": " "
}
--------------------------------------------------
샘플 2:
{
    "question": "제가 아까 한 질의는 아주 근본적인 질의거든요

3. 데이터 전처리: 형태소 분석과 불용어 제거

In [3]:
import re
from konlpy.tag import Mecab

# Mecab 형태소 분석기 로드
mecab = Mecab()

# 불용어 리스트 정의
stop_words = [
    "것", "있다", "하다", "입니다", "그리고", "하지만", "또한", "그런데", "저는", "우리는", "그래서", "이것", "저것", "그것",
    "다시", "모든", "각각", "모두", "어느", "몇몇", "이런", "저런", "그런", "어떤", "특히", "즉", "또", "이후", "때문에", "통해서",
    "같은", "많은", "따라서", "등", "경우", "관련", "대해", "의해", "이기", "대한", "그리고", "라고", "이라는", "에서", "부터", "까지",
    "와", "과", "으로", "에", "의", "를", "가", "도", "로", "에게", "만", "뿐", "듯", "제", "내", "저", "그", "할", "수", "있", "같",
    "되", "보다", "아니", "아닌", "이", "있어서", "입니다", "있습니다", "합니다", "입니까", "같습니다", "아닙니다", "라는", "그러므로", 
    "입니다만", "때문입니다", "라고요", "그러하다", "하고", "이와"
]

# 긴 텍스트를 슬라이싱하는 함수 정의 (최소 길이 조건 추가)
def slice_long_text(text, max_length=512, overlap=50, min_length=50):
    """
    긴 텍스트를 슬라이싱하여 나눔
    :param text: 원본 텍스트
    :param max_length: 최대 토큰 길이
    :param overlap: 슬라이싱할 때 겹치는 토큰 수
    :param min_length: 슬라이싱된 텍스트의 최소 길이
    :return: 슬라이싱된 텍스트 리스트
    """
    tokens = mecab.morphs(text)  # 형태소 분석을 통한 토큰화
    
    sliced_texts = []
    start_idx = 0

    # 토큰의 길이가 최대 길이를 초과하면 슬라이싱
    while start_idx < len(tokens):
        end_idx = min(start_idx + max_length, len(tokens))
        slice = tokens[start_idx:end_idx]
        
        # 최소 길이보다 짧은 슬라이스는 생략
        if len(slice) >= min_length:
            sliced_texts.append(slice)
        
        start_idx = end_idx - overlap  # overlap 만큼 겹치게 슬라이싱

    # 슬라이싱된 텍스트 리스트 반환
    return [" ".join(slice) for slice in sliced_texts]

# 데이터 전처리 함수 정의
def preprocess_text_with_slicing(text):
    """
    긴 텍스트를 슬라이싱하여 각 슬라이스에 대해 전처리 수행
    :param text: 원본 텍스트
    :return: 전처리된 텍스트
    """
    # 1. 긴 텍스트를 슬라이싱
    sliced_texts = slice_long_text(text)

    # 2. 슬라이싱된 각 텍스트를 전처리
    preprocessed_slices = []
    for slice in sliced_texts:
        # 특수 문자 및 공백 제거
        slice = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", slice)
        slice = re.sub(r"\s+", " ", slice).strip()

        # 형태소 분석 및 불용어 제거
        tokens = mecab.morphs(slice)
        filtered_tokens = [word for word in tokens if word not in stop_words]
        
        # 정제된 단어들로 다시 결합하여 저장
        preprocessed_slices.append(" ".join(filtered_tokens))

    # 3. 모든 슬라이스를 합쳐서 반환
    return " ".join(preprocessed_slices)


개선된 코드?

In [None]:
import re
from konlpy.tag import Mecab
import time

# Mecab 형태소 분석기 로드
mecab = Mecab()

# 불용어 리스트 정의
stop_words = set([
    "것", "있다", "하다", "입니다", "그리고", "하지만", "또한", "그런데", "저는", "우리는", "그래서", "이것", "저것", "그것",
    "다시", "모든", "각각", "모두", "어느", "몇몇", "이런", "저런", "그런", "어떤", "특히", "즉", "또", "이후", "때문에", "통해서",
    "같은", "많은", "따라서", "등", "경우", "관련", "대해", "의해", "이기", "대한", "그리고", "라고", "이라는", "에서", "부터", "까지",
    "와", "과", "으로", "에", "의", "를", "가", "도", "로", "에게", "만", "뿐", "듯", "제", "내", "저", "그", "할", "수", "있", "같",
    "되", "보다", "아니", "아닌", "이", "있어서", "입니다", "있습니다", "합니다", "입니까", "같습니다", "아닙니다", "라는", "그러므로", 
    "입니다만", "때문입니다", "라고요", "그러하다", "하고", "이와"
])

# 긴 텍스트를 슬라이싱하는 함수 정의 (최소 길이 조건 추가)
def slice_long_text(text, max_length=512, overlap=50, min_length=50):
    """
    긴 텍스트를 슬라이싱하여 나눔
    :param text: 원본 텍스트
    :param max_length: 최대 토큰 길이
    :param overlap: 슬라이싱할 때 겹치는 토큰 수
    :param min_length: 슬라이싱된 텍스트의 최소 길이
    :return: 슬라이싱된 텍스트 리스트
    """
    tokens = mecab.morphs(text)  # 형태소 분석을 통한 토큰화
    
    sliced_texts = []
    start_idx = 0

    # 토큰의 길이가 최대 길이를 초과하면 슬라이싱
    while start_idx < len(tokens):
        end_idx = min(start_idx + max_length, len(tokens))
        slice = tokens[start_idx:end_idx]
        
        # 최소 길이보다 짧은 슬라이스는 생략
        if len(slice) >= min_length:
            sliced_texts.append(slice)
        
        start_idx = end_idx - overlap  # overlap 만큼 겹치게 슬라이싱

    # 슬라이싱된 텍스트 리스트 반환
    return [" ".join(slice) for slice in sliced_texts]

# 데이터 전처리 함수 정의
def preprocess_text_with_slicing(text):
    """
    긴 텍스트를 슬라이싱하여 각 슬라이스에 대해 전처리 수행
    :param text: 원본 텍스트
    :return: 전처리된 텍스트
    """
    # 1. 긴 텍스트를 슬라이싱
    sliced_texts = slice_long_text(text)

    # 2. 슬라이싱된 각 텍스트를 전처리
    preprocessed_slices = []
    for slice in sliced_texts:
        # 특수 문자 및 공백 제거
        slice = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", slice)
        slice = re.sub(r"\s+", " ", slice).strip()

        # 형태소 분석 및 불용어 제거
        filtered_tokens = [word for word in slice.split() if word not in stop_words]
        
        # 정제된 단어들로 다시 결합하여 저장
        preprocessed_slices.append(" ".join(filtered_tokens))

    # 3. 모든 슬라이스를 합쳐서 반환
    return " ".join(preprocessed_slices)

# 샘플 데이터로 시간 측정
sample_data = [
    "저는 오늘 국회 회의에 참석하여 중요한 발표를 했습니다.",
    "정부 정책에 대한 국민들의 반응이 매우 긍정적입니다.",
    "이 법안이 통과될 경우, 앞으로의 경제 상황이 나아질 것입니다."
]

# 전처리 전과 후의 시간 측정
print("전처리 전과 후 비교:\n")
for i, sentence in enumerate(sample_data):
    start_time = time.time()  # 시작 시간
    preprocessed = preprocess_text_with_slicing(sentence)
    end_time = time.time()    # 끝난 시간
    print(f"원본: {sentence}")
    print(f"전처리 후: {preprocessed}")
    print(f"소요 시간: {end_time - start_time:.4f} 초")
    print("-" * 50)


In [None]:
import time

# 전처리 시간 확인용 샘플 데이터
sample_data = [
    "저는 오늘 국회 회의에 참석하여 중요한 발표를 했습니다.",
    "정부 정책에 대한 국민들의 반응이 매우 긍정적입니다.",
    "이 법안이 통과될 경우, 앞으로의 경제 상황이 나아질 것입니다."
]

# 전처리 전과 후의 시간 측정
print("전처리 전과 후 비교:\n")
for i, sentence in enumerate(sample_data):
    start_time = time.time()  # 시작 시간
    preprocessed = preprocess_text_with_slicing(sentence)
    end_time = time.time()    # 끝난 시간
    print(f"원본: {sentence}")
    print(f"전처리 후: {preprocessed}")
    print(f"소요 시간: {end_time - start_time} 초")
    print("-" * 50)


전처리 전과 후 비교:



4. BERT Tokenizer를 사용한 토큰화

In [None]:
from transformers import BertTokenizer

# BERT Tokenizer 로드 (한국어 지원되는 BERT 모델)
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')

# 긴 텍스트를 슬라이싱하는 함수 정의
def slice_long_text(text, max_length=512, overlap=50):
    """
    긴 텍스트를 슬라이싱하여 나눔
    :param text: 원본 텍스트
    :param max_length: 최대 토큰 길이
    :param overlap: 슬라이싱할 때 겹치는 토큰 수
    :return: 슬라이싱된 텍스트 리스트
    """
    tokens = mecab.morphs(text)  # 형태소 분석을 통한 토큰화
    
    sliced_texts = []
    start_idx = 0

    # 토큰의 길이가 최대 길이를 초과하면 슬라이싱
    while start_idx < len(tokens):
        end_idx = min(start_idx + max_length, len(tokens))
        sliced_texts.append(tokens[start_idx:end_idx])
        start_idx = end_idx - overlap  # overlap 만큼 겹치게 슬라이싱

    # 슬라이싱된 텍스트 리스트 반환
    return [" ".join(slice) for slice in sliced_texts]

# 토큰화 함수 정의 (긴 텍스트 슬라이싱을 포함)
def tokenize_text(question, answer):
    """
    질문과 답변을 토큰화하며 긴 텍스트 처리
    :param question: 질문 텍스트
    :param answer: 답변 텍스트
    :return: 토큰화된 input_ids와 attention_mask
    """
    # 1. 질문과 답변을 각각 슬라이싱
    sliced_questions = slice_long_text(question)
    sliced_answers = slice_long_text(answer)

    input_ids_list = []
    attention_mask_list = []

    # 슬라이싱된 텍스트 개수 중 최소값으로 맞추기
    num_slices = min(len(sliced_questions), len(sliced_answers))

    # 2. 슬라이싱된 각 질문과 답변에 대해 토큰화
    for i in range(num_slices):
        q_slice = sliced_questions[i]
        a_slice = sliced_answers[i]
        
        inputs = tokenizer(
            q_slice,
            a_slice,
            max_length=512,
            padding='max_length',
            truncation=True,
            return_tensors='pt'  # PyTorch 텐서 형식으로 반환
        )

        # 각 슬라이스의 토큰 ID 및 패딩 마스크 추가
        input_ids_list.append(inputs['input_ids'].squeeze())  # 문장의 토큰 ID
        attention_mask_list.append(inputs['attention_mask'].squeeze())  # 문장의 패딩 여부 (0 또는 1)

    # 3. 슬라이스된 결과를 리스트로 반환
    return input_ids_list, attention_mask_list


5. 데이터셋 준비: 데이터셋을 정의하고, DataLoader로 데이터를 배치 단위로 나눕니다.

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# 데이터셋 클래스 정의
class QADataset(Dataset):
    def __init__(self, data):
        self.data = data  # 데이터는 전처리된 JSON 데이터로 가정

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]

        # 질문과 답변 전처리
        question = preprocess_text(item['question'])
        answer = preprocess_text(item['answer'])

        # 질문과 답변 토큰화 (긴 텍스트 슬라이싱 포함)
        input_ids_list, attention_mask_list = tokenize_text(question, answer)

        # 여러 슬라이스가 있을 경우, 리스트로 반환
        return {
            'input_ids': input_ids_list,
            'attention_mask': attention_mask_list
        }

# 데이터셋 생성
dataset = QADataset(data)

# DataLoader로 데이터를 배치 단위로 준비
# 여기서는 collate_fn을 사용하여 슬라이스된 텍스트들을 처리할 수 있게 설정
def collate_fn(batch):
    input_ids_batch = []
    attention_mask_batch = []

    # 배치의 각 샘플을 순회
    for sample in batch:
        input_ids_batch.extend(sample['input_ids'])  # 각 슬라이스의 input_ids 추가
        attention_mask_batch.extend(sample['attention_mask'])  # 각 슬라이스의 attention_mask 추가

    # 텐서 형태로 변환하고 패딩 처리 (배치 내에서 가장 긴 슬라이스에 맞춤)
    input_ids_batch = pad_sequence(input_ids_batch, batch_first=True, padding_value=0)  # 패딩값은 0
    attention_mask_batch = pad_sequence(attention_mask_batch, batch_first=True, padding_value=0)

    return {
        'input_ids': input_ids_batch,
        'attention_mask': attention_mask_batch
    }

# DataLoader 정의
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)


6. 데이터 확인: 배치된 데이터를 확인하고, 학습에 사용할 준비가 되었는지 검증합니다.

In [None]:
# 데이터셋에서 샘플 확인
for batch in dataloader:
    print("Input IDs (첫 번째 배치):", batch['input_ids'])
    print("Attention Mask (첫 번째 배치):", batch['attention_mask'])
    break  # 첫 번째 배치만 확인
