<a href="https://colab.research.google.com/github/shiomiayari/shiomiayari.github.io/blob/main/Donut%E6%A7%8B%E7%AF%89.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Donutカレンダー抽出システム セットアップガイド

## 概要

このシステムは、Donut（Document Understanding Transformer）を使用して、カレンダー画像から自動でスケジュール情報を抽出し、Googleカレンダーに登録するアプリケーションです。

## 特徴

- **OCR-free**: 従来のOCR + テキスト解析ではなく、End-to-Endで画像から構造化データを抽出
- **ファインチューニング対応**: カスタムデータセットでの精度向上が可能
- **手書き対応**: 適切なデータセットでファインチューニングすれば手書きテキストも認識可能
- **Googleカレンダー連携**: 抽出したイベントを自動でカレンダーに登録

## 1. 環境構築

### 必要なライブラリのインストール

In [None]:
# 基本パッケージ
pip install torch torchvision torchaudio
pip install transformers datasets accelerate
pip install pillow opencv-python

# Googleカレンダー連携用
pip install google-auth google-auth-oauthlib google-auth-httplib2
pip install google-api-python-client

# 比較・検証用（オプション）
pip install pytesseract

GPU環境（推奨）
CUDA対応のPyTorchを使用することを強く推奨します：

In [None]:
# CUDA 11.8の場合
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

**2. Googleカレンダー API設定**


Google Cloud Consoleでの設定

1.Google Cloud Consoleにアクセス

2.新しいプロジェクトを作成または既存のプロジェクトを選択

3.Google Calendar APIを有効化

4.認証情報（OAuth 2.0クライアントID）を作成

5.credentials.jsonファイルをダウンロード

認証情報の設定

In [None]:
# credentials.jsonをプロジェクトルートに配置
# 初回実行時にブラウザで認証が必要

3. データセットの準備
手書きカレンダー用データセット
手書きテキスト認識の精度を向上させるために、以下のデータセットが有効です：

**推奨データセット**

・IAMデータセット: 手書き文字認識で広く使用される標準的なデータセット

・CVLデータセット: 約310人の参加者による7つの手書き文書で構成され、約83,000の単語切り出し画像を含む

・RIMES: フランス語の手書きテキストデータセット、約60,000の画像と1,300人の著者による


**データ収集のベストプラクティス**

多様性: 異なる手書きスタイル、フォント、レイアウトを含める

解像度: 最低300DPI以上の高解像度画像を使用

アノテーション品質: 正確なラベリングが最も重要

バランス: 各カテゴリ（時間、タイトル、場所など）のデータを均等に収集

In [None]:
# アノテーションファイルの形式例
{
  "image_filename": "calendar_001.png",
  "events": [
    {
      "date": "2024-06-20",
      "time": "10:00",
      "title": "会議",
      "description": "月次レビュー",
      "location": "会議室A",
      "duration": 60
    }
  ]
}

**基本的な使用例**

In [None]:
from calendar_app import CalendarApp

# アプリケーションの初期化
app = CalendarApp(credentials_file="credentials.json")

# 単一画像の処理
result = app.run_pipeline("calendar_image.png")
print(result)

**ファインチューニング**

In [None]:
# カスタムデータでのファインチューニング
trainer = app.fine_tune_model(
    train_data_path="./calendar_images",
    annotations_file="annotations.json",
    output_dir="./custom_model"
)

# ファインチューニング後のモデルで再初期化
app = CalendarApp(model_path="./custom_model")

**ファインチューニング**

In [None]:
# カスタムデータでのファインチューニング
trainer = app.fine_tune_model(
    train_data_path="./calendar_images",
    annotations_file="annotations.json",
    output_dir="./custom_model"
)

# ファインチューニング後のモデルで再初期化
app = CalendarApp(model_path="./custom_model")

**5. データセットの具体的な作成方法**
手書きカレンダーデータセットの作成

In [None]:
import os
import json
from PIL import Image, ImageDraw, ImageFont
import random

