# ストリーミング

## 学習目標

* ストリーミングの仕組みを理解する
* ストリームイベントを扱えるようになる


`anthropic` SDK を import して、クライアントをセットアップしましょう：


In [None]:
from dotenv import load_dotenv
from anthropic import Anthropic

#load environment variable
load_dotenv()

#automatically looks for an "ANTHROPIC_API_KEY" environment variable
client = Anthropic()

ここまで私たちは、次のような書き方で Claude にメッセージを送ってきました：


In [None]:
response = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "Write me an essay about macaws and clay licks in the Amazon",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=800,
    temperature=0,
)
print("We have a response back!")
print("========================")
print(response.content[0].text)

この方法でも問題なく動きますが、このやり方では **すべての内容が生成し終わってから** はじめて API から内容が返ってくる点に注意してください。上のセルをもう一度実行すると、全体の応答が一括で表示されるまで何も出力されないはずです。

多くの場面ではこれで十分ですが、ユーザーが「応答が全部生成されるまで待たされる」アプリを作っている場合、体験が悪くなりやすいです。

**そこでストリーミングです！**

ストリーミングを使うと、モデルが生成した内容を「生成され次第」受け取れるため、全体が完成するのを待つ必要がありません。`claude.ai` のようなアプリはこの仕組みで、生成された内容がブラウザへ順次送られて表示されます：

![claude_streaming](images/claude_streaming.gif)


## ストリームを扱う

API からストリーミング応答を得るには、`client.messages.create` に `stream=True` を渡すだけです。ここまでは簡単です。少しややこしいのは、その後にストリーミング応答をどう扱い、流れてくるデータをどう処理するかです。


In [None]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "Write me a 3 word sentence, without a preamble.  Just give me 3 words",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=100,
    temperature=0,
    stream=True,
)

`stream` 変数の中身を見てみましょう。


In [None]:
stream

見た目にはあまり情報がありません！この stream オブジェクト単体では、ほとんど何もしてくれません。stream はジェネレータで、API から受け取った server-sent events (SSE) を 1 つずつ yield します。

つまり、こちら側で反復（iterate）して、各 SSE を処理するコードを書く必要があります。もう「完成した 1 つの塊」としてデータが返ってくるのではありません。では実際に stream を回してみましょう：


In [None]:
for event in stream:
    print(event)

ご覧の通り、API からは多くの SSE（イベント）が届きます。これらの意味をもう少し詳しく見ていきます。次はイベントを色分けして説明した図です：




![streaming_output](images/streaming_output.png)


各ストリームは、次の順序でイベントが流れてきます：

* **MessageStartEvent** - content が空の Message
* **複数の content block**（各ブロックは次を含む）
  * **ContentBlockStartEvent**
  * 1 個以上の **ContentBlockDeltaEvent**
  * **ContentBlockStopEvent**
* 最終メッセージのトップレベルの変化を示す **MessageDeltaEvent**（1個以上）
* 最後に **MessageStopEvent**

上の例では content block は 1 つだけでした。次の図は、その 1 ブロックに関連するイベント全体を示します：

![content_block_streaming](images/content_block_streaming.png)


私たちが本当に欲しい「モデルが生成した本文」は、`ContentBlockDeltaEvent` から届きます（`type` が `"content_block_delta"`）。実際のテキストは `delta` の中の `text` にあります。生成されたテキストだけを表示してみましょう：


In [None]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "Write me a 3 word sentence, without a preamble.  Just give me 3 words",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=100,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text)

テキスト自体は出せていますが、出力が分断されて読みづらいです。Python の `print()` でストリーミングテキストを表示するときは、次の 2 つの引数が便利です：

* `end=""`: `print()` はデフォルトで末尾に改行（`
`）を付けますが、`end=""` にすると改行しません。次の `print()` が同じ行に続いて表示されます。
* `flush=True`: バッファを待たずに即時出力します。ストリーミングで「リアルタイムに表示」したいときに有効です。

