# Google Bigtable


> [Bigtable](https://cloud.google.com/bigtable) is a key-value and wide-column store, ideal for fast access to structured, semi-structured, or unstructured data. Extend your database application to build AI-powered experiences leveraging Bigtable's Langchain integrations.


This notebook goes over how to use an LLM agent to query [Bigtable](https://cloud.google.com/bigtable) through the `BigtableExecuteQueryTools` suite, which includes `BigtableExecuteQueryTool` and `PresetBigtableExecuteQueryTool`.


[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googleapis/langchain-google-bigtable-python/blob/main/docs/get_table_and_execute_query.ipynb)



## Before You Begin

To run this notebook, you will need to do the following:

* [Create a Google Cloud Project](https://developers.google.com/workspace/guides/create-project)
* [Enable the Bigtable API](https://console.cloud.google.com/flows/enableapi?apiid=bigtable.googleapis.com)
* [Create a Bigtable instance](https://cloud.google.com/bigtable/docs/creating-instance)
* [Create Bigtable access credentials](https://developers.google.com/workspace/guides/create-credentials)

After confirming access to the database in the runtime environment of this notebook, fill in the following values and run the cell before running example scripts.

### ☁ Set Your Google Cloud Project and Instance
`BigtableExecuteQueryTool` suite requires an existing project and instance.

In [None]:
# @markdown Specify a project and an instance for demo purposes.
PROJECT_ID = ""  # @param {type:"string"}
INSTANCE_ID = ""  # @param {type:"string"}

### 🦜🔗 Library Installation

Install the required libraries for Bigtable and LangChain integrations.

In [None]:
# Install required libraries
%pip install google-cloud-bigtable
%pip install --upgrade google-auth
%pip install langchain
%pip install -U langchain-google-genai
%pip install langgraph langchain-google-genai
%pip install -U langchain-google-vertexai

### 🔐 Authentication

Authenticate to Google Cloud as the IAM user logged into this notebook in order to access your Google Cloud Project.

In [None]:
!gcloud auth application-default login
!gcloud config set project {PROJECT_ID}

### 👍Initialize Bigtable Client and Create Tables

We will initialize:

1. **`admin_client`**: Used for administrative operations, such as creating and deleting tables.
2. **`engine`**: Used for all data operations and LangChain tools (reading, writing, querying, etc.).

We will also create two tables for query tests:

1. **`hotels` table**: For testing queries on hotel data.
2. **`fruits` table**: For testing queries on fruit data.

Both tables will be cleaned up after the tests.


In [None]:
# Import necessary libraries
from google.cloud import bigtable
from langchain_google_bigtable.engine import BigtableEngine   

# --- Initialize Bigtable admin client (for admin ops like create/delete table) ---
admin_client = bigtable.Client(project=PROJECT_ID, admin=True)
instance = admin_client.instance(INSTANCE_ID)
print(f"Connected to Bigtable instance: {instance.instance_id}")

# --- Initialize BigtableEngine for all data/tools operations (Asynchronous Setup) ---
import nest_asyncio
nest_asyncio.apply()
engine = await BigtableEngine.async_initialize(project_id=PROJECT_ID)


In [17]:
# Create a hotel table and insert data for demo purposes
instance = admin_client.instance(INSTANCE_ID)
hotels_table_id = "test-table-hotels"
hotels_table = instance.table(hotels_table_id)
column_families = {
    "cf": bigtable.column_family.MaxVersionsGCRule(1),
}

if not hotels_table.exists():
    hotels_table.create(column_families=column_families)

    # Define columns and data
    columns = ["id", "name", "location", "price_tier", "checkin_date", "checkout_date", "booked"]
    data = [
        [1, 'Hilton Basel', 'Basel', 'Luxury', '2024-04-20', '2024-04-22', False],
        [2, 'Marriott Zurich', 'Zurich', 'Upscale', '2024-04-14', '2024-04-21', False],
        [3, 'Hyatt Regency Basel', 'Basel', 'Upper Upscale', '2024-04-02', '2024-04-20', False],
        [4, 'Radisson Blu Lucerne', 'Lucerne', 'Midscale', '2024-04-05', '2024-04-24', False],
        [5, 'Best Western Bern', 'Bern', 'Upper Midscale', '2024-04-01', '2024-04-23', False],
        [6, 'InterContinental Geneva', 'Geneva', 'Luxury', '2024-04-23', '2024-04-28', False],
        [7, 'Sheraton Zurich', 'Zurich', 'Upper Upscale', '2024-04-02', '2024-04-27', False],
        [8, 'Holiday Inn Basel', 'Basel', 'Upper Midscale', '2024-04-09', '2024-04-24', False],
        [9, 'Courtyard Zurich', 'Zurich', 'Upscale', '2024-04-03', '2024-04-13', False],
        [10, 'Comfort Inn Bern', 'Bern', 'Midscale', '2024-04-04', '2024-04-16', False],
    ]

    # Insert data into the table
    mutations = []
    batcher = hotels_table.mutations_batcher(max_row_bytes=1024)
    for row in data:
        row_key = f"hotels#{row[0]}#{row[2]}#{row[1]}#{row[3]}"
        mutation = hotels_table.direct_row(row_key)
        for col, value in zip(columns, row):
            mutation.set_cell("cf", col.encode("utf-8"), str(value).encode("utf-8"))
        mutations.append(mutation)
    for mutation in mutations:
        batcher.mutate(mutation)

# Create a fruits table and insert data for demo purposes
fruits_table_id = "test-table-fruits"
fruits_table = instance.table(fruits_table_id)
column_families = {
    "cf": bigtable.column_family.MaxVersionsGCRule(1),
} 
if not fruits_table.exists():
        fruits_table.create(column_families=column_families)

        columns = ["id", "name", "color", "price_per_kg", "in_stock"]
        data = [
            [1, 'Apple', 'Red', 3.5, True],
            [2, 'Banana', 'Yellow', 1.2, True],
            [3, 'Grapes', 'Purple', 2.8, False],
            [4, 'Orange', 'Orange', 2.0, True],
            [5, 'Blueberry', 'Blue', 4.0, False],
            [6, 'Strawberry', 'Red', 3.8, True],
            [7, 'Pineapple', 'Brown', 2.5, True],
            [8, 'Mango', 'Yellow', 3.0, False],
            [9, 'Watermelon', 'Green', 1.5, True],
            [10, 'Kiwi', 'Brown', 4.2, False],
        ]

        mutations = []
        batcher = fruits_table.mutations_batcher(max_row_bytes=1024)
        for row in data:
            row_key = f"fruits#{row[0]}#{row[1]}#{row[2]}"
            mutation = fruits_table.direct_row(row_key)
            for col, value in zip(columns, row):
                mutation.set_cell("cf", col.encode("utf-8"), str(value).encode("utf-8"))
            mutations.append(mutation)
        for mutation in mutations:
            batcher.mutate(mutation)

## Usage: Two Approaches

This section demonstrates two ways to query Bigtable using LangChain tools.

---

### 1. Basic Usage: `PresetBigtableExecuteQueryTool`

Use **`PresetBigtableExecuteQueryTool`** when you already know the instance and the SQL query you want to run.  
You initialize the tool with `instance_id` and `query`, and then pass it into an agent.

We will initialize the following tool:

1. **`PresetBigtableExecuteQueryTool`** – executes a given SQL query against a Bigtable instance.  
   Requires both `instance_id` and `query` as input.


In [None]:
from langchain_google_bigtable.execute_query_tools import PresetBigtableExecuteQueryTool

preset_query_hotel_tool = PresetBigtableExecuteQueryTool(
    engine=engine,
    instance_id=INSTANCE_ID,
    query= "SELECT * FROM `test-table-hotels`",
    tool_name="preset_query_hotel_tool"
)

preset_query_fruits_tool = PresetBigtableExecuteQueryTool(
    engine=engine,
    instance_id=INSTANCE_ID,
    query="SELECT * FROM `test-table-fruits`",
    tool_name="preset_query_fruits_tool"
)
print("Tools initialized successfully.")


#### 1.2 Create an Agent and Ask a Question
We now create an agent using the preset query tool.  
The agent will execute the query and process the results to answer user questions.

Optional: If you want to use a Google-based agent, make sure `GOOGLE_API_KEY` is set as shown above.

In [7]:
import os
os.environ["GOOGLE_API_KEY"] = ""  # TODO: Set your Google API key here if you want to use Google LLMs

In [None]:
# Create a conversational agent
from langgraph.prebuilt import create_react_agent
from langchain_core.messages.ai import BaseMessage

agent = create_react_agent(
    model="gemini-2.5-pro",
    tools=[preset_query_hotel_tool, preset_query_fruits_tool],
    prompt="You are a helpful assistant. Make sure to process the data retrieved from the Bigtable execute query tool to answer user questions effectively."
)
# Ask the agent a question about the hotels table
response = agent.invoke({
    "messages": [
        {
            "role": "user",
            "content": (
                "Find me all luxury hotels and their prices. Use the tool to get the data."
            )
        }
    ]
})

# Print the final thought and agent return.
final_message: BaseMessage = response["messages"][-1]
print("Final Thought:")
final_message.pretty_print()
print("\n")

# Print each thought.
print("All Steps:\n")
for i, message in enumerate(response["messages"], start=1):
    message: BaseMessage
    print(f"Step {i}:")
    message.pretty_print()
    print("\n")

In [None]:
# Ask the agent a question about the fruits table
fruit_response = agent.invoke({
    "messages": [
        {
            "role": "user",
            "content": (
                "Find me cheap fruits. Use the tool to get the data."
            )
        }
    ]
})

# Print the final thought and agent return.
final_message: BaseMessage = fruit_response["messages"][-1]
print("Final Thought:")
final_message.pretty_print()
print("\n")

# Print each thought.
print("All Steps:\n")
for i, message in enumerate(fruit_response["messages"], start=1):
    message: BaseMessage
    print(f"Step {i}:")
    message.pretty_print()
    print("\n")

### 1.3 Parameterized Query Usage

You can run parameterized queries by passing a parameters dictionary to the **`PresetBigtableExecuteQueryTool`**.  
For example, to find all hotels in Basel:

- The query must use parameter placeholders (e.g. `@location`)
- The agent will generate the needed parameters. 

Example:
```python
parameterized_query = "SELECT * FROM `test-table-hotels` WHERE cf['location'] = @location"
```

In [None]:
# Define a parameterized query to find hotels in a specific location
parameterized_query = "SELECT * FROM `test-table-hotels` WHERE cf['location'] = @location"
preset_query_hotel_tool_with_param = PresetBigtableExecuteQueryTool(
    engine=engine,
    instance_id=INSTANCE_ID,
    query=parameterized_query,
    tool_name="preset_query_hotel_tool_with_param"
)


agent = create_react_agent(
    model="gemini-2.5-pro",
    tools=[preset_query_hotel_tool_with_param],
    prompt="You are a helpful assistant. Make sure to process the data retrieved from the Bigtable execute query tool to answer user questions effectively."
)

hotel_parameterized_response = agent.invoke({
    "messages": [
        {
            "role": "user",
            "content": (
                "Find me hotels in Basel."
            )
        }
    ]
})

# Print the final thought and agent return.
final_message: BaseMessage = hotel_parameterized_response["messages"][-1]
print("Final Thought:")
final_message.pretty_print()
print("\n")

# Print each thought.
print("All Steps:\n")
for i, message in enumerate(hotel_parameterized_response["messages"], start=1):
    message: BaseMessage
    print(f"Step {i}:")
    message.pretty_print()
    print("\n")



### 2. Advanced Usage: With Schema Context

In this section, we demonstrate how to define and use a **custom tool** to retrieve the Bigtable schema.  
This allows the agent to automatically generate queries based on schema information, without requiring the user to manually provide queries or instance parameters.

---

#### 2.1 Define a Schema Tool
We implement `BigtableGetInstancesAndTableSchemaTool` to retrieve metadata about Bigtable resources:  
- Instances  
- Tables within each instance  
- Column families and potential columns (qualifiers)  

Since Bigtable has no fixed column qualifier schema, this tool scans a small number of rows to infer possible column qualifiers.

In [12]:
from google.cloud.bigtable import Client
from google.cloud.bigtable.instance import Instance
from langchain_core.tools import BaseTool
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from typing import List
from google.cloud.bigtable.row_data import PartialRowsData

# The number of rows to scan to find possible column qualifiers.
DEFAULT_COL_QUALIFIER_SCAN_LIMIT = 1  

@dataclass
class FamilyQualifiers:
    column_family_name: str
    column_qualifiers: List[str] = field(default_factory=list)

def extract_family_qualifiers(rows: PartialRowsData) -> List[FamilyQualifiers]:
    """
    Extracts a list of FamilyQualifiers, each containing a family name and a sorted list of qualifiers.
    Raises ValueError if no valid data found.
    """
    if not hasattr(rows, "rows") or not rows.rows:
        raise ValueError("No rows data found for extracting family qualifiers.")
    family_map = {}
    for row in rows.rows.values():
        for family_name, qualifier_cells in row.cells.items():
            if family_name not in family_map:
                family_map[family_name] = set()
            for qualifier in qualifier_cells.keys():
                family_map[family_name].add(qualifier.decode('utf-8'))
    if not family_map:
        raise ValueError("No column families or qualifiers found in the scanned rows.")
    return [FamilyQualifiers(family, sorted(list(qualifiers))) for family, qualifiers in family_map.items()]


def format_family_qualifiers(family_qualifiers: List[FamilyQualifiers]) -> List[str]:
    """
    Formats the list of FamilyQualifiers into a list like: ["cf1['col1']", "cf1['col2']", "cf2['col1']"]
    """
    qualified_columns = []
    for fq in family_qualifiers:
        for qualifier in fq.column_qualifiers:
            qualified_columns.append(f"{fq.column_family_name}['{qualifier}']")
    return qualified_columns

class BigtableGetInstancesAndTableSchemaTool(BaseTool):
    """
    A tool to interact with Google Bigtable and retrieve metadata and data of a project.
    """

    name: str = "GetBigtableInstancesAndTableSchema"
    description: str = (
        "Gets the schema of all Bigtable resources in a project: all instances, their tables, "
        "and for each table the column families and their column qualifiers. "
        "Bigtable uses a two-tier column model: Column Families -> Column Qualifiers "
        "Since Bigtable has no fixed schema, the tool scans a few rows to infer possible column qualifiers. "
    )
    _client: Client

    def __init__(
        self,
        client: Client,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self._client = client

    def get_instances(self):
        """
        Retrieve all instances in the project (synchronous).
        """
        instances: List[Instance]
        instances, _ = self._client.list_instances()
        return [instance.instance_id for instance in instances]

    def get_tables(self, instance_id: str):
        """
        Retrieve all tables in a specific instance (synchronous).
        """
        instance = self._client.instance(instance_id)
        tables = instance.list_tables()
        return [table.table_id for table in tables]

    def get_column_families(self, instance_id: str, table_id: str):
        """
        Retrieve all column families in a specific table (synchronous).
        """
        instance = self._client.instance(instance_id)
        table = instance.table(table_id)
        column_families = table.list_column_families()
        return list(column_families.keys())
    
    def get_possible_columns(
        self,
        instance_id: str,
        table_id: str,
        row_limit: Optional[int] = DEFAULT_COL_QUALIFIER_SCAN_LIMIT,
    ) -> List[str]:
        """
        Retrieve possible columns by scanning the first (possibly few) rows of a table.
        """
        try:
            instance = self._client.instance(instance_id)
            table = instance.table(table_id)
            rows = table.read_rows(limit=row_limit)
            rows.consume_all()
            family_qualifiers = extract_family_qualifiers(rows)
            return format_family_qualifiers(family_qualifiers)
        except Exception as e:
            return f"Error extracting possible columns: {str(e)}"


    def get_metadata(self) -> Dict[str, Dict[str, Dict[str, List[str]]]]:
        """
        Retrieve all instances, tables, and column families (synchronous).
        """
        metadata = {}
        instances = self.get_instances()

        for instance_id in instances:
            metadata[instance_id] = {}
            tables = self.get_tables(instance_id)
            for table_id in tables:
                """
                We explicitly retrieve column families here because scanning rows may miss
                families that have no data in the sampled rows. Only the metadata API can
                guarantee a complete list of all defined column families, even for empty tables.
                """
                column_families = self.get_column_families(instance_id, table_id)
                possible_columns = self.get_possible_columns(instance_id, table_id)
                metadata[instance_id][table_id] = {
                    "column_families": column_families,
                    "possible_columns": possible_columns,
                }
        return metadata

    def _run(self, *args, **kwargs):
        """
        Implementation of the abstract method `_run` (synchronous).
        """
        return self.get_metadata()


#### 2.2 Initialize Tools and Create an Agent with Tools
We initialize two tools:  
1. **`BigtableGetInstancesAndTableSchemaTool`** – retrieves Bigtable schema information  
2. **`BigtableExecuteQueryTool`** – runs queries against Bigtable

We also create a conversational agent with both tools so it can:  
- fetch schema  
- generate and execute queries

In [None]:
from langchain_google_bigtable.execute_query_tools import BigtableExecuteQueryTool
from langgraph.prebuilt import create_react_agent

# --- Initialize LangChain tools with engine ---
get_instances_and_table_schema_tool = BigtableGetInstancesAndTableSchemaTool(client=admin_client)
execute_query_tool = BigtableExecuteQueryTool(engine=engine)

# --- Create a conversational agent --- 
agent = create_react_agent(
    model="gemini-2.5-pro",
    tools=[get_instances_and_table_schema_tool, execute_query_tool],
    prompt="You are a helpful assistant. Make sure to process the data retrieved from the Bigtable execute query tool to answer user questions effectively."
)
print("Tools and Agent initialized successfully.")


#### 2.3 Example Query
We now ask the agent: *"Which hotels are luxurious?"*  
The agent automatically uses the schema tool to infer the appropriate table and column names, selects the correct table, and then constructs and executes a query to answer the question.

In [None]:
# Ask the agent a question about the hotels table
response = agent.invoke({
    "messages": [
        {
            "role": "user",
            "content": (
                "I want to figure out what hotels are luxurious. Don't ask me for any other information. Use the tools at your disposal to get the information you need."
            )
        }
    ]
})

# Print the final thought and agent return.
final_message: BaseMessage = response["messages"][-1]
print("Final Thought:")
final_message.pretty_print()
print("\n")

# Print each thought.
print("All Steps:\n")
for i, message in enumerate(response["messages"], start=1):
    message: BaseMessage
    print(f"Step {i}:")
    message.pretty_print()
    print("\n")
# Print the agent's response to debug - the result is verbose and contains all the steps the agent took to arrive at the final answer
# print(response)

### Cleanup

After the tests, delete the `hotels` and `fruits` tables to clean up resources.

In [None]:
# Cleanup tables
def delete_table(table):
    if table.exists():
        table.delete()
        print(f"Table '{table.table_id}' deleted successfully.")
    else:
        print(f"Table '{table.table_id}' does not exist.")

delete_table(hotels_table)
delete_table(fruits_table)