def create_handwritten_calendar_dataset(num_samples=1000):
    """手書き風カレンダーデータセットの作成"""

    # 手書き風フォントの準備（実際の手書きデータを使用することを推奨）
    fonts = [
        "path/to/handwritten_font1.ttf",
        "path/to/handwritten_font2.ttf",
        # 追加の手書きフォント
    ]

    events_data = []

    for i in range(num_samples):
        # ランダムなカレンダーレイアウトを生成
        img = Image.new('RGB', (800, 600), color='white')
        draw = ImageDraw.Draw(img)

        # 日付とイベントの生成
        events = generate_random_events()

        # 手書き風でテキストを描画
        draw_handwritten_calendar(draw, events, fonts)

        # 画像を保存
        img_path = f"handwritten_calendar_{i:04d}.png"
        img.save(img_path)

        # アノテーションデータの作成
        annotation = {
            "image_filename": img_path,
            "events": events
        }
        events_data.append(annotation)

    # アノテーションファイルの保存
    with open("handwritten_annotations.json", "w", encoding="utf-8") as f:
        json.dump(events_data, f, ensure_ascii=False, indent=2)

def generate_random_events():
    """ランダムなイベントデータの生成"""
    import random
    from datetime import datetime, timedelta

    event_types = [
        "会議", "打ち合わせ", "面談", "プレゼン", "研修",
        "セミナー", "レビュー", "相談", "説明会", "ワークショップ",
        "歯医者", "病院", "美容院", "買い物", "映画鑑賞",
        "ランチ", "ディナー", "飲み会", "パーティー", "旅行",
        "ジム", "ヨガ", "ランニング", "読書", "勉強"
    ]

    locations = [
        "会議室A", "会議室B", "本社", "支社", "カフェ", "レストラン",
        "病院", "銀座", "渋谷", "新宿", "池袋", "自宅", "図書館",
        "ジム", "公園", "駅前", "オフィス", "クライアント先"
    ]

    events = []
    num_events = random.randint(1, 8)  # 1日1-8個のイベント

    for i in range(num_events):
        hour = random.randint(8, 22)
        minute = random.choice([0, 15, 30, 45])

        event = {
            "date": "2024-06-20",  # 固定日付（実際は日付も変動させる）
            "time": f"{hour:02d}:{minute:02d}",
            "title": random.choice(event_types),
            "description": f"詳細: {random.choice(['重要', '緊急', '定期', '新規', '継続'])}",
            "location": random.choice(locations),
            "duration": random.choice([30, 60, 90, 120, 180])
        }
        events.append(event)

    return events

**実際の手書きデータの収集**

In [None]:
def collect_real_handwritten_data():
    """実際の手書きデータ収集のガイドライン"""

    guidelines = """
    1. 多様な書き手からデータを収集
       - 年齢、性別、職業の異なる100人以上
       - 各人から最低10枚のカレンダー画像

    2. 様々なスタイルのカレンダー
       - 手帳タイプ
       - 壁掛けカレンダー
       - デスクカレンダー
       - デジタルデバイスの手書きメモ

    3. 異なる記入ツール
       - ボールペン
       - 鉛筆
       - マーカー
       - デジタルペン

    4. 撮影条件の統一
       - 解像度: 最低300DPI
       - 照明: 均一な明るさ
       - 角度: 正面から撮影
       - 背景: 白または単色
    """

    return guidelines

**6. 高度な設定とカスタマイズ**
モデルパラメータの調整

In [None]:
# ファインチューニング時の詳細設定
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=10,              # エポック数
    per_device_train_batch_size=4,    # バッチサイズ
    per_device_eval_batch_size=4,
    warmup_steps=500,                 # ウォームアップステップ
    weight_decay=0.01,                # 重み減衰
    learning_rate=3e-5,               # 学習率
    logging_dir='./logs',
    logging_steps=100,
    save_steps=1000,
    eval_steps=500,
    evaluation_strategy="steps",
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    save_total_limit=3,
    fp16=True,                        # 半精度浮動小数点
)