これを反映してみましょう：


In [None]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "Write me a 3 word sentence, without a preamble.  Just give me 3 words",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=100,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")

これくらい短いテキストだと、ストリーミングの効果が分かりにくいかもしれません。もう少し長いものを生成させてみましょう：


In [None]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "How do large language models work?",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=1000,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")

まだなら、上のセルを実行してみてください。テキストが少しずつ（増分で）表示されるはずです。


ここまで見た通り、`ContentBlockDeltaEvent` にモデルが生成したテキストが入っています。ただし、他のイベントも重要です。例えばトークン使用量を知りたい場合、見るべき場所は 2 つあります：

* `MessageStartEvent` に入力（プロンプト）のトークン使用量が入る
* `MessageDeltaEvent` に出力トークン数が入る

![streaming_tokens](images/streaming_tokens.png)

では、上のコードを更新して、入力トークン数と出力トークン数を表示してみましょう：


In [None]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "How do large language models work?",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=1000,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "message_start":
        input_tokens = event.message.usage.input_tokens
        print("MESSAGE START EVENT", flush=True)
        print(f"Input tokens used: {input_tokens}", flush=True)
        print("========================")
    elif event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")
    elif event.type == "message_delta":
        output_tokens = event.usage.output_tokens
        print("\n========================", flush=True)
        print("MESSAGE DELTA EVENT", flush=True)
        print(f"Output tokens used: {output_tokens}", flush=True)
        

### その他のストリーミングイベントタイプ

ストリームを扱っていると、他にも次のようなイベントに遭遇することがあります：

* **Ping イベント** - ストリームには任意個の ping が含まれ得ます。
* **Error イベント** - ストリーム内にエラーイベントが混ざることがあります。例えば高負荷時には `overloaded_error` が返ることがあり、非ストリーミングの場合の HTTP 529 に相当します。

エラーイベントの例：

```
event: error
data: {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}}
```


## Time to first token (TTFT)

ストリーミングを使う最大の理由は、**TTFT（Time to first token）** を改善することです。これは、あなた（またはユーザー）がモデル生成の最初の内容を受け取るまでの時間を指します。

ストリーミングが TTFT に与える影響をデモしてみましょう。

まずは非ストリーミングです。長い文章を生成させますが、`max_tokens=500` で打ち切ります：


In [None]:
import time
def measure_non_streaming_ttft():
    start_time = time.time()

    response = client.messages.create(
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "Write mme a long essay explaining the history of the American Revolution",
            }
        ],
        temperature=0,
        model="claude-3-haiku-20240307",
    )

    response_time = time.time() - start_time

    print(f"Time to receive first token: {response_time:.3f} seconds")
    print(f"Time to recieve complete response: {response_time:.3f} seconds")
    print(f"Total tokens generated: {response.usage.output_tokens}")
    
    print(response.content[0].text)

In [None]:
measure_non_streaming_ttft()

次に、同じことをストリーミングでやってみます：


In [None]:
def measure_streaming_ttft():
    start_time = time.time()

    stream = client.messages.create(
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "Write mme a long essay explaining the history of the American Revolution",
            }
        ],
        temperature=0,
        model="claude-3-haiku-20240307",
        stream=True
    )
    have_received_first_token = False
    for event in stream:
        if event.type == "content_block_delta":
            if not have_received_first_token:
                ttft = time.time() - start_time
                have_received_first_token = True
            print(event.delta.text, flush=True, end="")
        elif event.type == "message_delta":
            output_tokens = event.usage.output_tokens
            total_time = time.time() - start_time

    print(f"\nTime to receive first token: {ttft:.3f} seconds", flush=True)
    print(f"Time to recieve complete response: {total_time:.3f} seconds", flush=True)
    print(f"Total tokens generated: {output_tokens}", flush=True)
    


In [None]:
measure_streaming_ttft()

結果を比較してみましょう。

