In [None]:
!pip install rdflib

## 検証

In [None]:
import os
import re
import pandas as pd
from rdflib import Graph
from rdflib.exceptions import ParserError
import logging

# --- Configuration ---
# NOTE(review): BASE_DIR is a GitHub web URL, yet every path derived from it is
# passed to os.path / filesystem calls below — confirm whether this should be
# the path of a local checkout instead.
BASE_DIR = 'https://github.com/naoki-kokaze/British_Warship_Career/tree/main/'
TURTLE_DIR = os.path.join(BASE_DIR, 'api_out')  # Directory holding the generated TTL files
ONTOLOGY_PATH = os.path.join(BASE_DIR, 'scheme', 'warship_career_ontology.ttl') # Path to the ontology file
LOG_FILE = os.path.join(BASE_DIR, 'log', 'validation.log') # Validation log file
STRATA_DIR = os.path.join(BASE_DIR, 'strata')  # Directory holding the source-text Excel files
excel_cache = {}  # Cache of loaded Excel sheets, keyed by file path, so each workbook is read only once

# --- Logging setup ---

# 1. Get the parent directory of the log file
log_dir = os.path.dirname(LOG_FILE)

# 2. Create the parent directory if it does not exist yet
if log_dir and not os.path.exists(log_dir):
    try:
        os.makedirs(log_dir)
        print(f"Log directory not found. Created: {log_dir}")
    except Exception as e:
        print(f"FATAL: Could not create log directory {log_dir}: {e}")
        print("Validation cannot proceed without logging.")
        exit() # Quit when the directory cannot be created

# filemode='a' appends to an existing log file instead of overwriting it
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, filemode='a',
                    format='%(asctime)s - %(levelname)s - %(message)s')
print(f"Validation log will be appended to: {LOG_FILE}")
logging.info(f"--- Validation Process Started ---") # Record the start of the run in the log

# --- 検証関数 ---

def validate_syntax(filepath):
    """(Phase 1) Check that a Turtle file parses.

    Args:
        filepath: Path of the ``.ttl`` file to parse.

    Returns:
        True when rdflib parses the file as Turtle, False otherwise. Failures
        are logged and printed rather than raised.
    """
    g = Graph()
    try:
        g.parse(filepath, format="turtle")
    except (ParserError, SyntaxError) as e:
        # rdflib's Turtle/N3 parser reports malformed input with BadSyntax,
        # which derives from the built-in SyntaxError rather than ParserError.
        # Catching both keeps real syntax problems under the "[Syntax Error]"
        # tag instead of letting them fall through to "[File Error]".
        logging.error(f"[Syntax Error] {os.path.basename(filepath)}: {e}")
        print(f"[Syntax Error] {os.path.basename(filepath)}: {e}")
        return False
    except Exception as e:
        # Anything else (missing file, permissions, decoding, ...) is treated
        # as a problem with the file rather than with its Turtle syntax.
        logging.error(f"[File Error] {os.path.basename(filepath)}: {e}")
        print(f"[File Error] {os.path.basename(filepath)}: {e}")
        return False
    else:
        logging.info(f"[Syntax OK] {os.path.basename(filepath)}")
        return True