**カスタムプロンプトの設定**

In [None]:
# 特定のタスクに応じたプロンプトのカスタマイズ
class CustomDonutProcessor:
    def __init__(self):
        self.task_prompts = {
            "calendar": "<s_calendar>",
            "schedule": "<s_schedule>",
            "meeting": "<s_meeting>",
            "handwritten": "<s_handwritten>"
        }

    def get_prompt(self, task_type):
        return self.task_prompts.get(task_type, "<s_calendar>")

**後処理とデータ検証**

In [None]:
def validate_extracted_events(events: List[CalendarEvent]) -> List[CalendarEvent]:
    """抽出されたイベントの検証と修正"""

    validated_events = []

    for event in events:
        # 日付フォーマットの検証
        try:
            datetime.strptime(event.date, "%Y-%m-%d")
        except ValueError:
            # 日付フォーマットの修正を試行
            event.date = fix_date_format(event.date)

        # 時間フォーマットの検証
        try:
            datetime.strptime(event.time, "%H:%M")
        except ValueError:
            event.time = fix_time_format(event.time)

        # タイトルの検証（空でないか）
        if event.title and event.title.strip():
            validated_events.append(event)

    return validated_events

def fix_date_format(date_str: str) -> str:
    """日付フォーマットの修正"""
    # 様々な日付フォーマットを標準形式に変換
    patterns = [
        r"(\d{4})/(\d{1,2})/(\d{1,2})",  # 2024/6/20
        r"(\d{1,2})/(\d{1,2})/(\d{4})",  # 6/20/2024
        r"(\d{4})-(\d{1,2})-(\d{1,2})",  # 2024-6-20
    ]

    for pattern in patterns:
        match = re.match(pattern, date_str)
        if match:
            # 適切なフォーマットに変換
            return format_date(match.groups())

    return date_str  # 変換できない場合は元のまま

**7. 性能評価と最適化**
評価メトリクス

In [None]:
def evaluate_model_performance(model, test_dataset):
    """モデル性能の評価"""

    metrics = {
        "accuracy": 0.0,
        "precision": 0.0,
        "recall": 0.0,
        "f1_score": 0.0,
        "exact_match": 0.0
    }

    correct_predictions = 0
    total_predictions = 0

    for sample in test_dataset:
        # 予測実行
        predicted_events = model.extract_schedule_from_image(sample["image_path"])
        ground_truth_events = json.loads(sample["ground_truth"])["events"]

        # 各フィールドの正確性を評価
        field_accuracy = evaluate_field_accuracy(predicted_events, ground_truth_events)

        # メトリクスの更新
        update_metrics(metrics, field_accuracy)

        total_predictions += 1

    # 最終メトリクスの計算
    finalize_metrics(metrics, total_predictions)

    return metrics

def evaluate_field_accuracy(predicted, ground_truth):
    """各フィールドの正確性評価"""

    field_scores = {
        "date": 0.0,
        "time": 0.0,
        "title": 0.0,
        "location": 0.0
    }

    # 各イベントの各フィールドを比較
    for i, pred_event in enumerate(predicted):
        if i < len(ground_truth):
            gt_event = ground_truth[i]

            # 完全一致 vs 部分一致の評価
            field_scores["date"] += compare_field(pred_event.date, gt_event["date"])
            field_scores["time"] += compare_field(pred_event.time, gt_event["time"])
            field_scores["title"] += compare_field(pred_event.title, gt_event["title"], fuzzy=True)
            field_scores["location"] += compare_field(pred_event.location, gt_event.get("location", ""))

    return field_scores

**パフォーマンス最適化**

In [None]:
def optimize_model_inference():
    """推論速度の最適化"""

    optimization_tips = """
    1. モデルの量子化
       - PyTorchの量子化機能を使用
       - INT8量子化で推論速度を向上

    2. TensorRT最適化（NVIDIA GPU）
       - torch2trtを使用してTensorRTエンジンに変換

    3. ONNX変換
       - より汎用的な推論エンジンでの実行

    4. バッチ処理
       - 複数画像の同時処理

    5. GPU最適化
       - 適切なバッチサイズの設定
       - メモリ使用量の監視
    """

    return optimization_tips

