In [1]:
import os
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from azure.core.credentials import AzureKeyCredential
from datetime import datetime, timedelta
from dotenv import load_dotenv
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
from glob import glob

# 🔑 .env 로드
load_dotenv()
embedding_api_key = os.getenv("Embedding_API_KEY")
embedding_endpoint = os.getenv("Embedding_ENDPOINT")
gpt_api_key = os.getenv('OPENAI_API_KEY')
gpt_endpoint = os.getenv('OPENAI_ENDPOINT')
BLOB_CONN_STR = os.getenv('BLOB_CONN_STR')
DI_ENDPOINT = os.getenv('DI_ENDPOINT')
DI_API_KEY = os.getenv('DI_API_KEY')

# 📁 경로 설정
BLOB_CONTAINER_NAME = "pdf-container"
PDF_FOLDER = r"E:\work\MS_project_2\code\테이블처리o\data\pdfs/d"
MD_FOLDER = r"E:\work\MS_project_2\code\테이블처리o\data\markdowns"
os.makedirs(MD_FOLDER, exist_ok=True)


In [None]:

# ✅ 클라이언트 초기화
blob_service = BlobServiceClient.from_connection_string(BLOB_CONN_STR)
container_client = blob_service.get_container_client(BLOB_CONTAINER_NAME)
if not container_client.exists():
    container_client.create_container()

di_client = DocumentIntelligenceClient(endpoint=DI_ENDPOINT, credential=AzureKeyCredential(DI_API_KEY))


def upload_pdf_to_blob(pdf_path: str, blob_name: str) -> str:
    """PDF를 Blob에 업로드하고 SAS URL 반환"""
    blob_client = container_client.get_blob_client(blob_name)
    with open(pdf_path, "rb") as f:
        blob_client.upload_blob(f, overwrite=True)

    sas_token = generate_blob_sas(
        account_name=blob_service.account_name,
        container_name=BLOB_CONTAINER_NAME,
        blob_name=blob_name,
        account_key=blob_service.credential.account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(minutes=15)
    )

    return f"{blob_client.url}?{sas_token}"


def analyze_pdf_to_markdown(sas_url: str) -> str:
    """Document Intelligence를 사용해 Markdown으로 변환"""
    poller = di_client.begin_analyze_document(
        model_id="prebuilt-layout",
        body=AnalyzeDocumentRequest(url_source=sas_url),
        output_content_format='markdown'
    )
    result = poller.result()
    return result.content


def request_gpt(prompt: str) -> str:
    headers = {
        'Content-Type': 'application/json',
        'api-key': gpt_api_key
    }
    body = {
        "messages": [
            {
                "role": "system",
                "content": (
                    '너는 HTML 테이블을 사람이 이해할 수 있도록 자연어 문장으로 변환해.  '
                    '항목과 값을 "구분: 내용" 식으로 나누지 말고, 원래 테이블에서 쓰인 항목명을 그대로 key로 사용해.  '
                    '예: "임대조건 : 내용", "임대보증금-월임대료 전환 : 내용"처럼.  '
                    '항목이 반복되는 경우에는 구분자를 붙여서 명확하게 구분해줘.  '
                    '불필요한 요약이나 도입부 없이 표의 핵심 내용만 변환해줘.'
                )
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.7,
        "top_p": 0.95,
        "max_tokens": 800
    }

    response = requests.post(gpt_endpoint, headers=headers, json=body)
    if response.status_code == 200:
        return response.json()['choices'][0]['message']['content']
    else:
        print("❌ 요청 실패:", response.status_code, response.text)
        return "⚠️ 오류가 발생했습니다."


def convert_md_tables_with_llm_parallel(md_text: str, max_workers=5) -> str:
    soup = BeautifulSoup(md_text, 'html.parser')
    tables = soup.find_all('table')
    table_strs = [str(table) for table in tables]
    unique_tables = list(set(table_strs))
    table_to_text = {}

    def process_table(table_html):
        prompt = (
            f"다음 HTML 테이블의 내용을 자연어 문장으로 간결하게 변환해줘.\n\n{table_html}"
        )
        result = request_gpt(prompt)
        return table_html, result

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_table, tbl) for tbl in unique_tables]
        for future in as_completed(futures):
            tbl_html, gpt_result = future.result()
            table_to_text[tbl_html] = gpt_result

    for original_table in table_strs:
        if original_table in table_to_text:
            md_text = md_text.replace(original_table, table_to_text[original_table])

    return md_text


def preprocess_markdown_headers(md_text: str) -> str:
    md_text = re.sub(r'^(#{1,6}\s*■?\s*[^:\n]+):\s*(.+)$', r'\1\n\2', md_text, flags=re.MULTILINE)
    md_text = re.sub(r'^(■\s*\([^)]+\))\s+(.+)$', r'\1\n\2', md_text, flags=re.MULTILINE)
    return md_text