def run_sparql_validation(graph, filename):
    """Run the custom SPARQL rule checks against one file's graph.

    Every query selects the offending resources, so a rule counts as violated
    as soon as its query returns at least one row. A query that raises is
    reported as a SPARQL error and also counts as a failure.

    Args:
        graph: Object exposing ``query(sparql)``; the result must support
            ``len()`` and iteration.
        filename: Name used to label the log and console messages.

    Returns:
        True when no rule was violated and no query failed, otherwise False.
    """
    validation_queries = {
        "Multiple Statuses per Presence": """
            PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
            PREFIX : <http://www.example.com/ontology/warship#>
            SELECT ?presence (COUNT(DISTINCT ?status) AS ?count)
            WHERE {
              ?presence a crm:E93_Presence ;
                        :has_operational_status ?status .
            } GROUP BY ?presence HAVING (COUNT(DISTINCT ?status) > 1)
            """,
        "Role and Type Conflict": """
            PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
            PREFIX : <http://www.example.com/ontology/warship#>
            SELECT ?presence
            WHERE {
              ?presence a crm:E93_Presence ;
                        :has_ship_role ?role ;
                        :has_ship_type ?type .
            }
            """,
        "Type Missing (No Role)": """
            PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
            PREFIX : <http://www.example.com/ontology/warship#>
            PREFIX myData: <http://www.example.com/myData/>
            SELECT ?presence
            WHERE {
              ?presence a crm:E93_Presence ;
                        :has_operational_status ?status .
              FILTER (?status IN (myData:InCommission, myData:InOrdinary, myData:UnderRepair))
              FILTER NOT EXISTS { ?presence :has_ship_role ?role }
              FILTER NOT EXISTS { ?presence :has_ship_type ?type }
            }
            """,
        "Type Unexpected (With Role)": """
            PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
            PREFIX : <http://www.example.com/ontology/warship#>
            PREFIX myData: <http://www.example.com/myData/>
            SELECT ?presence
            WHERE {
              { ?presence :has_operational_status myData:InNonCombatDuty . }
              UNION
              { ?presence :has_ship_role ?anyRole . }

              ?presence :has_ship_type ?type .
            }
            """
    }

    # Names of rules that were violated or whose query could not be evaluated.
    problem_rules = []

    for rule_name, sparql in validation_queries.items():
        try:
            hits = graph.query(sparql)
            if len(hits) == 0:
                continue  # Rule satisfied: nothing to report.

            problem_rules.append(rule_name)
            header = f"[Custom Rule Error] {filename}: Rule '{rule_name}' violated for:"
            logging.error(header)
            print(header)

            for hit in hits:
                # Prefer the ?presence binding; fall back to dumping the whole
                # row when a query does not expose that variable.
                try:
                    detail = f"  - {hit.presence}"
                except AttributeError:
                    detail = f"  - Query result row: {hit}"
                logging.error(detail)
                print(detail)
        except Exception as e:
            problem_rules.append(rule_name)
            failure = f"[SPARQL Error] {filename}: Query for rule '{rule_name}' failed: {e}"
            logging.error(failure)
            print(failure)

    passed = not problem_rules
    if passed:
        logging.info(f"[Rules OK] {filename}")
    return passed

def check_missing_costs(graph, filename):
    """Flag cost-bearing events that carry no ``:incurred_cost`` triple.

    The query looks at events typed as (a subclass of) production, ship
    repair, fitting for sea or ship conversion and keeps those without an
    ``:incurred_cost``. Hits are only warnings: they call for manual review.

    Args:
        graph: Object exposing ``query(sparql)``; the result must support
            ``len()`` and iteration over rows with ``event``, ``eventType``
            and ``eventLabel`` attributes.
        filename: Name used to label the log and console messages.

    Returns:
        True when the query ran and found nothing; False when candidates were
        found or when the query itself failed.
    """
    query_missing_cost = """
    PREFIX : <http://www.example.com/ontology/warship#>
    PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
    PREFIX sealit: <http://www.sealitproject.eu/ontology/>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>

    SELECT ?event ?eventType ?eventLabel
    WHERE {
      VALUES ?baseEventType {
        crm:E12_Production
        sealit:ShipRepair # Includes subclasses via rdfs:subClassOf*
        :FittingForSea
        :ShipConversion  # Includes subclasses via rdfs:subClassOf*
      }
      ?eventType rdfs:subClassOf* ?baseEventType .
      ?event a ?eventType .
      OPTIONAL { ?event rdfs:label ?eventLabel . }
      FILTER NOT EXISTS {
        ?event :incurred_cost ?cost .
      }
    }
    ORDER BY ?event
    """
    try:
        candidates = graph.query(query_missing_cost)

        if len(candidates) == 0:
            logging.info(f"[Cost Check OK] {filename}: No obvious missing :incurred_cost found.")
            return True

        header = f"[Cost Check Warning] {filename}: Found potential missing :incurred_cost for the following events (Manual verification needed):"
        logging.warning(header)
        print(header)

        for hit in candidates:
            # Reduce the type URI to its local name (text after the last '/' and '#').
            type_uri = str(hit.eventType)
            local_name = type_uri.rsplit('/', 1)[-1].rsplit('#', 1)[-1]
            detail = f"  - Event: {hit.event}, Type: {local_name}, Label: {hit.eventLabel}"
            logging.warning(detail)
            print(detail)

        return False
    except Exception as e:
        failure = f"[SPARQL Error] {filename}: SPARQL query for missing costs failed: {e}"
        logging.error(failure)
        print(failure)
        return False

