In [7]:
import os
from dotenv import load_dotenv

# Load BEFORE any ADK imports
load_dotenv()

# Verify the key is loaded
print("API Key loaded:", "GOOGLE_API_KEY" in os.environ)

API Key loaded: True


In [9]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.adk import Agent
from google.adk.tools.tool_context import ToolContext
from google.genai import types


async def update_state(tool_context: ToolContext, key: str, value: str) -> dict:
  """Updates a state value."""
  tool_context.state[key] = value
  return {"status": f"Updated state '{key}' to '{value}'"}


async def load_state(tool_context: ToolContext, key: str) -> dict:
  """Loads a state value."""
  return {key: tool_context.state.get(key)}


async def save_artifact(
    tool_context: ToolContext, filename: str, content: str
) -> dict:
  """Saves an artifact with the given filename and content."""
  artifact_bytes = content.encode("utf-8")
  artifact_part = types.Part(
      inline_data=types.Blob(mime_type="text/plain", data=artifact_bytes)
  )
  version = await tool_context.save_artifact(filename, artifact_part)
  return {"status": "success", "filename": filename, "version": version}


async def load_artifact(tool_context: ToolContext, filename: str) -> dict:
  """Loads an artifact with the given filename."""
  artifact = await tool_context.load_artifact(filename)
  if not artifact:
    return {"error": f"Artifact '{filename}' not found"}
  content = artifact.inline_data.data.decode("utf-8")
  return {"filename": filename, "content": content}


# Create the agent
root_agent = Agent(
    name="state_agent",
    model="gemini-2.0-flash",
    instruction="""You are an agent that manages state and artifacts.

    You can:
    - Update state value
    - Load state value
    - Save artifact
    - Load artifact

    Use the appropriate tool based on what the user asks for.""",
    tools=[
        update_state,
        load_state,
        save_artifact,
        load_artifact,
    ],
)

In [3]:
#!/usr/bin/env python3
"""Simple test script for Rewind Session agent."""

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging

from google.adk.agents.run_config import RunConfig
from google.adk.cli.utils import logs
from google.adk.events.event import Event
from google.adk.runners import InMemoryRunner
from google.genai import types

APP_NAME = "rewind_test_app"
USER_ID = "test_user"
  

In [4]:

logs.setup_adk_logger(level=logging.ERROR)
logging.getLogger("google_genai.types").setLevel(logging.ERROR)


# ANSI color codes for terminal output
COLOR_RED = "\x1b[31m"
COLOR_BLUE = "\x1b[34m"
COLOR_YELLOW = "\x1b[33m"
COLOR_BOLD = "\x1b[1m"
RESET = "\x1b[0m"


def highlight(text: str) -> str:
  """Adds color highlights to tool responses and agent text."""
  text = str(text)
  return (
      text.replace("'red'", f"'{COLOR_RED}red{RESET}'")
      .replace('"red"', f'"{COLOR_RED}red{RESET}"')
      .replace("'blue'", f"'{COLOR_BLUE}blue{RESET}'")
      .replace('"blue"', f'"{COLOR_BLUE}blue{RESET}"')
      .replace("'version1'", f"'{COLOR_BOLD}{COLOR_YELLOW}version1{RESET}'")
      .replace("'version2'", f"'{COLOR_BOLD}{COLOR_YELLOW}version2{RESET}'")
  )


async def call_agent_async(
    runner: InMemoryRunner, user_id: str, session_id: str, prompt: str
) -> list[Event]:
  """Helper function to call the agent and return events."""
  print(f"\nüë§ User: {prompt}")
  content = types.Content(
      role="user", parts=[types.Part.from_text(text=prompt)]
  )
  events = []
  try:
    async for event in runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=content,
        run_config=RunConfig(),
    ):
      events.append(event)
      if event.content and event.author and event.author != "user":
        for part in event.content.parts:
          if part.text:
            print(f"  ü§ñ Agent: {highlight(part.text)}")
          elif part.function_call:
            print(f"    üõ†Ô∏è Tool Call: {part.function_call.name}")
          elif part.function_response:
            print(
                "    üì¶ Tool Response:"
                f" {highlight(part.function_response.response)}"
            )
  except Exception as e:
    print(f"‚ùå Error during agent call: {e}")
    raise
  return events


"""Demonstrates session rewind."""
print("üöÄ Testing Rewind Session Feature")
print("=" * 50)

runner = InMemoryRunner(
    agent=root_agent,
    app_name=APP_NAME,
)

# Create a session
session = await runner.session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID
)
print(f"Created session: {session.id}")



üöÄ Testing Rewind Session Feature
Created session: 562e85c4-e446-4be3-8f79-d6fbbd09d530


In [8]:

# 1. Initial agent calls to set state and artifact
print("\n\n===== INITIALIZING STATE AND ARTIFACT =====")
await call_agent_async(
    runner, USER_ID, session.id, "set state `color` to red"
)
await call_agent_async(
    runner, USER_ID, session.id, "save artifact file1 with content version1"
)




===== INITIALIZING STATE AND ARTIFACT =====