# 量子化の例
def quantize_model(model, calibration_dataset):
    """モデルの量子化"""

    import torch.quantization as quantization

    # 量子化設定
    model.qconfig = quantization.get_default_qconfig('fbgemm')

    # 量子化準備
    model_prepared = quantization.prepare(model)

    # キャリブレーション
    with torch.no_grad():
        for sample in calibration_dataset:
            model_prepared(sample)

    # 量子化実行
    model_quantized = quantization.convert(model_prepared)

    return model_quantized

**8. トラブルシューティング**
よくある問題と解決方法

In [None]:
# 解決方法
- バッチサイズを小さくする (batch_size=1)
- 画像サイズを縮小する
- gradient_checkpointingを有効にする
- dataloader_num_workers=0に設定

2. **学習が収束しない**

In [None]:
# 解決方法
- 学習率を下げる (learning_rate=1e-5)
- ウォームアップステップを増やす
- より多くのデータを収集する
- データの品質を向上させる

3. **手書き文字の認識精度が低い**

In [None]:
# 解決方法
- 手書き特化のデータセットを追加
- データ拡張を適用
- より長い時間のファインチューニング
- 複数のモデルのアンサンブル

4. Google Calendar API **エラー**

In [None]:
# 解決方法
- credentials.jsonの確認
- スコープ設定の確認
- APIクォータの確認
- ネットワーク接続の確認

**9. 実運用での注意点**
セキュリティ考慮事項

In [None]:
# 機密情報の保護
- 画像データの暗号化
- 認証情報の適切な管理
- ログ出力の制限
- データの定期的な削除

**スケーラビリティ**

In [None]:
# 大量処理への対応
- 非同期処理の実装
- キューシステムの導入
- 分散処理の検討
- データベースの使用

**監視とログ**

In [None]:
import logging
from datetime import datetime

# ログ設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('calendar_app.log'),
        logging.StreamHandler()
    ]
)

# 処理時間の監視
def monitor_processing_time(func):
    def wrapper(*args, **kwargs):
        start_time = datetime.now()
        result = func(*args, **kwargs)
        end_time = datetime.now()

        processing_time = (end_time - start_time).total_seconds()
        logging.info(f"{func.__name__} took {processing_time:.2f} seconds")

        return result
    return wrapper

**10. 今後の改善点**
**機能拡張の提案**

多言語対応: 英語、中国語、韓国語など

音声入力: 音声からのスケジュール追加

画像前処理: ノイズ除去、傾き補正

リアルタイム処理: カメラからの直接入力

スマートフォンアプリ: モバイルアプリケーション



**精度向上のための取り組み**

より大規模なデータセット: 10万枚以上の画像

ドメイン適応: 特定業界向けの特化モデル

マルチモーダル学習: テキストと画像の組み合わせ

自己学習: ユーザーフィードバックからの学習

これで完全なセットアップガイドが完成しました。このシステムを使用することで、カレンダー画像から高精度でスケジュール情報を抽出し、自動的にGoogleカレンダーに登録することができます。

# ここから下　メインコード

In [None]:
# Donutを使ったカレンダー画像からGoogleカレンダー自動登録システム

import os
import json
import torch
import numpy as np
from PIL import Image
from datetime import datetime, timedelta
import re
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
import logging

# 必要なライブラリのインストール
"""
pip install transformers torch torchvision pillow
pip install datasets accelerate
pip install google-auth google-auth-oauthlib google-auth-httplib2
pip install google-api-python-client
pip install pytesseract  # 比較用
pip install opencv-python
"""

# Transformers関連
from transformers import (
    DonutProcessor,
    VisionEncoderDecoderModel,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)
from datasets import Dataset
import torch.nn as nn