def load_original_text(turtle_filepath):
    """(Phase 3 preparation) Fetch the source text that a Turtle file was built from.

    The strata workbook is derived from the file name: ``SL1_0016.ttl`` maps to
    ``SL1.xlsx`` inside ``STRATA_DIR``, and the row whose ``ID`` column equals
    ``myData:SL1_0016`` supplies the ``text`` value. Workbooks are kept in the
    module-level ``excel_cache`` so each one is read from disk only once.

    Args:
        turtle_filepath: Path of the ``.ttl`` file being validated.

    Returns:
        The source text as ``str``; ``None`` when it cannot be determined
        (unparseable file name, missing workbook, unknown ID, empty cell or
        any read error — each case is logged); or the sentinel string
        ``"STOP"`` when an ImportError is raised, which tells the caller to
        abort the run.
    """
    # Fallback label so the generic error handler can always name the input,
    # even when deriving the basename is the step that failed.
    filename = str(turtle_filepath)

    try:
        filename = os.path.basename(turtle_filepath)  # e.g. "SL1_0016.ttl"

        # 1. Extract the strata code from the file name (e.g. "SL1")
        match = re.match(r'([^_]+)_', filename)
        if not match:
            logging.warning(f"  [Fact Check SKIPPED] Could not extract strata code from filename: {filename}")
            return None
        strata_code = match.group(1)  # "SL1"

        # 2. Build the ship ID (e.g. "myData:SL1_0016")
        ship_id_base = os.path.splitext(filename)[0]  # "SL1_0016"
        ship_id_uri = f"myData:{ship_id_base}"  # "myData:SL1_0016"

        # 3. Build the path of the Excel workbook
        excel_filename = f"{strata_code}.xlsx"  # "SL1.xlsx"
        excel_filepath = os.path.join(STRATA_DIR, excel_filename)

        # 4. Load the workbook, going through the cache.
        #    (The cache dict is only mutated, never rebound, so no `global`
        #    declaration is needed.)
        if excel_filepath not in excel_cache:
            if not os.path.exists(excel_filepath):
                logging.warning(f"  [Fact Check SKIPPED] Strata Excel file not found: {excel_filepath}")
                return None

            # Reading Excel is slow, so it happens only on first use
            print(f"  Loading strata file: {excel_filename}...")
            excel_cache[excel_filepath] = pd.read_excel(excel_filepath)
            logging.info(f"  Loaded and cached strata file: {excel_filepath}")

        df_strata = excel_cache[excel_filepath]

        # 5. Find the row whose 'ID' column matches the ship ID exactly and
        #    return the value of its "text" column
        row = df_strata[df_strata['ID'] == ship_id_uri]

        if row.empty:
            logging.warning(f"  [Fact Check SKIPPED] ID '{ship_id_uri}' not found in 'ID' column of {excel_filename}")
            return None

        original_text = row.iloc[0]['text']

        if pd.isna(original_text):
            logging.warning(f"  [Fact Check SKIPPED] 'text' column is empty for ID '{ship_id_uri}' in {excel_filename}")
            return None

        return str(original_text)

    except ImportError:
        logging.error("  [Fact Check FATAL] `pandas` or `openpyxl` is not installed. Fact checking requires them. Run: pip install pandas openpyxl")
        print("  [Fact Check FATAL] `pandas` or `openpyxl` is not installed. Fact checking requires them. Run: pip install pandas openpyxl")
        return "STOP"  # Sentinel that makes the main loop stop
    except Exception as e:
        logging.error(f"  [Fact Check ERROR] Could not read original text for {filename}: {e}")
        return None

def _integer_part(amount):
    """Return an amount string without thousands separators or fractional part."""
    return str(amount).replace(',', '').split('.')[0]


