# タスク 2b: 抽象的なテキスト要約

このノートブックでは、大規模なドキュメント要約で発生する課題に対処します。入力テキストがモデルのコンテキスト長を超えたり、ハルシネーションを生成したり、メモリ不足エラーを引き起こしたりする可能性があります。

これらの問題を軽減するために、このノートブックでは、言語モデルを活用するアプリケーションを可能にするツールキットである [LangChain](https://python.langchain.com/docs/get_started/introduction.html) フレームワークを使用してプロンプトのチャンク化と連鎖化を使用するアーキテクチャを示します。

ユーザー ドキュメントがトークン制限を超えた場合のシナリオに対処するアプローチを探ります。チャンク化では、ドキュメントをコンテキスト長のしきい値以下のセグメントに分割してから、モデルに順番に入力します。これにより、プロンプトがチャンク間で連鎖され、以前のコンテキストが保持されます。このアプローチを適用して、通話のトランスクリプト、会議のトランスクリプト、書籍、記事、ブログ投稿、その他の関連コンテンツを要約します。


## Task 2b.1: 環境のセットアップ

このタスクでは、環境をセットアップし、AWS リージョンを自動的に検出する Bedrock クライアントを作成します。

In [None]:
#Create a service client by name using the default session.
import json
import os
import sys
import time
import random
from typing import Any, List, Mapping, Optional

# AWS and Bedrock imports
import boto3

# Get the region programmatically
session = boto3.session.Session()
region = session.region_name or "us-east-1"  # Default to us-east-1 if region not set

module_path = ".."
sys.path.append(os.path.abspath(module_path))
bedrock_client = boto3.client('bedrock-runtime', region_name=region)


## タスク 2b.2: 長いテキストを要約する

### Boto3 を使用した LangChain の構成

このタスクでは、LangChain Bedrock クラスの LLM を指定し、推論用の引数を渡すことができます。

In [None]:
# LangChain imports
from langchain_aws import BedrockLLM
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from langchain.chains.summarize import load_summarize_chain
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM

# Base LLM configuration
modelId = "amazon.nova-lite-v1:0"

class NovaLiteWrapper(LLM):
    """Wrapper for Nova Lite model that formats inputs correctly."""
    
    @property
    def _llm_type(self) -> str:
        return "nova-lite-wrapper"
    
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Format prompt for Nova Lite and process."""
        # Format the prompt for Nova Lite's expected message structure
        formatted_input = {
            "messages": [
                {
                    "role": "user",
                    "content": [{"text": prompt}]  # Content must be an array with text objects
                }
            ],
            "inferenceConfig": {
                "maxTokens": 2048,
                "temperature": 0,
                "topP": 0.9
            }
        }
        
        # Call Bedrock directly with the properly formatted input
        response = bedrock_client.invoke_model(
            modelId=modelId,
            body=json.dumps(formatted_input)
        )
        
        # Parse the response - updated to handle Nova Lite's response format
        response_body = json.loads(response['body'].read().decode('utf-8'))
        
        # Extract the text from the response
        if 'output' in response_body and 'message' in response_body['output']:
            message = response_body['output']['message']
            if 'content' in message and isinstance(message['content'], list):
                # Extract text from each content item
                texts = []
                for content_item in message['content']:
                    if isinstance(content_item, dict) and 'text' in content_item:
                        texts.append(content_item['text'])
                return ' '.join(texts)
        
        # Fallback if the response format is different
        return str(response_body)
    
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {"model_id": modelId}
    
    def get_num_tokens(self, text: str) -> int:
        """Estimate token count - Nova Lite uses roughly 1 token per 4 characters."""
        return len(text) // 4  # Rough approximation

# Create the Nova Lite wrapper
llm = NovaLiteWrapper()


## リソース最適化された LLM ラッパーの作成

Bedrock のサービスクォータを効果的に処理するために、リソース使用量を最適化し、API 呼び出しにジッター付きの指数バックオフを実装するラッパークラスを作成します。

In [None]:
# Enhanced resource-optimized LLM wrapper with exponential backoff
class ResourceOptimizedLLM(LLM):
    """Wrapper that optimizes resource usage for LLM processing."""
    
    llm: Any  # The base LLM to wrap
    min_pause: float = 30.0  # Minimum pause between requests
    max_pause: float = 60.0  # Maximum pause after throttling
    initial_pause: float = 10.0  # Initial pause between requests
    
    @property
    def _llm_type(self) -> str:
        return f"optimized-{self.llm._llm_type}"
    
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Process with resource optimization and exponential backoff."""
        # Always pause between requests to optimize resource usage
        time.sleep(self.initial_pause)
        
        # Implement retry with exponential backoff
        max_retries = 10  # More retries for important operations
        base_delay = self.min_pause
        
        for attempt in range(max_retries):
            try:
                print(f"Making API call (attempt {attempt+1}/{max_retries})...")
                return self.llm._call(prompt, stop=stop, run_manager=run_manager, **kwargs)
            
            except Exception as e:
                error_str = str(e)
                
                # Handle different types of service exceptions
                if any(err in error_str for err in ["ThrottlingException", "TooManyRequests", "Rate exceeded"]):
                    if attempt < max_retries - 1:
                        # Calculate backoff with jitter to prevent request clustering
                        jitter = random.random() * 0.5
                        wait_time = min(base_delay * (2 ** attempt) + jitter, self.max_pause)
                        
                        print(f"Service capacity reached. Backing off for {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                    else:
                        print("Maximum retries reached. Consider reducing batch size or increasing delays.")
                        raise
                else:
                    # For non-capacity errors, don't retry
                    print(f"Non-capacity error: {error_str}")
                    raise
    
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {**self.llm._identifying_params, "initial_pause": self.initial_pause}
    
    def get_num_tokens(self, text: str) -> int:
        """Pass through token counting to the base model."""
        return self.llm.get_num_tokens(text)

# Create the resource-optimized LLM
resource_optimized_llm = ResourceOptimizedLLM(llm=llm, initial_pause=10.0)



<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** このラッパーは、本番環境での使用に重要な機能を追加します。

- サービスクォータを遵守するためのリクエスト間の自動一時停止
- スロットリング例外を処理するためのジッター付き指数バックオフ
- 包括的なエラー処理とレポート

## タスク 2b.3: 多数のトークンを含むテキスト ファイルの読み込み

このタスクでは、letters ディレクトリに [Amazon の CEO による 2022 年の株主への手紙](https://www.aboutamazon.com/news/company-news/amazon-ceo-andy-jassy-2022-letter-to-shareholders) のコピーを使用します。テキストファイルを読み込み、発生する可能性のあるエラーを処理する関数を作成します。

In [None]:
# Document loading function
def load_document(file_path):
    """Load document from file."""
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()
        return content
    except Exception as e:
        print(f"Error loading document: {e}")
        return None

# Example usage
shareholder_letter = "./2022-letter-jp.txt"
letter = load_document(shareholder_letter)

if letter:
    num_tokens = resource_optimized_llm.get_num_tokens(letter)
    print(f"Document loaded successfully with {num_tokens} tokens")


<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** 警告は無視して次のセルに進んでください。この問題は、次の手順でドキュメントをチャンク化することで解決します。

## タスク 2b.4: 長いテキストをチャンクに分割する

このタスクでは、プロンプトに収まりきらないほど長いテキストを小さなチャンクに分割します。LangChainの`RecursiveCharacterTextSplitter`は、各チャンクのサイズがchunk_sizeよりも小さくなるまで、長いテキストを再帰的にチャンクに分割することをサポートしています。

In [None]:
# Document chunking with conservative settings
def chunk_document(text, chunk_size=4000, chunk_overlap=200):
    """Split document into manageable chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", ".", " "],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    
    chunks = text_splitter.create_documents([text])
    print(f"Document split into {len(chunks)} chunks")
    return chunks

# Split the document into chunks
if letter:
    docs = chunk_document(letter, chunk_size=4000, chunk_overlap=200)
    
    if docs:
        num_docs = len(docs)
        num_tokens_first_doc = resource_optimized_llm.get_num_tokens(docs[0].page_content)
        print(f"Now we have {num_docs} documents and the first one has {num_tokens_first_doc} tokens")


<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** `chunk_size` パラメータは、各チャンクのサイズを制御します。チャンクが大きいほどコンテキストは多くなりますが、処理リソースも必要になります。`chunk_overlap` パラメータは、チャンク間の連続性を確保します。

## タスク 2b.5: チャンクを要約して結合する

このタスクでは、分割されたドキュメントを要約するための2つのアプローチを実装します。LangChain の組み込み要約チェーンを使用する方法と、リソース使用量をより細かく制御できるカスタムの手動実装を使用する方法です。

## 実装アプローチの理解

このノートブックでは、AWS Bedrockを使用して大規模な文書を要約する2つの異なるアプローチを説明します。

<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** 本番アプリケーションを構築する際の利便性と制御のトレードオフを示すため、標準的なLangChain実装とカスタム実装の両方を含めています。

### 同じ目標への 2 つの道筋

1. **標準的な LangChain 実装*** (`process_documents_with_pacing`):
   - LangChain の組み込み要約チェーンを使用
   - より少ないコードで実装が容易
   - 基礎となる複雑さを抽象化
   - 迅速なプロトタイピングと単純なユースケースに最適

2. **カスタム改良実装** (`manual_refine_with_optimization`):
   - 改良プロセスをステップバイステップで構築
   - プロンプトと処理の完全な可視性を提供
   - 各文書チャンクに対する詳細なエラー処理
   - API コールのタイミングとリトライロジックの正確な制御が可能

両者は同じ最終結果を達成しますが、カスタム実装では、サービスクォータを扱い、本番環境ready向けのアプリケーションを構築する際に重要となる、プロセス全体のより詳細な制御が可能です。

実際のシナリオでは、開発中は標準実装から始め、リソース使用量、エラー処理、またはプロンプトエンジニアリングのより詳細な制御が必要になった時点でカスタム実装に移行することがあります。

### 標準的な LangChain の実装

In [None]:
# カスタムドキュメント処理と制御されたペーシング
def process_documents_with_pacing(docs, chain_type="refine", verbose=True):
    """Process documents with pacing to optimize resource usage."""
    
    # チェーンの設定
    summary_chain = load_summarize_chain(
        llm=resource_optimized_llm,
        chain_type=chain_type,  # "refine" processes sequentially, good for resource optimization
        verbose=verbose
    )
    
    # 追加のエラーハンドリングを伴う処理
    try:
        result = summary_chain.invoke(docs)
        return result
    except ValueError as error:
        if "AccessDeniedException" in str(error):
            print(f"\n\033[91mAccess Denied: {error}\033[0m")
            print("\nTo troubleshoot this issue, please check:")
            print("1. Your IAM permissions for Bedrock")
            print("2. Model access permissions")
            print("3. AWS credentials configuration")
            return {"output_text": "Error: Access denied. Check permissions."}
        else:
            print(f"\n\033[91mError during processing: {error}\033[0m")
            return {"output_text": f"Error during processing: {str(error)}"}


### リソース最適化を強化したカスタムリファインの実装

In [None]:
# リファインチェーンのためのリソース最適化処理の手動実装
def manual_refine_with_optimization(docs, llm, verbose=True):
    """Manually implement refine chain with resource optimization."""
    if not docs:
        return {"output_text": "No documents to process."}
    
    # 最初のドキュメントを処理して初期要約を取得
    print(f"Processing initial document (1/{len(docs)})...")
    
    # 初期ドキュメント用のシンプルなプロンプト
    initial_prompt = """Write a concise summary of the following:
    "{text}"
    CONCISE SUMMARY:"""
    
    # 最初のドキュメントを処理
    try:
        current_summary = llm(initial_prompt.format(text=docs[0].page_content))
        print("Initial summary created successfully.")
    except Exception as e:
        print(f"Error creating initial summary: {e}")
        return {"output_text": "Failed to create initial summary."}
    
    # 残りのドキュメントをリファインアプローチで処理
    for i, doc in enumerate(docs[1:], start=2):
        print(f"Refining with document {i}/{len(docs)}...")
        
        # リファインプロンプト
        refine_prompt = """既存の要約を改良してください。
        既存の要約は次のとおりです: {existing_summary}

        情報を追加する新しい文書があります: {text}

        文書からの新しい情報を取り入れて要約を更新してください。
        文書に関連情報が含まれていない場合は、既存の要約を返してください。

        改良された要約:"""
                
        try:
            # Apply resource optimization between requests
            time.sleep(10.0)  # Base delay between requests
            
            # Update the summary
            current_summary = llm(refine_prompt.format(
                existing_summary=current_summary,
                text=doc.page_content
            ))
            
            if verbose:
                print(f"Successfully refined with document {i}")
        except Exception as e:
            print(f"Error during refinement with document {i}: {e}")
            # Apply exponential backoff
            backoff = min(10.0 * (2 ** (i % 5)) + (random.random() * 2), 30)
            print(f"Backing off for {backoff:.2f} seconds...")
            time.sleep(backoff)
            
            # Try one more time
            try:
                current_summary = llm(refine_prompt.format(
                    existing_summary=current_summary,
                    text=doc.page_content
                ))
            except Exception as retry_error:
                print(f"Retry failed for document {i}: {retry_error}")
                # Continue with current summary rather than failing completely
    
    return {"output_text": current_summary}


<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** 手動で実装すると、次の点をより細かく制御できます。

- 要約に使用された正確なプロンプト
- エラー処理とリカバリー
- API 呼び出し間のリソース最適化
- エラー発生時のグレースフルな機能低下

## タスク 2b.6: メイン実行関数

ここで、ドキュメント要約プロセス全体を調整するメイン関数を作成します。

<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** メイン関数 (`summarize_document`) を使用すると、`chain_type` パラメータに基づいて使用する実装を選択できるため、結果とパフォーマンスを簡単に比較できます。

In [None]:
# メイン実行関数
def summarize_document(file_path, chunk_size=4000, chain_type="refine"):
    """Main function to summarize a document."""
    
    print(f"Starting document summarization process for: {file_path}")
    
    # 文書の読み込み
    document_text = load_document(file_path)
    if not document_text:
        return "Failed to load document."
    
    print(f"Document loaded successfully. Length: {len(document_text)} characters")
    
    # チャンクに分割
    docs = chunk_document(document_text, chunk_size=chunk_size, chunk_overlap=200)
    
    # If document is very large, provide a warning
    if len(docs) > 15:
        print(f"Warning: Document is large ({len(docs)} chunks). Processing may take some time.")
        
        # For very large documents, consider using a subset for testing
        if len(docs) > 30:
            print("Document is extremely large. Consider using a smaller chunk_size or processing a subset.")
            # Optional: process only a subset for testing
            # docs = docs[:15]
    
    # ドキュメントを処理
    print(f"Processing document using '{chain_type}' chain type...")
    
    # チェーンタイプに基づいて適切な処理方法を使用
    if chain_type == "refine":
        # リソース最適化のより良い制御のために手動実装を使用
        result = manual_refine_with_optimization(docs, resource_optimized_llm)
    else:
        # 他のチェーンタイプには標準のLangChain実装を使用
        result = process_documents_with_pacing(docs, chain_type=chain_type)
    
    # 結果を返す
    if result and "output_text" in result:
        print("\nSummarization completed successfully!")
        return result["output_text"]
    else:
        print("\nSummarization failed or returned no result.")
        return "Summarization process did not produce a valid result."


<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** ドキュメントの数、Bedrock 要求レート クォータ、および構成された再試行設定によっては、要約プロセスの実行に時間がかかる場合があります。

## タスク 2b.7: 要約を実行する

株主レターの要約を実行してみましょう。デフォルトでは、summary_document() 関数は Refinance チェーンを使用します。map_reduce を有効にするには、以下の手順に従います。

- 次の行をコメントアウトします: `summary = summarize_document(document_path, chunk_size=4000, chain_type="refine")`
- 次の行をコメント解除します: `# summary = summarize_document(document_path, chunk_size=4000, chain_type="map_reduce")`


<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **注:** 実行中にエラーメッセージが表示されても心配しないでください。コードには堅牢なエラー処理が含まれており、失敗したリクエストは指数バックオフで自動的に再試行されます。これはサービスクォータを使用する場合の正常な動作であり、本番環境対応アプリケーションがAPI制限をどのように処理すべきかを示しています。

In [None]:
# 使用例
if __name__ == "__main__":
    # ドキュメントへのパス
    document_path = "./2022-letter-jp.txt"
    
    # 異なるオプションで要約
    # オプション1: 標準的なリファインチェーン（シーケンシャル処理、リソース最適化に適している）
    summary = summarize_document(document_path, chunk_size=4000, chain_type="refine")
    
    # オプション2: 比較用のmap_reduce（ただしサービスクォータに注意）
    # summary = summarize_document(document_path, chunk_size=4000, chain_type="map_reduce")
    
    # 最終要約の出力
    print("\n=== 最終要約 ===\n")
    print(summary)


これで、LangChainフレームワークを使用したプロンプトチャンキングとチェーニングを実験し、長文入力テキストから生じる問題を軽減しながら大きなドキュメントを要約することができました。

## 主要コンポーネントの理解

私たちのソリューションの主要コンポーネントを見直してみましょう：

1. **リソース最適化**: **ResourceOptimizedLLM**ラッパーは以下によってBedrockサービスクォータ内でAPIコールを管理します：
   - リクエスト間に一時停止を追加（**initial_pause**によって制御）
   - スロットリング発生時のジッター付き指数バックオフの実装
   - 包括的なエラー処理とリカバリの提供

2. **ドキュメントチャンキング**: **chunk_document**関数は大きなドキュメントを管理可能な部分に分割します：
   - **chunk_size** は各チャンクの最大サイズを制御（4000文字）
   - **chunk_overlap** はチャンク間のコンテキストの連続性を確保（200文字）
   - 自然なテキストセパレータ（**\n\n**, **\n**, **.** など）を使用して段落の途中での分割を回避

3. **要約アプローチ**:
   - **リファインチェーン**: チャンクを順次処理し、新しいチャンクごとに要約を改良
   - **Map-Reduce**: 各チャンクを独立して要約し、それらの要約を結合して要約

4. **エラー処理**: 包括的なエラー処理により、以下からの回復が可能：
   - サービスのスロットリングと容量制限
   - アクセス権限の問題
   - その他のAPIエラー

## 自分で試してみる

- プロンプトを特定のユースケースに変更し、異なるモデルの出力を評価する
- 異なるチャンクサイズを試して、コンテキスト保持と処理効率のバランスの最適値を見つける
- 異なる要約チェーンタイプ（**refine**対**map_reduce**）を試して結果を比較する
- Bedrockクォータ制限に基づいてリソース最適化パラメータを調整する

### 実践的な応用

このアプローチは以下のような様々な長文コンテンツの要約に適用できます：
- カスタマーサービスの通話トランスクリプト
- 会議のトランスクリプトとノート
- 研究論文と技術文書
- 法的文書と契約書
- 書籍、記事、ブログ投稿

### ベストプラクティス

このソリューションを本番環境に実装する際：

1. **API使用状況のモニタリング**: クォータ制限内に収まるようにAPIコールを追跡
2. **チャンクサイズの最適化**: コンテキスト保持と処理効率のバランスを取る
3. **適切なエラー処理の実装**: アプリケーションがAPIエラーを適切に処理できるようにする
4. **キャッシュの検討**: 頻繁にアクセスされるドキュメントの冗長なAPIコールを避けるためにキャッシュを使用
5. **様々な文書タイプでのテスト**: 異なるコンテンツには異なるチャンキング戦略が必要な場合がある

### クリーンアップ

このノートブックを完了しました。ラボの次の部分に進むには、以下を実行してください：

- このノートブックファイルを閉じて、**タスク 3 **に進んでください。