In [6]:
# ✅ 전체 처리 루프
pdf_files = glob(os.path.join(PDF_FOLDER, "*.pdf"))
print(f"🔍 처리할 PDF 파일 수: {len(pdf_files)}")

for pdf_path in pdf_files:
    filename = os.path.splitext(os.path.basename(pdf_path))[0]
    blob_name = f"{filename}.pdf"
    md_path = os.path.join(MD_FOLDER, f"{filename}.md")

    print(f"\n📄 처리 중: {filename}")

    # 1. 업로드 및 SAS URL 생성
    sas_url = upload_pdf_to_blob(pdf_path, blob_name)
    print("✅ Blob 업로드 및 SAS URL 완료")

    # 2. Markdown 변환
    md_content = analyze_pdf_to_markdown(sas_url)
    print("✅ Document Intelligence 분석 완료")

    # 3. GPT 테이블 변환
    md_with_tables = convert_md_tables_with_llm_parallel(md_content)
    print("✅ GPT 테이블 변환 완료")

    # 4. 헤더 전처리
    final_md = preprocess_markdown_headers(md_with_tables)

    # 5. 저장
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write(final_md)
    print(f"✅ 저장 완료: {md_path}")


🔍 처리할 PDF 파일 수: 2

📄 처리 중: 서울지역본부 청년 매입임대주택
✅ Blob 업로드 및 SAS URL 완료
✅ Document Intelligence 분석 완료
✅ GPT 테이블 변환 완료
✅ 저장 완료: E:\work\MS_project_2\code\테이블처리o\data\markdowns\서울지역본부 청년 매입임대주택.md

📄 처리 중: 아츠스테이영등포_입주자모집공고문
✅ Blob 업로드 및 SAS URL 완료
✅ Document Intelligence 분석 완료
✅ GPT 테이블 변환 완료
✅ 저장 완료: E:\work\MS_project_2\code\테이블처리o\data\markdowns\아츠스테이영등포_입주자모집공고문.md


In [None]:
import os
from glob import glob
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain.schema import Document

# 📁 마크다운 폴더 경로 (raw string으로 경로 작성)
MARKDOWN_FOLDER = r"E:\work\MS_project_2\code\테이블처리o\data\markdowns"

# ✅ 분할 도구 정의
header_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[
    ("#", "section"),
    ("##", "subsection"),
    ("###", "subsubsection"),
    ("■", "bullet"),
    ("※", "bullet"),
    ("▪", "subbullet"),
    ("✔", "check")
])

recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)

# ✅ 전체 문서 리스트
all_docs = []

# ✅ 모든 .md 파일 가져오기
md_files = glob(os.path.join(MARKDOWN_FOLDER, "*.md"))
print(f"📄 총 Markdown 파일 수: {len(md_files)}")

# ✅ 각 파일 처리
for md_path in md_files:
    filename = os.path.splitext(os.path.basename(md_path))[0]  # 확장자 없는 파일명

    print(f"\n🚀 처리 중: {filename}")

    with open(md_path, "r", encoding="utf-8") as f:
        md_text = f.read()

    # 1. Markdown 헤더 기준 분할
    header_docs = header_splitter.split_text(md_text)

    # 2. 각 문서에 파일명 메타데이터 추가
    for doc in header_docs:
        doc.metadata["source"] = filename

    # 3. RecursiveCharacterTextSplitter로 chunk 분할
    for doc in header_docs:
        sub_docs = recursive_splitter.split_text(doc.page_content)

        for chunk in sub_docs:
            all_docs.append(
                Document(
                    page_content=chunk,
                    metadata=doc.metadata  # section, subsection, bullet, source 포함
                )
            )

print(f"\n✅ 전체 청크 수: {len(all_docs)}")


📄 총 Markdown 파일 수: 8

🚀 처리 중: (대전충남)25년1차청년매입임대_표준입주자모집공고문

🚀 처리 중: (정정공고문)25년1차청년매입임대_표준입주자모집공고문

🚀 처리 중: 2025년 1차 대구경북 청년매입임대 입주자 모집 공고문

🚀 처리 중: 2025년1차청년매입임대입주자모집공고문(광주전남)

🚀 처리 중: 25년 1차 청년매입임대 입주자 모집 공고문(강원지역본부)

🚀 처리 중: 25년1차청년매입임대입주자모집공고문

🚀 처리 중: 서울지역본부 청년 매입임대주택

🚀 처리 중: 아츠스테이영등포_입주자모집공고문

✅ 전체 청크 수: 835


In [3]:
print(all_docs[11].metadata)
print('=========================')
print(all_docs[11].page_content)