# Googleカレンダー連携
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class CalendarEvent:
    """カレンダーイベントのデータクラス"""
    date: str
    time: str
    title: str
    description: Optional[str] = None
    location: Optional[str] = None
    duration: Optional[int] = 60  # 分単位

class DonutCalendarExtractor:
    """Donutを使ったカレンダー画像からのスケジュール抽出クラス"""

    def __init__(self, model_path: str = None):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model_path = model_path or "naver-clova-ix/donut-base"
        self.processor = None
        self.model = None
        self.load_model()

    def load_model(self):
        """モデルとプロセッサーの読み込み"""
        try:
            self.processor = DonutProcessor.from_pretrained(self.model_path)
            self.model = VisionEncoderDecoderModel.from_pretrained(self.model_path)
            self.model.to(self.device)
            logger.info(f"Model loaded successfully from {self.model_path}")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

    def extract_schedule_from_image(self, image_path: str) -> List[CalendarEvent]:
        """画像からスケジュールを抽出"""
        try:
            # 画像の読み込み
            image = Image.open(image_path).convert("RGB")

            # プロンプトを設定（カレンダー情報抽出用）
            task_prompt = "<s_calendar>"
            decoder_input_ids = self.processor.tokenizer(
                task_prompt,
                add_special_tokens=False,
                return_tensors="pt"
            ).input_ids

            # 画像を処理
            pixel_values = self.processor(
                image,
                return_tensors="pt"
            ).pixel_values

            # 推論実行
            outputs = self.model.generate(
                pixel_values.to(self.device),
                decoder_input_ids=decoder_input_ids.to(self.device),
                max_length=self.model.decoder.config.max_position_embeddings,
                early_stopping=True,
                pad_token_id=self.processor.tokenizer.pad_token_id,
                eos_token_id=self.processor.tokenizer.eos_token_id,
                use_cache=True,
                num_beams=1,
                bad_words_ids=[[self.processor.tokenizer.unk_token_id]],
                return_dict_in_generate=True,
            )

            # 結果をデコード
            sequence = self.processor.batch_decode(outputs.sequences)[0]
            sequence = sequence.replace(self.processor.tokenizer.eos_token, "").replace(self.processor.tokenizer.pad_token, "")
            sequence = re.sub(r"<.*?>", "", sequence, count=1).strip()

            # JSONパース
            events_data = json.loads(sequence)

            # CalendarEventオブジェクトに変換
            events = []
            for event_data in events_data.get("events", []):
                event = CalendarEvent(
                    date=event_data.get("date"),
                    time=event_data.get("time"),
                    title=event_data.get("title"),
                    description=event_data.get("description"),
                    location=event_data.get("location"),
                    duration=event_data.get("duration", 60)
                )
                events.append(event)

            return events

        except Exception as e:
            logger.error(f"Error extracting schedule: {e}")
            return []