üë§ User: set state `color` to red
    üõ†Ô∏è Tool Call: update_state
    üì¶ Tool Response: {'status': "Updated state 'color' to '[31mred[0m'"}
  ü§ñ Agent: OK. I have updated the state `color` to `red`.


üë§ User: save artifact file1 with content version1
    üõ†Ô∏è Tool Call: save_artifact
    üì¶ Tool Response: {'status': 'success', 'filename': 'file1', 'version': 0}
  ü§ñ Agent: OK. I have saved an artifact named `file1` with content `version1`.



[Event(model_version='gemini-2.0-flash', content=Content(
   parts=[
     Part(
       function_call=FunctionCall(
         args={
           'content': 'version1',
           'filename': 'file1'
         },
         id='adk-bbe259e4-922d-4118-8731-99855673b4d0',
         name='save_artifact'
       )
     ),
   ],
   role='model'
 ), grounding_metadata=None, partial=None, turn_complete=None, finish_reason=<FinishReason.STOP: 'STOP'>, error_code=None, error_message=None, interrupted=None, custom_metadata=None, usage_metadata=GenerateContentResponseUsageMetadata(
   candidates_token_count=9,
   candidates_tokens_details=[
     ModalityTokenCount(
       modality=<MediaModality.TEXT: 'TEXT'>,
       token_count=9
     ),
   ],
   prompt_token_count=179,
   prompt_tokens_details=[
     ModalityTokenCount(
       modality=<MediaModality.TEXT: 'TEXT'>,
       token_count=179
     ),
   ],
   total_token_count=188
 ), live_session_resumption_update=None, input_transcription=None, output_tran

In [10]:

# 2. Check current state and artifact
print("\n\n===== STATE BEFORE UPDATE =====")
await call_agent_async(
    runner, USER_ID, session.id, "what is the value of state `color`?"
)
await call_agent_async(runner, USER_ID, session.id, "load artifact file1")




===== STATE BEFORE UPDATE =====

üë§ User: what is the value of state `color`?
    üõ†Ô∏è Tool Call: load_state
    üì¶ Tool Response: {'color': '[31mred[0m'}
  ü§ñ Agent: The value of state `color` is `red`.


üë§ User: load artifact file1
    üõ†Ô∏è Tool Call: load_artifact
    üì¶ Tool Response: {'filename': 'file1', 'content': '[1m[33mversion1[0m'}
  ü§ñ Agent: The content of artifact `file1` is `version1`.



[Event(model_version='gemini-2.0-flash', content=Content(
   parts=[
     Part(
       function_call=FunctionCall(
         args={
           'filename': 'file1'
         },
         id='adk-7dcf7e47-cb99-45f8-819e-4b7440489cda',
         name='load_artifact'
       )
     ),
   ],
   role='model'
 ), grounding_metadata=None, partial=None, turn_complete=None, finish_reason=<FinishReason.STOP: 'STOP'>, error_code=None, error_message=None, interrupted=None, custom_metadata=None, usage_metadata=GenerateContentResponseUsageMetadata(
   candidates_token_count=6,
   candidates_tokens_details=[
     ModalityTokenCount(
       modality=<MediaModality.TEXT: 'TEXT'>,
       token_count=6
     ),
   ],
   prompt_token_count=253,
   prompt_tokens_details=[
     ModalityTokenCount(
       modality=<MediaModality.TEXT: 'TEXT'>,
       token_count=253
     ),
   ],
   total_token_count=259
 ), live_session_resumption_update=None, input_transcription=None, output_transcription=None, avg_logprobs=2.969

In [None]:

# 3. Update state and artifact - THIS IS THE POINT WE WILL REWIND BEFORE
print("\n\n===== UPDATING STATE AND ARTIFACT =====")
events_update_state = await call_agent_async(
    runner, USER_ID, session.id, "update state key color to blue"
)
rewind_invocation_id = events_update_state[0].invocation_id
print(f"Will rewind before invocation: {rewind_invocation_id}")

await call_agent_async(
    runner, USER_ID, session.id, "save artifact file1 with content version2"
)


In [None]:

# 4. Check state and artifact after update
print("\n\n===== STATE AFTER UPDATE =====")
await call_agent_async(
    runner, USER_ID, session.id, "what is the value of state key color?"
)
await call_agent_async(runner, USER_ID, session.id, "load artifact file1")


In [None]:

# 5. Perform rewind
print(f"\n\n===== REWINDING SESSION to before {rewind_invocation_id} =====")
await runner.rewind_async(
    user_id=USER_ID,
    session_id=session.id,
    rewind_before_invocation_id=rewind_invocation_id,
)
print("‚úÖ Rewind complete.")


In [None]:

# 6. Check state and artifact after rewind
print("\n\n===== STATE AFTER REWIND =====")
await call_agent_async(
    runner, USER_ID, session.id, "what is the value of state `color`?"
)
await call_agent_async(runner, USER_ID, session.id, "load artifact file1")

print("\n" + "=" * 50)
print("‚ú® Rewind testing complete!")
print(
    "üîß If rewind was successful, color should be 'red' and file1 content"
    " should contain 'version1' in the final check."
)