def check_fact_consistency(merged_graph, filename, original_text):
    """(Phase 3) Cross-check costs and years between the source text and the RDF.

    Amounts and four-digit years (1700-1999) are pulled out of the text with
    regular expressions, and cost values and time-span dates are pulled out of
    the graph with SPARQL. Three kinds of discrepancy are logged as warnings:
    costs present only in the RDF (possible hallucination), costs present only
    in the text (possible omission) and years present only in the text
    (possible missed event).

    NOTE: the amount pattern treats the pound sign as optional, so it also
    picks up bare numbers in the text such as years; those can show up in the
    "possible omission" warning.

    Args:
        merged_graph: Object exposing ``query(sparql)`` whose rows carry
            ``cost`` and ``date`` attributes.
        filename: Name used to label the log and console messages.
        original_text: Source text the RDF was derived from.

    Returns:
        True when the comparison ran to completion — even if it produced
        warnings — and False only when the SPARQL query failed.
    """
    print("Checking fact consistency (Text vs RDF)...")

    # 1. Extract the "facts" from the source text (regex).
    #    Amounts such as "£1,234" or "4,030"; the pound sign is optional.
    text_costs = set(re.findall(r'£?([\d,]+(?:\.\d{1,2})?)\b', original_text))
    # Normalise exactly like the RDF side (no separators, integer part only)
    # so that "£4,030.50" in the text can match 4030.5 in the RDF.
    normalized_text_costs = {_integer_part(c) for c in text_costs}
    # A bare comma can satisfy the pattern and normalises to '' — not a cost.
    normalized_text_costs.discard('')

    # Years (four digits, 1700-1999)
    text_years = set(re.findall(r'\b(1[7-9]\d{2})\b', original_text))

    # 2. Extract the "facts" from the RDF (SPARQL).
    #    Prefixes are declared explicitly, like in the other queries of this
    #    file, so the query does not depend on the graph's namespace bindings.
    query_rdf_facts = """
    PREFIX : <http://www.example.com/ontology/warship#>
    PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
    SELECT (str(?costVal) as ?cost) (str(?dateVal) as ?date)
    WHERE {
      { ?event :incurred_cost [ crm:P90_has_value ?costVal ] . }
      UNION
      {
        ?event crm:P4_has_time-span ?ts .
        { ?ts crm:P79_beginning_is_qualified_by ?dateVal . }
        UNION
        { ?ts crm:P80_end_is_qualified_by ?dateVal . }
        UNION
        { ?ts crm:P82_at_some_time_within ?dateVal . }
      }
    }
    """
    rdf_costs = set()
    rdf_years = set()

    try:
        results = merged_graph.query(query_rdf_facts)
        for row in results:
            if row.cost:
                # Normalise e.g. '25352.0' -> '25352'
                rdf_costs.add(_integer_part(row.cost))
            if row.date:
                # Keep only the year, e.g. '1836-01-21' -> '1836'
                year_match = re.search(r'\b(1[7-9]\d{2})\b', str(row.date))
                if year_match:
                    rdf_years.add(year_match.group(1))

    except Exception as e:
        logging.error(f"  [Fact Check ERROR] {filename}: SPARQL query for facts failed: {e}")
        return False  # The check itself failed

    # 3. Compare both sides and warn
    has_warnings = False

    # Warning A: cost in the RDF but not in the text (fabrication)
    hallucinated_costs = rdf_costs - normalized_text_costs
    if hallucinated_costs:
        has_warnings = True
        logging.warning(f"  [Fact Mismatch WARNING] {filename}: Costs found in RDF but NOT in TEXT (Possible Hallucination): {hallucinated_costs}")

    # Warning B: cost in the text but not in the RDF (omission)
    missing_costs = normalized_text_costs - rdf_costs
    if missing_costs:
        has_warnings = True
        logging.warning(f"  [Fact Mismatch WARNING] {filename}: Costs found in TEXT but NOT in RDF (Possible Omission): {missing_costs}")

    # Warning C: year in the text but not in the RDF (missed event)
    missing_years = text_years - rdf_years
    if missing_years:
        has_warnings = True
        logging.warning(f"  [Fact Mismatch WARNING] {filename}: Years found in TEXT but NOT in RDF Events (Possible Event Omission): {missing_years}")

    if not has_warnings:
        logging.info(f"  [Fact Check OK] {filename}: Costs and Years seem consistent.")
        print("  [Fact Check OK] Costs and Years seem consistent.")
    else:
        print(f"  [Fact Mismatch WARNING] {filename}: Check logs for potential fact omissions or hallucinations.")

    return True  # The check ran to completion