class CalendarDatasetCreator:
    """カレンダーデータセット作成クラス"""

    def __init__(self):
        self.dataset_path = "calendar_dataset"

    def create_synthetic_calendar_data(self, num_samples: int = 1000) -> Dataset:
        """合成カレンダーデータの作成"""
        import random
        from datetime import datetime, timedelta

        # サンプルイベントタイプ
        event_types = [
            "会議", "ミーティング", "面談", "プレゼン", "研修",
            "セミナー", "打ち合わせ", "相談", "レビュー", "説明会",
            "歯医者", "病院", "美容院", "買い物", "映画",
            "ランチ", "ディナー", "飲み会", "パーティー", "旅行"
        ]

        locations = [
            "会議室A", "会議室B", "本社", "支社", "カフェ",
            "レストラン", "病院", "銀座", "渋谷", "新宿", "池袋"
        ]

        dataset = []

        for i in range(num_samples):
            # ランダムな日付と時間を生成
            base_date = datetime.now() + timedelta(days=random.randint(0, 365))

            # 1日に1-5個のイベントを生成
            events_per_day = random.randint(1, 5)
            daily_events = []

            for j in range(events_per_day):
                hour = random.randint(8, 20)
                minute = random.choice([0, 15, 30, 45])

                event = {
                    "date": base_date.strftime("%Y-%m-%d"),
                    "time": f"{hour:02d}:{minute:02d}",
                    "title": f"{random.choice(event_types)}{j+1}",
                    "description": f"詳細情報{j+1}",
                    "location": random.choice(locations),
                    "duration": random.choice([30, 60, 90, 120])
                }
                daily_events.append(event)

            # JSONフォーマットでグラウンドトゥルースを作成
            ground_truth = json.dumps({"events": daily_events}, ensure_ascii=False)

            dataset.append({
                "image_path": f"synthetic_calendar_{i}.png",
                "ground_truth": ground_truth
            })

        return Dataset.from_list(dataset)

    def create_training_dataset(self, image_dir: str, annotations_file: str) -> Dataset:
        """実際の画像とアノテーションからデータセットを作成"""

        # アノテーションファイルの読み込み
        with open(annotations_file, 'r', encoding='utf-8') as f:
            annotations = json.load(f)

        dataset = []
        for annotation in annotations:
            image_path = os.path.join(image_dir, annotation["image_filename"])
            if os.path.exists(image_path):
                dataset.append({
                    "image_path": image_path,
                    "ground_truth": json.dumps(annotation["events"], ensure_ascii=False)
                })

        return Dataset.from_list(dataset)

class DonutCalendarTrainer:
    """Donutファインチューニング用クラス"""

    def __init__(self, model_name: str = "naver-clova-ix/donut-base"):
        self.model_name = model_name
        self.processor = DonutProcessor.from_pretrained(model_name)
        self.model = VisionEncoderDecoderModel.from_pretrained(model_name)

        # 特別トークンの追加
        self.processor.tokenizer.add_tokens(["<s_calendar>"])
        self.model.decoder.resize_token_embeddings(len(self.processor.tokenizer))

        # 新しいトークンの追加
        self.added_tokens = self.processor.tokenizer.get_added_vocab()
        self.model.config.pad_token_id = self.processor.tokenizer.pad_token_id
        self.model.config.decoder_start_token_id = self.processor.tokenizer.convert_tokens_to_ids(["<s_calendar>"])[0]

    def preprocess_data(self, dataset: Dataset) -> Dataset:
        """データの前処理"""

        def process_sample(sample):
            try:
                # 画像の読み込み
                image = Image.open(sample["image_path"]).convert("RGB")

                # プロンプトとターゲットの設定
                task_prompt = "<s_calendar>"

                # 画像処理
                pixel_values = self.processor(
                    image,
                    return_tensors="pt"
                ).pixel_values.squeeze()

                # テキスト処理
                target_sequence = f"{task_prompt} {sample['ground_truth']}"

                # トークン化
                input_ids = self.processor.tokenizer(
                    target_sequence,
                    add_special_tokens=False,
                    max_length=self.processor.tokenizer.model_max_length,
                    padding="max_length",
                    truncation=True,
                    return_tensors="pt"
                ).input_ids.squeeze()

                # ラベルの作成（input_idsのコピー、パディングトークンは-100に）
                labels = input_ids.clone()
                labels[labels == self.processor.tokenizer.pad_token_id] = -100

                return {
                    "pixel_values": pixel_values,
                    "labels": labels,
                    "target_sequence": target_sequence
                }

            except Exception as e:
                logger.error(f"Error processing sample: {e}")
                return None

        processed_dataset = dataset.map(
            process_sample,
            remove_columns=dataset.column_names
        ).filter(lambda x: x is not None)

        return processed_dataset

    def train(self, train_dataset: Dataset, val_dataset: Dataset = None, output_dir: str = "./donut-calendar-finetuned"):
        """モデルのファインチューニング"""

        # データセットの前処理
        train_dataset = self.preprocess_data(train_dataset)
        if val_dataset:
            val_dataset = self.preprocess_data(val_dataset)

        # トレーニング引数の設定
        training_args = TrainingArguments(
            output_dir=output_dir,
            per_device_train_batch_size=2,
            per_device_eval_batch_size=2,
            num_train_epochs=5,
            learning_rate=5e-5,
            warmup_steps=300,
            logging_steps=100,
            save_steps=500,
            evaluation_strategy="steps" if val_dataset else "no",
            eval_steps=500 if val_dataset else None,
            save_total_limit=3,
            load_best_model_at_end=True if val_dataset else False,
            metric_for_best_model="eval_loss" if val_dataset else None,
            greater_is_better=False,
            dataloader_pin_memory=False,
            dataloader_num_workers=0,
            fp16=True if torch.cuda.is_available() else False,
        )

        # トレーナーの初期化
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3)] if val_dataset else None,
        )

        # トレーニング実行
        logger.info("Starting training...")
        trainer.train()

        # モデルとプロセッサーの保存
        trainer.save_model()
        self.processor.save_pretrained(output_dir)

        logger.info(f"Model saved to {output_dir}")

        return trainer