* **ストリーミングなし**
  * **最初のトークンを受け取るまで:** 4.194 秒
  * **応答全体を受け取るまで:** 4.194 秒
  * **生成トークン数:** 500
* **ストリーミングあり**
  * **最初のトークンを受け取るまで:** 0.492 秒
  * **応答全体を受け取るまで:** 4.274 秒
  * **生成トークン数:** 500

見ての通り、TTFT に大きな差があります。このデモは 500 トークンで、しかも最速モデルの Haiku を使っています。もし Opus で 1000 トークン生成する例にすると、数値はさらに大きく変わります。


In [None]:
def compare_ttft():
    def measure_streaming_ttft():
        start_time = time.time()

        stream = client.messages.create(
            max_tokens=1000,
            messages=[
                {
                    "role": "user",
                    "content": "Write mme a very very long essay explaining the history of the American Revolution",
                }
            ],
            temperature=0,
            model="claude-3-opus-20240229",
            stream=True
        )
        have_received_first_token = False
        for event in stream:
            if event.type == "content_block_delta":
                if not have_received_first_token:
                    ttft = time.time() - start_time
                    have_received_first_token = True
            elif event.type == "message_delta":
                output_tokens = event.usage.output_tokens
                total_time = time.time() - start_time
        return (ttft, output_tokens)
    
    def measure_non_streaming_ttft():
        start_time = time.time()

        response = client.messages.create(
            max_tokens=1000,
            messages=[
                {
                    "role": "user",
                    "content": "Write mme a very very long essay explaining the history of the American Revolution",
                }
            ],
            temperature=0,
            model="claude-3-opus-20240229"
        )
        ttft = time.time() - start_time
        return (ttft, response.usage.output_tokens)
    
    streaming_ttft, streaming_tokens = measure_streaming_ttft()
    non_streaming_ttft, non_streaming_tokens = measure_non_streaming_ttft()

    print("OPUS STREAMING")
    print(f"Time to first token: {streaming_ttft}")
    print(f"Tokens generated: {streaming_tokens}")
    print("#########################################################")
    print("OPUS NON STREAMING")
    print(f"Time to first token: {non_streaming_ttft}")
    print(f"Tokens generated: {non_streaming_tokens}")

        

In [None]:
# DO NOT RUN THIS! It takes over a minute to run and generates around 2000 tokens with Opus! 
compare_ttft()

Opus でより長い文章を生成すると、ストリーミングの TTFT 改善はさらに分かりやすくなります。非ストリーミングでは最初のトークンを受け取るまで 47 秒かかりましたが、ストリーミングでは 1.8 秒で受け取れました。

**注:** ストリーミングはモデルの総生成時間を魔法のように短縮するわけではありません。最初のデータは早く届きますが、リクエスト開始から最後のトークン受信までの総時間が劇的に短くなるわけではありません。


## ストリーミング用ヘルパー


Python SDK には、ストリーミングを扱うための便利機能がいくつかあります。`client.messages.create(stream=True)` の代わりに `client.messages.stream()` を使うと、便利なヘルパーメソッドにアクセスできます。`client.messages.stream()` は `MessageStreamManager`（コンテキストマネージャ）を返し、そこからイベントを emit しつつメッセージを蓄積する `MessageStream` を扱えます。

次の例では `client.messages.stream` を使い、`stream.text_stream` で「テキスト delta だけ」を簡単に取り出して表示します。イベント種別を毎回チェックする必要がありません。

さらに `get_final_message` という便利メソッドもあり、ストリームを最後まで読み終えた後に「最終的な蓄積済みメッセージ」を返してくれます。ストリーミング表示をしつつ、最終的に完成した全文も必要なときに便利です。

次の例は、届いたテキストを順次表示しつつ、完了後に最終メッセージも表示します：


