# グラフ抽出 (Graph Extraction)

収集したテキストデータから手続きの依存関係グラフを抽出する。
LLMを使用してテキストを解析し、JSON形式のグラフデータに変換する。

In [1]:
import os
import json
import glob
import asyncio
from typing import List, Optional
from enum import Enum
from pydantic import BaseModel, Field

from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
import config

# Settings
# Relative path of the prompt template file that create_chain() reads.
PROMPT_FILE = "../prompt.md"

# LLM settings
# Model name and server URL handed to ChatOllama. The commented-out lines are
# alternative values (another model / a localhost server) kept for quick switching.
# MODEL_NAME = "gemma3:12b"
MODEL_NAME = "qwen3-vl:4b"
# BASE_URL = "http://localhost:11434"
BASE_URL = "http://192.168.0.59:11434"

# Model name handed to ChatGoogleGenerativeAI.
GEMINI_MODEL = "gemini-2.5-pro"



In [2]:
# Closed set of labels allowed in GraphEdge.type.
# The str mixin makes every member a plain string equal to its own name, so
# members compare equal to (and serialize as) the literal label text.
class ActionType(str, Enum):
    # Physical_* labels
    Physical_Go = "Physical_Go"
    Physical_Mail = "Physical_Mail"
    Physical_Copy = "Physical_Copy"
    Physical_Print = "Physical_Print"
    Physical_Fill = "Physical_Fill"
    Physical_Attach = "Physical_Attach"
    # External_* label
    External_Acquire = "External_Acquire"
    # Digital_* labels
    Digital_Input = "Digital_Input"
    Digital_Auth = "Digital_Auth"
    Digital_Upload = "Digital_Upload"
    Digital_Capture = "Digital_Capture"
    Digital_Submit = "Digital_Submit"
    # Remaining labels
    Wait_Process = "Wait_Process"
    No_Action = "No_Action"

# Closed set of labels allowed in GraphEdge.category.
# Like ActionType, each member is a str whose value equals its name.
class ActionCategory(str, Enum):
    Work = "Work"
    Move = "Move"
    Wait = "Wait"

# One edge of a dependency graph: an action leading from `source` to `target`,
# labelled with an ActionType and an ActionCategory.
# The Field descriptions below are part of the model's schema; they are runtime
# text, not comments, so edit them deliberately.
class GraphEdge(BaseModel):
    source: str = Field(description="The starting point or prerequisite of the action")
    target: str = Field(description="The result or next step of the action")
    action: str = Field(description="The specific action taken")
    type: ActionType = Field(description="The type of cost/action")
    category: ActionCategory = Field(description="The subject/category of the action")

# Top-level structured output: two edge lists, one for the analog application
# flow and one for the digital application flow.
# NOTE(review): a class docstring would presumably be emitted as the schema
# "description" by model_json_schema() -- confirm before adding one here.
class GraphData(BaseModel):
    analog: List[GraphEdge] = Field(description="Dependency graph for analog application")
    digital: List[GraphEdge] = Field(description="Dependency graph for digital application")

In [3]:
# Initialize LLM
# Two interchangeable structured-output models are built here; create_chain()
# decides which one is actually used.

# Ollama-served model at BASE_URL. The GraphData class itself is passed as the
# schema, so results are presumably GraphData instances rather than dicts
# (TODO confirm against the langchain_ollama version in use).
model_local = ChatOllama(
    model=MODEL_NAME,
    base_url=BASE_URL,
    temperature=0.1
).with_structured_output(GraphData)

# Gemini model. Here the schema is passed as a JSON Schema dict together with
# method="json_schema"; the recorded cell output shows the result printed as a
# plain dict.
model_gemini = ChatGoogleGenerativeAI(
    model=GEMINI_MODEL,
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
).with_structured_output(
    schema=GraphData.model_json_schema(), method="json_schema"
)

In [4]:
def create_chain(prompt_path: str):
    """Build the extraction chain from the prompt template at ``prompt_path``.

    The file is read as UTF-8, turned into a ``ChatPromptTemplate`` and piped
    into ``model_gemini``.

    Args:
        prompt_path: Path of the prompt template file.

    Returns:
        The composed ``prompt | model`` runnable.
    """
    with open(prompt_path, "r", encoding="utf-8") as prompt_file:
        template_source = prompt_file.read()

    # prompt.md contains {input_homepage} and {input_digital}
    template = ChatPromptTemplate.from_template(template_source)

    # Pipe into model_local instead of model_gemini to run against Ollama.
    return template | model_gemini