class GoogleCalendarIntegration:
    """Googleカレンダー連携クラス"""

    def __init__(self, credentials_file: str = "credentials.json"):
        self.SCOPES = ['https://www.googleapis.com/auth/calendar']
        self.credentials_file = credentials_file
        self.service = None
        self.authenticate()

    def authenticate(self):
        """Google認証"""
        creds = None

        # token.jsonファイルがあれば読み込み
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', self.SCOPES)

        # 認証情報が無効または存在しない場合
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_file, self.SCOPES)
                creds = flow.run_local_server(port=0)

            # 認証情報を保存
            with open('token.json', 'w') as token:
                token.write(creds.to_json())

        # サービスの初期化
        self.service = build('calendar', 'v3', credentials=creds)
        logger.info("Google Calendar authentication successful")

    def create_event(self, event: CalendarEvent, calendar_id: str = 'primary') -> bool:
        """カレンダーイベントの作成"""
        try:
            # 日時の解析とフォーマット
            event_date = datetime.strptime(event.date, "%Y-%m-%d")
            event_time = datetime.strptime(event.time, "%H:%M").time()

            # 開始時刻の設定
            start_datetime = datetime.combine(event_date, event_time)

            # 終了時刻の設定（duration分後）
            end_datetime = start_datetime + timedelta(minutes=event.duration)

            # Googleカレンダー用のイベントデータ
            event_data = {
                'summary': event.title,
                'description': event.description or '',
                'location': event.location or '',
                'start': {
                    'dateTime': start_datetime.isoformat(),
                    'timeZone': 'Asia/Tokyo',
                },
                'end': {
                    'dateTime': end_datetime.isoformat(),
                    'timeZone': 'Asia/Tokyo',
                },
            }

            # イベントの作成
            created_event = self.service.events().insert(
                calendarId=calendar_id,
                body=event_data
            ).execute()

            logger.info(f"Event created: {created_event.get('htmlLink')}")
            return True

        except Exception as e:
            logger.error(f"Error creating event: {e}")
            return False

    def create_events_batch(self, events: List[CalendarEvent], calendar_id: str = 'primary') -> int:
        """複数のイベントを一括作成"""
        success_count = 0

        for event in events:
            if self.create_event(event, calendar_id):
                success_count += 1

        logger.info(f"Successfully created {success_count}/{len(events)} events")
        return success_count

