## Setup

For development testing, you can set up a local Postgres container with the pgvector extension like this:
```bash
# Pull a postgres image with pgvector installed
podman pull pgvector/pgvector:pg17

# Start the postgres container
podman run --name postgres -it --rm -e POSTGRES_USER=postgres -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 pgvector/pgvector:pg17

# In a new terminal tab, create the necessary databases
createdb -h localhost -p 5432 -U postgres store
createdb -h localhost -p 5432 -U postgres checkpoint
```

In [15]:
import asyncio
import itertools

import psycopg
import psycopg_pool
from psycopg import rows

from google import genai
from google.genai import types

# from langgraph.store.memory import InMemoryStore
from langgraph.store.postgres import AsyncPostgresStore

In [16]:
# Google Cloud project and region passed to the Gen AI client below.
PROJECT_ID = "genai-experience-concierge"
LOCATION = "us-central1"
# Identifier of the embedding model.
EMBEDDING_MODEL_NAME = "text-embedding-005"

# Connection URL for the local "store" database (user "postgres", no password).
PG_CONNECTION_URL = "postgresql://postgres@localhost:5432/store"

# Gen AI SDK client configured with vertexai=True for the project/region above.
genai_client = genai.Client(
    vertexai=True,
    project=PROJECT_ID,
    location=LOCATION,
)

# Async connection pool for the store database. Each connection is created
# with autocommit enabled and a dict row factory. open=False means the pool is
# not opened by the constructor; it is opened explicitly on the next line.
store_pool = psycopg_pool.AsyncConnectionPool(
    conninfo=PG_CONNECTION_URL,
    connection_class=psycopg.AsyncConnection[rows.DictRow],
    kwargs={"autocommit": True, "row_factory": rows.dict_row},
    open=False,
)
await store_pool.open()