{'bullet': '공동거주형임을 인지하지 못하고 신청한 후 계약을 포기하는 사례가 빈번하게 발생하고 있습니다.', 'section': '2 입주자격 및 입주자 선정기준', 'subsection': '■ 입주순위', 'source': '(대전충남)25년1차청년매입임대_표준입주자모집공고문'}
2순위는 본인과 부모의 월평균 소득이 전년도 도시근로자 가구원수별 가구당 월평균소득의 100% 이하이며, 본인과 부모의 자산이 국민임대주택 자산기준을 충족하는 자입니다.  
3순위는 본인의 월평균 소득이 전년도 도시근로자 1인 가구 월평균소득 100% 이하이며, 행복주택(청년) 자산기준을 충족하는 자입니다.


In [4]:
import os
import uuid
from langchain_openai import AzureOpenAIEmbeddings
from langchain_community.vectorstores import AzureSearch
embedding_api_version = "2024-02-15-preview"
embedding_deployment = "text-embedding-3-small"
os.environ.pop("OPENAI_API_BASE", None)
os.environ.pop("BASE_URL", None)

embedding = AzureOpenAIEmbeddings(
    api_key=embedding_api_key,
    azure_endpoint=embedding_endpoint,
    model=embedding_deployment,
    openai_api_version=embedding_api_version
)

In [None]:

import os
import uuid
from tqdm import tqdm
from langchain.schema import Document
from langchain_openai import AzureOpenAIEmbeddings
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex, SimpleField, SearchField, SearchFieldDataType,
    VectorSearch, HnswAlgorithmConfiguration, VectorSearchAlgorithmKind,
    VectorSearchProfile
)

# ✅ 1. API 키 및 엔드포인트
#ai_search_index_name = "new_index"
ai_search_index_name = "add_new_index"

ai_search_endpoint = os.getenv('AI_Search_ENDPOINT')
ai_search_api_key = os.getenv('AI_Search_API_KEY')



# ✅ 2. 임베딩 설정
embedding_deployment = "text-embedding-3-small"
embedding_api_version = "2024-02-15-preview"

os.environ.pop("OPENAI_API_BASE", None)
os.environ.pop("BASE_URL", None)

embedding = AzureOpenAIEmbeddings(
    api_key=embedding_api_key,
    azure_endpoint=embedding_endpoint,
    model=embedding_deployment,
    openai_api_version=embedding_api_version
)

# ✅ 3. 인덱스 스키마 정의
embedding_dim = 1536

fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True),
    SearchField(name="content", type=SearchFieldDataType.String, searchable=True),
    SearchField(name="source", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="section", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="subsection", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="subsubsection", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="bullet", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="subbullet", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(name="check", type=SearchFieldDataType.String, searchable=True, filterable=True),
    SearchField(
        name="embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=embedding_dim,
        vector_search_profile_name="default"
    )
]

vector_search = VectorSearch(
    profiles=[VectorSearchProfile(name="default", algorithm_configuration_name="my-algorithm")],
    algorithms=[HnswAlgorithmConfiguration(name="my-algorithm", kind=VectorSearchAlgorithmKind.HNSW)]
)

index = SearchIndex(
    name=ai_search_index_name,
    fields=fields,
    vector_search=vector_search
)

# ✅ 4. 인덱스 초기화 및 생성
index_client = SearchIndexClient(endpoint=ai_search_endpoint, credential=AzureKeyCredential(ai_search_api_key))

if ai_search_index_name in [i.name for i in index_client.list_indexes()]:
    index_client.delete_index(ai_search_index_name)
    print("🗑 기존 인덱스 삭제 완료")

index_client.create_index(index)
print("✅ Azure AI Search 인덱스 생성 완료")

# ✅ 5. 벡터 데이터 업로드
# 👉 여기서 all_docs는 이미 만들어진 Document 리스트라고 가정

# 예: all_docs = [Document(page_content=..., metadata={...}), ...]

search_client = SearchClient(endpoint=ai_search_endpoint, index_name=ai_search_index_name, credential=AzureKeyCredential(ai_search_api_key))

batch = []
BATCH_SIZE = 50

for doc in tqdm(all_docs, desc="📤 업로드 중"):
    vector = embedding.embed_query(doc.page_content)

    record = {
        "id": str(uuid.uuid4()),
        "content": doc.page_content,
        "embedding": vector,
        "source": doc.metadata.get("source", ""),
        "section": doc.metadata.get("section", ""),
        "subsection": doc.metadata.get("subsection", ""),
        "subsubsection": doc.metadata.get("subsubsection", ""),
        "bullet": doc.metadata.get("bullet", ""),
        "subbullet": doc.metadata.get("subbullet", ""),
        "check": doc.metadata.get("check", "")
    }

    batch.append(record)

    if len(batch) >= BATCH_SIZE:
        search_client.upload_documents(documents=batch)
        batch = []

# 남은 데이터 업로드
if batch:
    search_client.upload_documents(documents=batch)

print("✅ 전체 문서 업로드 완료")


✅ Azure AI Search 인덱스 생성 완료


📤 업로드 중: 100%|██████████| 835/835 [04:58<00:00,  2.80it/s]


✅ 전체 문서 업로드 완료