class CalendarApp:
    """メインアプリケーションクラス"""

    def __init__(self, model_path: str = None, credentials_file: str = "credentials.json"):
        self.extractor = DonutCalendarExtractor(model_path)
        self.calendar_integration = GoogleCalendarIntegration(credentials_file)
        self.dataset_creator = CalendarDatasetCreator()
        self.trainer = None

    def process_calendar_image(self, image_path: str) -> List[CalendarEvent]:
        """カレンダー画像を処理してイベントを抽出"""
        logger.info(f"Processing image: {image_path}")
        return self.extractor.extract_schedule_from_image(image_path)

    def add_events_to_calendar(self, events: List[CalendarEvent], calendar_id: str = 'primary') -> int:
        """抽出されたイベントをGoogleカレンダーに追加"""
        return self.calendar_integration.create_events_batch(events, calendar_id)

    def fine_tune_model(self, train_data_path: str, annotations_file: str, output_dir: str = "./donut-calendar-finetuned"):
        """モデルのファインチューニング"""

        # トレーナーの初期化
        self.trainer = DonutCalendarTrainer()

        # データセットの作成
        train_dataset = self.dataset_creator.create_training_dataset(train_data_path, annotations_file)

        # 合成データの追加（オプション）
        synthetic_dataset = self.dataset_creator.create_synthetic_calendar_data(500)

        # データセットの結合
        from datasets import concatenate_datasets
        combined_dataset = concatenate_datasets([train_dataset, synthetic_dataset])

        # トreーニング・検証データの分割
        split_dataset = combined_dataset.train_test_split(test_size=0.2, random_state=42)

        # ファインチューニング実行
        trainer = self.trainer.train(
            train_dataset=split_dataset['train'],
            val_dataset=split_dataset['test'],
            output_dir=output_dir
        )

        # モデルの更新
        self.extractor = DonutCalendarExtractor(output_dir)

        return trainer

    def run_pipeline(self, image_path: str) -> Dict:
        """画像からGoogleカレンダー登録までの完全パイプライン"""

        # 1. 画像からスケジュール抽出
        events = self.process_calendar_image(image_path)

        if not events:
            return {"status": "error", "message": "No events extracted from image"}

        # 2. Googleカレンダーに登録
        success_count = self.add_events_to_calendar(events)

        return {
            "status": "success",
            "events_extracted": len(events),
            "events_created": success_count,
            "events": [
                {
                    "date": event.date,
                    "time": event.time,
                    "title": event.title,
                    "description": event.description,
                    "location": event.location
                }
                for event in events
            ]
        }

# 使用例とテスト関数
def create_sample_annotation_file():
    """サンプルアノテーションファイルの作成"""
    sample_annotations = [
        {
            "image_filename": "calendar_sample_1.png",
            "events": [
                {
                    "date": "2024-06-20",
                    "time": "10:00",
                    "title": "朝礼",
                    "description": "全体朝礼",
                    "location": "会議室A",
                    "duration": 30
                },
                {
                    "date": "2024-06-20",
                    "time": "14:00",
                    "title": "クライアント打ち合わせ",
                    "description": "新規プロジェクトについて",
                    "location": "会議室B",
                    "duration": 120
                }
            ]
        }
    ]

    with open("sample_annotations.json", "w", encoding="utf-8") as f:
        json.dump(sample_annotations, f, ensure_ascii=False, indent=2)

    print("サンプルアノテーションファイルを作成しました: sample_annotations.json")

def main():
    """メイン実行関数"""

    # サンプルアノテーションファイルの作成
    create_sample_annotation_file()

    # アプリケーションの初期化
    app = CalendarApp()

    # 使用例1: 既存のモデルを使用
    try:
        # 画像からスケジュール抽出・カレンダー登録
        result = app.run_pipeline("path/to/your/calendar_image.png")
        print("処理結果:", result)
    except Exception as e:
        print(f"エラー: {e}")

    # 使用例2: モデルのファインチューニング
    try:
        # カスタムデータでファインチューニング
        trainer = app.fine_tune_model(
            train_data_path="./calendar_images",
            annotations_file="sample_annotations.json",
            output_dir="./donut-calendar-finetuned"
        )
        print("ファインチューニング完了")

        # ファインチューニング後のモデルでテスト
        result = app.run_pipeline("path/to/your/calendar_image.png")
        print("ファインチューニング後の結果:", result)

    except Exception as e:
        print(f"ファインチューニングエラー: {e}")

if __name__ == "__main__":
    main()