In [17]:
class VertexAIEmbeddings:
    """Async, batching wrapper around the Gen AI SDK ``embed_content`` call.

    Documents are split into batches of at most ``batch_size`` items, all
    batches are requested concurrently, and the resulting vectors are
    returned as one flat list.
    """

    # Upper bound applied to ``batch_size``.
    # NOTE(review): presumably the per-request input limit of the Vertex AI
    # text embedding API -- confirm against the service documentation.
    MAX_BATCH_SIZE = 250

    def __init__(
        self,
        model_name: str,
        client: genai.Client,
        batch_size: int = 250,
        title: str | None = None,
        auto_truncate: bool = False,
        task_type: str | None = None,
        mime_type: str | None = None,
        output_dimensionality: int = 768,
    ):
        """Store the client and the per-request embedding options.

        Args:
            model_name: Embedding model identifier sent with every request.
            client: Gen AI client whose ``aio.models.embed_content`` is called.
            batch_size: Maximum documents per request; capped at
                ``MAX_BATCH_SIZE``.
            title: Forwarded to ``EmbedContentConfig(title=...)``.
            auto_truncate: Forwarded to ``EmbedContentConfig(auto_truncate=...)``.
            task_type: Forwarded to ``EmbedContentConfig(task_type=...)``.
            mime_type: Forwarded to ``EmbedContentConfig(mime_type=...)``.
            output_dimensionality: Forwarded to
                ``EmbedContentConfig(output_dimensionality=...)``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Fail at construction time rather than on the first embed() call,
        # where itertools.batched would reject a batch size below one.
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be at least 1. Received: {batch_size}"
            )

        super().__init__()
        self.client = client
        self.output_dimensionality = output_dimensionality
        self.batch_size = min(batch_size, self.MAX_BATCH_SIZE)
        self.title = title
        self.auto_truncate = auto_truncate
        self.task_type = task_type
        self.model_name = model_name
        self.mime_type = mime_type

    async def embed(self, documents: list[str]) -> list[list[float]]:
        """Embed ``documents`` and return one vector per document.

        Args:
            documents: Texts to embed.

        Returns:
            The embedding values, flattened across batches in the order the
            batches were submitted.

        Raises:
            ValueError: If the number of embeddings with non-``None`` values
                differs from the number of input documents.
        """
        if not documents:
            return []

        # The configuration is identical for every batch, so build it once.
        config = types.EmbedContentConfig(
            task_type=self.task_type,
            title=self.title,
            output_dimensionality=self.output_dimensionality,
            mime_type=self.mime_type,
            auto_truncate=self.auto_truncate,
        )

        batch_requests = [
            self.client.aio.models.embed_content(
                model=self.model_name,
                contents=document_batch,
                config=config,
            )
            for document_batch in itertools.batched(documents, n=self.batch_size)
        ]

        # gather() returns results in the order the awaitables were passed in.
        embedding_batches = await asyncio.gather(*batch_requests)

        # Entries without values are skipped here; the count check below turns
        # any such gap into an explicit error.
        embeddings = [
            embedding.values
            for batch in embedding_batches
            for embedding in batch.embeddings or []
            if embedding.values is not None
        ]

        if len(embeddings) != len(documents):
            raise ValueError(
                "Count of valid embeddings doesn't match the number of input documents. "
                f"Expected: {len(documents)}. Received: {len(embeddings)}"
            )

        return embeddings

In [19]:
# Embeddings wrapper used by the store index. The single task type
# ("RETRIEVAL_QUERY") applies to every text passed through `embed`.
embedding_model = VertexAIEmbeddings(
    model_name=EMBEDDING_MODEL_NAME,
    client=genai_client,
    task_type="RETRIEVAL_QUERY",
)

# Postgres-backed store using the connection pool, with an embedding index
# configured on the "text" field and TTL settings for stored items.
pg_store = AsyncPostgresStore(
    conn=store_pool,
    index={
        "embed": embedding_model.embed,
        "dims": embedding_model.output_dimensionality,
        "fields": ["text"],
    },
    # NOTE(review): TTL values are presumably expressed in minutes (compare
    # the "sweep_interval_minutes" key) -- confirm against the langgraph docs.
    ttl={
        "default_ttl": 60,
        "refresh_on_read": False,
        "sweep_interval_minutes": 1,
    },
)

# Run the store's setup step, then start the TTL sweeper. In the recorded
# run the sweeper call returned a pending asyncio Task named "ttl_sweeper".
await pg_store.setup()
await pg_store.start_ttl_sweeper()

{'v': 3}
{'v': 2}


<Task pending name='ttl_sweeper' coro=<AsyncPostgresStore.start_ttl_sweeper.<locals>._sweep_loop() running at /Users/pablogaeta/work/generative-ai/gemini/agents/genai-experience-concierge/langgraph-demo/backend/.venv/lib/python3.12/site-packages/langgraph/store/postgres/aio.py:313>>

In [20]:
# Store two documents in the ("docs",) namespace, each with a per-item ttl of 1
# instead of the store's default_ttl.
await pg_store.aput(("docs",), "doc1", {"text": "Python tutorial"}, ttl=1)
await pg_store.aput(("docs",), "doc2", {"text": "TypeScript guide"}, ttl=1)

# Search by similarity
await pg_store.asearch(("docs",), query="python programming")

[Item(namespace=['docs'], key='doc1', value={'text': 'Python tutorial'}, created_at='2025-04-12T00:06:32.607700+00:00', updated_at='2025-04-12T00:06:32.607700+00:00', score=0.657091599606646),
 Item(namespace=['docs'], key='doc2', value={'text': 'TypeScript guide'}, created_at='2025-04-12T00:06:32.837157+00:00', updated_at='2025-04-12T00:06:32.837157+00:00', score=0.424146806489476)]

In [22]:
# Repeat the same search. In the recorded run it returned an empty list,
# presumably because the ttl=1 items had expired and been swept by then.
await pg_store.asearch(("docs",), query="python programming")

[]

In [23]:
# Close the connection pool that was opened at the start of the notebook.
await store_pool.close()