In [None]:
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def streaming_with_helpers():
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Write me sonnet about orchids",
            }
        ],
        model="claude-3-opus-20240229",
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

    final_message = await stream.get_final_message()
    print("\n\nSTREAMING IS DONE.  HERE IS THE FINAL ACCUMULATED MESSAGE: ")
    print(final_message.to_json())

await streaming_with_helpers()



`client.messages.stream()` を使う場合、任意のストリームイベント発生時や、テキスト生成時などに実行される **カスタムイベントハンドラ** も定義できます。

次の例では 2 つのイベントハンドラを使います。モデルに "generate a 5-word poem" と頼み、独自の `MyStream` クラスに次のハンドラを定義します：

* `on_text` - テキスト ContentBlock が蓄積されるときに呼ばれます。第1引数は text delta、第2引数は現在の蓄積テキストです。この例では、生成テキストをストリームに合わせて表示します（見やすさのため緑色）。
* `on_stream_event` - API からイベントを受け取るたびに呼ばれます。この例では、イベント type を出力します。

そして `client.messages.stream(..., event_handler=MyStream)` のように渡して、コールバックを登録します：


In [None]:
from anthropic import AsyncAnthropic, AsyncMessageStream

client = AsyncAnthropic()

green = '\033[32m'
reset = '\033[0m'

class MyStream(AsyncMessageStream):
    async def on_text(self, text, snapshot):
        # This runs only on text delta stream messages
        print(green + text + reset, flush=True) #model generated content is printed in green

    async def on_stream_event(self, event):
        # This runs on any stream event
        print("on_event fired:", event.type)

async def streaming_events_demo():
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Generate a 5-word poem",
            }
        ],
        model="claude-3-opus-20240229",
        event_handler=MyStream,
    ) as stream:
        # Get the final accumulated message, after the stream is exhausted
        message = await stream.get_final_message()
        print("accumulated final message: ", message.to_json())

await streaming_events_demo()

Python SDK では、他にも次のようなイベントハンドラを利用できます：

##### `on_message(message: Message)`
完全な Message オブジェクトが蓄積されたときに発火します（message_stop SSE に対応）。

##### `on_content_block(content_block: ContentBlock)`
完全な ContentBlock が蓄積されたときに発火します（content_block_stop SSE に対応）。

##### `on_exception(exception: Exception)`
ストリーミング中に例外が発生したときに発火します。

##### `on_timeout()`
リクエストがタイムアウトしたときに発火します。

##### `on_end()`
ストリームで最後に発火するイベントです。


***

## 演習

ストリーミングを使うシンプルな Claude チャットボットを書いてください。次の GIF が動作イメージです。出力の色分けは完全に任意で、GIF を見やすくするためのものです：

![streaming_chat_exercise](images/streaming_chat_exercise.gif)


### 解答例
上の演習の 1 つの実装例です。できればこのノートブックのセルとしてではなく、単体の Python スクリプトとして実行するのがおすすめです：


In [None]:
from anthropic import Anthropic

# Initialize the Anthropic client
client = Anthropic()

# ANSI color codes
BLUE = "\033[94m"
GREEN = "\033[92m"
RESET = "\033[0m"

def chat_with_claude():
    print("Welcome to the Claude Chatbot!")
    print("Type 'quit' to exit the chat.")
    
    conversation = []
    
    while True:
        user_input = input(f"{BLUE}You: {RESET}")
        
        if user_input.lower() == 'quit':
            print("Goodbye!")
            break
        
        conversation.append({"role": "user", "content": user_input})
        
        print(f"{GREEN}Claude: {RESET}", end="", flush=True)
        
        stream = client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=1000,
            messages=conversation,
            stream=True
        )
        
        assistant_response = ""
        for chunk in stream:
            if chunk.type == "content_block_delta":
                content = chunk.delta.text
                print(f"{GREEN}{content}{RESET}", end="", flush=True)
                assistant_response += content
        
        print()  # New line after the complete response
        
        conversation.append({"role": "assistant", "content": assistant_response})

if __name__ == "__main__":
    chat_with_claude()

***