async def extract_graph(chain, homepage_text, digital_text):
    """Run ``chain`` on one pair of texts and return its result.

    Args:
        chain: Object exposing an awaitable ``ainvoke(mapping)`` method.
        homepage_text: Text bound to the ``input_homepage`` prompt variable.
        digital_text: Text bound to the ``input_digital`` prompt variable.

    Returns:
        Whatever ``chain.ainvoke`` returns, or ``None`` if it raised; the
        error is printed rather than propagated so callers can carry on.
    """
    payload = {
        "input_homepage": homepage_text,
        "input_digital": digital_text,
    }
    try:
        extracted = await chain.ainvoke(payload)
    except Exception as exc:
        # Best-effort: report the failure and let the caller decide what to do.
        print(f"Error during extraction: {exc}")
        return None
    else:
        return extracted

In [5]:
chain = create_chain(PROMPT_FILE)

# Ensure prompts directory exists
PROMPTS_DIR = os.path.join(config.DATA_DIR, "prompts")
os.makedirs(PROMPTS_DIR, exist_ok=True)

for city in config.TARGET_CITIES:
    city_id = city["id"]
    city_name = city["name"]
    print(f"Processing {city_name} ({city_id})...")
    
    # ファイルの検索
    homepage_files = glob.glob(os.path.join(config.RAW_TEXT_DIR, f"{city_id}_homepage*.txt"))
    digital_files = glob.glob(os.path.join(config.RAW_TEXT_DIR, f"{city_id}_digital*.txt"))
    
    if not homepage_files or not digital_files:
        print(f"  -> Missing files for {city_name}. Skipping.")
        continue
        
    with open(homepage_files[0], "r", encoding="utf-8") as f:
        homepage_text = f.read()
    with open(digital_files[0], "r", encoding="utf-8") as f:
        digital_text = f.read()
    
    # Promptの生成と保存
    # ChatPromptTemplateからフォーマット済みメッセージを取得
    # chain.firstはChatPromptTemplate
    prompt_template = chain.first
    formatted_prompt_value = prompt_template.invoke({
        "input_homepage": homepage_text,
        "input_digital": digital_text
    })
    # 文字列として取得
    formatted_text = formatted_prompt_value.to_string()
    
    prompt_output_path = os.path.join(PROMPTS_DIR, f"prompt_{city_id}.md")
    with open(prompt_output_path, "w", encoding="utf-8") as f:
        f.write(formatted_text)
    print(f"  -> Saved prompt to {prompt_output_path}")
        
    # グラフ抽出
    graph_data = await extract_graph(chain, homepage_text, digital_text)
    
    if graph_data:
        output_path = os.path.join(config.PROC_GRAPH_DIR, f"{city_id}.json")
        print(graph_data)
        json_data = graph_data
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(json_data, f, indent=2, ensure_ascii=False)
        print(f"  -> Saved graph to {output_path}")
    else:
        print(f"  -> Failed to extract graph for {city_name}")

Processing 福岡市 (40130)...
  -> Saved prompt to /home/rorok/eeic_lectures/3a/superD/admin-proc/data/prompts/prompt_40130.md
{'analog': [{'source': '開始(自宅)', 'target': '区役所窓口', 'action': '区役所へ移動', 'type': 'Physical_Go', 'category': 'Move'}, {'source': '区役所窓口', 'target': '申請書(紙)', 'action': '窓口で申請書を入手', 'type': 'External_Acquire', 'category': 'Work'}, {'source': '申請書(紙)', 'target': '申請書(記入済)', 'action': '窓口で必要事項を記入', 'type': 'Physical_Fill', 'category': 'Work'}, {'source': '申請書(記入済)', 'target': '申請完了', 'action': 'マイナンバーカード・通帳を提示し提出', 'type': 'Physical_Attach', 'category': 'Work'}, {'source': '申請完了', 'target': '審査完了', 'action': '自治体による審査', 'type': 'Wait_Process', 'category': 'Wait'}, {'source': '審査完了', 'target': '手当受給', 'action': '認定通知書受領・口座振込', 'type': 'Wait_Process', 'category': 'Wait'}, {'source': '開始(自宅)', 'target': '所得証明書', 'action': '情報連携により省略', 'type': 'No_Action', 'category': 'Work'}, {'source': '開始(自宅)', 'target': '健康保険証の写し', 'action': '組合健保のため提出不要', 'type': 'No_Action', 'category