# *** Main execution block ***
# Loops over the subdirectories found directly under TURTLE_DIR (the parent).
# For each subdirectory: Phase 1 syntax-checks every .ttl file, then Phases 2
# and 3 run the rule, missing-cost and fact checks on the files that parsed,
# and finally a per-directory summary is printed and logged.

if __name__ == "__main__":

    print(f"--- Starting Validation Process ---")
    print(f"Base Directory: {TURTLE_DIR}")
    print(f"Strata Directory (for source text): {STRATA_DIR}")
    print(f"Ontology File: {ONTOLOGY_PATH}")
    print(f"Log File: {LOG_FILE}")
    logging.info(f"Base Directory: {TURTLE_DIR}")
    logging.info(f"Ontology File: {ONTOLOGY_PATH}")

    # Load the ontology file (once, outside the loops)
    ontology_graph = Graph()
    try:
         ontology_graph.parse(ONTOLOGY_PATH, format="turtle")
         logging.info(f"Successfully loaded ontology: {ONTOLOGY_PATH}")
         print(f"Successfully loaded ontology: {ONTOLOGY_PATH}")
    except Exception as e:
         logging.error(f"FATAL: Error loading ontology file {ONTOLOGY_PATH}: {e}")
         print(f"FATAL: Error loading ontology file {ONTOLOGY_PATH}: {e}")
         print("Validation cannot proceed without the ontology.")
         logging.error("Validation cannot proceed without the ontology.")
         exit()

    # --- Discover the subdirectories ---
    if not os.path.isdir(TURTLE_DIR):
        print(f"ERROR: Base directory not found: {TURTLE_DIR}")
        logging.error(f"ERROR: Base directory not found: {TURTLE_DIR}")
        exit()

    try:
        sub_dirs = [d for d in os.listdir(TURTLE_DIR)
                    if os.path.isdir(os.path.join(TURTLE_DIR, d))]
    except Exception as e:
        print(f"ERROR: Could not list subdirectories in {TURTLE_DIR}: {e}")
        logging.error(f"ERROR: Could not list subdirectories in {TURTLE_DIR}: {e}")
        exit()

    if not sub_dirs:
        print(f"No subdirectories found in {TURTLE_DIR}. Nothing to validate.")
        logging.warning(f"No subdirectories found in {TURTLE_DIR}. Nothing to validate.")
        exit()

    print(f"Found {len(sub_dirs)} subdirectories to process: {sub_dirs}")
    logging.info(f"Found {len(sub_dirs)} subdirectories to process: {sub_dirs}")

    # --- Process each subdirectory in turn ---
    for sub_dir_name in sub_dirs:
        current_turtle_dir = os.path.join(TURTLE_DIR, sub_dir_name)

        print(f"\n\n=======================================================")
        print(f"--- Processing Directory: {current_turtle_dir} ---")
        print(f"=======================================================")
        logging.info(f"--- Processing Directory: {current_turtle_dir} ---")

        # Reset the statistics for every directory
        syntax_errors = 0
        rule_violations = 0 # Number of files flagged as needing review
        processed_files = 0
        syntax_ok_files = []
        all_files = []

        # 1. Syntax validation phase
        print(f"\n--- Phase 1: Syntax Validation (Directory: {sub_dir_name}) ---")
        try:
            all_files = [f for f in os.listdir(current_turtle_dir) if f.endswith('.ttl')]
        except Exception as e:
             print(f"ERROR: Could not read directory {current_turtle_dir}: {e}")
             logging.error(f"ERROR: Could not read directory {current_turtle_dir}: {e}")
             continue # Move on to the next directory

        if not all_files:
            print(f"No .ttl files found in {current_turtle_dir}")
            logging.warning(f"No .ttl files found in {current_turtle_dir}")
            # Fall through so this directory's summary is still printed
        else:
            for filename in all_files:
                filepath = os.path.join(current_turtle_dir, filename)
                if validate_syntax(filepath):
                    syntax_ok_files.append(filename)
                else:
                    syntax_errors += 1

        print(f"Syntax Validation Complete for {sub_dir_name}. {len(syntax_ok_files)} files OK, {syntax_errors} files with errors.")
        logging.info(f"Syntax Validation Complete for {sub_dir_name}. {len(syntax_ok_files)} files OK, {syntax_errors} files with errors.")

        # 2. & 3. Rule / cost / fact cross-check phases
        print(f"\n--- Phase 2 & 3: Rules, Costs, and Fact Checks (Directory: {sub_dir_name}) ---")
        if not syntax_ok_files:
            print("No files passed syntax validation. Skipping further checks for this directory.")
            logging.warning("No files passed syntax validation. Skipping further checks for this directory.")
        else:
            for filename in syntax_ok_files:
                processed_files += 1
                filepath = os.path.join(current_turtle_dir, filename)
                g = Graph()

                try:
                    # --- Load the source text (Phase 3 preparation) ---
                    original_text_content = load_original_text(filepath)
                    if original_text_content == "STOP":
                        print("Stopping validation due to missing libraries (pandas/openpyxl).")
                        logging.error("Stopping validation due to missing libraries (pandas/openpyxl).")
                        break # Abort the file loop of this subdirectory

                    # --- Merge the graphs ---
                    g.parse(filepath, format="turtle")
                    merged_graph = g + ontology_graph # Combine the file's graph with the ontology

                    print(f"\n--- Validating file: {filename} ({processed_files}/{len(syntax_ok_files)}) ---")

                    # --- (Phase 2a) Custom rule validation ---
                    print("Checking custom rules...")
                    rules_ok = run_sparql_validation(merged_graph, filename)

                    # --- (Phase 2b) Missing-cost check ---
                    print("Checking for potentially missing costs...")
                    costs_ok = check_missing_costs(merged_graph, filename)

                    # --- (Phase 3) Fact cross-check ---
                    facts_ok = False
                    if original_text_content:
                        facts_ok = check_fact_consistency(merged_graph, filename, original_text_content)
                    else:
                        print("  [Fact Check SKIPPED] No original text file found.")
                        facts_ok = True # No text available: skip the check (not treated as an error)

                    # Verdict for this file
                    if rules_ok and costs_ok and facts_ok:
                         logging.info(f"[Overall OK] {filename}")
                         print(f"[Overall OK] {filename}")
                    else:
                         rule_violations += 1
                         logging.warning(f"[Validation Issues] {filename} - Check logs for details.")
                         print(f"[Validation Issues] {filename} - Check logs for details.")

                except Exception as e:
                    # Any unexpected failure counts the file as having issues
                    rule_violations += 1
                    logging.error(f"[Processing Error] Could not process {filename} for validation: {e}")
                    print(f"[Processing Error] Could not process {filename} for validation: {e}")

            if original_text_content == "STOP":
                break # On a library error, leave the subdirectory loop as well

        # --- Final results for this directory ---
        print(f"\n--- Validation Summary for {sub_dir_name} ---")
        print(f"Total files found: {len(all_files)}")
        print(f"Files with Syntax Errors: {syntax_errors}")
        print(f"Files checked for Rules/Costs/Facts: {len(syntax_ok_files)}")
        print(f"Files with Issues (Warnings or Errors): {rule_violations}")
        logging.info(f"--- Validation Summary for {sub_dir_name} ---")
        logging.info(f"Total files found: {len(all_files)}")
        logging.info(f"Files with Syntax Errors: {syntax_errors}")
        logging.info(f"Files checked for Rules/Costs/Facts: {len(syntax_ok_files)}")
        logging.info(f"Files with Issues (Warnings or Errors): {rule_violations}")

    print("\n--- All Directories Processed ---")
    print(f"Log file: {LOG_FILE}")
    logging.info("--- All Directories Processed ---")