Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async mode for pgvector #32

Closed
wants to merge 17 commits into from
Closed

Conversation

pprados
Copy link
Contributor

@pprados pprados commented Apr 22, 2024

This PR adds the async approach for pgvector.

Some remarks:

  • We use assert to check invocations and not if. Thus, in production, it is possible to
    remove these checks with python -O ...
  • We propose a public session_maker attribute. This is very important for resilient
    scenarios.

In a RAG architecture, it is necessary to import document chunks.

To keep track of the links between chunks and documents, we can use the
index() API.
This API proposes to use an SQL-type record manager.

In a classic use case, using SQLRecordManager and a vector database, it is impossible
to guarantee the consistency of the import.

Indeed, if a crash occurs during the import, there is an inconsistency between the SQL
database and the vector database.

PGVector is the solution to this problem.

Indeed, it is possible to use a single database (and not a two-phase commit with 2
technologies, if they are both compatible). But, for this, it is necessary to be able
to combine the transactions between the use of SQLRecordManager and PGVector as a
vector database.

This is only possible if it is possible to intervene on the session_maker.

This is why we propose to make this attribute public. By unifying the session_maker
of SQLRecordManager and PGVector, it is possible to guarantee that all processes will
be executed in a single transaction.

This is, moreover, the only solution we know of to guarantee the consistency of the
import of chunks into a vector database. It's possible only if the outer session is built
with the connection.

def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

The same thing is possible asynchronously, but a bug in sql_record_manager.py
in _amake_session() must first be fixed (See PR ).

    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session

Then, it is possible to do the same thing asynchronously:

async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())

@eyurtsev eyurtsev self-assigned this Apr 29, 2024
@pprados
Copy link
Contributor Author

pprados commented Apr 30, 2024 via email

@pprados
Copy link
Contributor Author

pprados commented May 6, 2024

@eyurtsev
May be, you can consider another may other PR 20735 ?

eyurtsev added a commit to langchain-ai/langchain that referenced this pull request May 8, 2024
…#20735)

The `_amake_session()` method does not allow modifying the
`self.session_factory` with
anything other than `async_sessionmaker`. This prohibits advanced uses
of `index()`.

In a RAG architecture, it is necessary to import document chunks.
To keep track of the links between chunks and documents, we can use the
`index()` API.
This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import. Indeed, if a crash occurs
during the import
(problem with the network, ...)
there is an inconsistency between the SQL database and the vector
database.

With the
[PR](langchain-ai/langchain-postgres#32) we are
proposing for `langchain-postgres`,
it is now possible to guarantee the consistency of the import of chunks
into
a vector database.  It's possible only if the outer session is built
with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed.

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Sean <sean@upstage.ai>
Co-authored-by: JuHyung-Son <sonju0427@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: YISH <mokeyish@hotmail.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Jason_Chen <820542443@qq.com>
Co-authored-by: Joan Fontanals <joan.fontanals.martinez@jina.ai>
Co-authored-by: Pavlo Paliychuk <pavlo.paliychuk.ca@gmail.com>
Co-authored-by: fzowl <160063452+fzowl@users.noreply.github.com>
Co-authored-by: samanhappy <samanhappy@gmail.com>
Co-authored-by: Lei Zhang <zhanglei@apache.org>
Co-authored-by: Tomaz Bratanic <bratanic.tomaz@gmail.com>
Co-authored-by: merdan <48309329+merdan-9@users.noreply.github.com>
Co-authored-by: ccurme <chester.curme@gmail.com>
Co-authored-by: Andres Algaba <andresalgaba@gmail.com>
Co-authored-by: davidefantiniIntel <115252273+davidefantiniIntel@users.noreply.github.com>
Co-authored-by: Jingpan Xiong <71321890+klaus-xiong@users.noreply.github.com>
Co-authored-by: kaka <kaka@zbyte-inc.cloud>
Co-authored-by: jingsi <jingsi@leadincloud.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Rahul Triptahi <rahul.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Shengsheng Huang <shannie.huang@gmail.com>
Co-authored-by: Michael Schock <mjschock@users.noreply.github.com>
Co-authored-by: Anish Chakraborty <anish749@users.noreply.github.com>
Co-authored-by: am-kinetica <85610855+am-kinetica@users.noreply.github.com>
Co-authored-by: Dristy Srivastava <58721149+dristysrivastava@users.noreply.github.com>
Co-authored-by: Matt <matthew.gotteiner@microsoft.com>
Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com>
kyle-cassidy pushed a commit to kyle-cassidy/langchain that referenced this pull request May 10, 2024
…langchain-ai#20735)

The `_amake_session()` method does not allow modifying the
`self.session_factory` with
anything other than `async_sessionmaker`. This prohibits advanced uses
of `index()`.

In a RAG architecture, it is necessary to import document chunks.
To keep track of the links between chunks and documents, we can use the
`index()` API.
This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import. Indeed, if a crash occurs
during the import
(problem with the network, ...)
there is an inconsistency between the SQL database and the vector
database.

With the
[PR](langchain-ai/langchain-postgres#32) we are
proposing for `langchain-postgres`,
it is now possible to guarantee the consistency of the import of chunks
into
a vector database.  It's possible only if the outer session is built
with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed.

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Sean <sean@upstage.ai>
Co-authored-by: JuHyung-Son <sonju0427@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: YISH <mokeyish@hotmail.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Jason_Chen <820542443@qq.com>
Co-authored-by: Joan Fontanals <joan.fontanals.martinez@jina.ai>
Co-authored-by: Pavlo Paliychuk <pavlo.paliychuk.ca@gmail.com>
Co-authored-by: fzowl <160063452+fzowl@users.noreply.github.com>
Co-authored-by: samanhappy <samanhappy@gmail.com>
Co-authored-by: Lei Zhang <zhanglei@apache.org>
Co-authored-by: Tomaz Bratanic <bratanic.tomaz@gmail.com>
Co-authored-by: merdan <48309329+merdan-9@users.noreply.github.com>
Co-authored-by: ccurme <chester.curme@gmail.com>
Co-authored-by: Andres Algaba <andresalgaba@gmail.com>
Co-authored-by: davidefantiniIntel <115252273+davidefantiniIntel@users.noreply.github.com>
Co-authored-by: Jingpan Xiong <71321890+klaus-xiong@users.noreply.github.com>
Co-authored-by: kaka <kaka@zbyte-inc.cloud>
Co-authored-by: jingsi <jingsi@leadincloud.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Rahul Triptahi <rahul.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Shengsheng Huang <shannie.huang@gmail.com>
Co-authored-by: Michael Schock <mjschock@users.noreply.github.com>
Co-authored-by: Anish Chakraborty <anish749@users.noreply.github.com>
Co-authored-by: am-kinetica <85610855+am-kinetica@users.noreply.github.com>
Co-authored-by: Dristy Srivastava <58721149+dristysrivastava@users.noreply.github.com>
Co-authored-by: Matt <matthew.gotteiner@microsoft.com>
Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com>
@pprados
Copy link
Contributor Author

pprados commented May 13, 2024

@eyurtsev

The promise of the constructor, with the create_extension parameter, is to guarantee that the extension is added before the APIs are used. Since this promise cannot be kept in an async scenario, there is an alternative:

  • Remove this parameter, since the promise cannot be kept. Otherwise, an async method is needed to install the extension before the APIs are used, and to check that this method has been invoked at the start of each API.
  • Use a lazy approach as suggested, which simply respects the constructor's promise.

Can you launch the workflow?

kyle-cassidy pushed a commit to kyle-cassidy/langchain that referenced this pull request May 16, 2024
…langchain-ai#20735)

The `_amake_session()` method does not allow modifying the
`self.session_factory` with
anything other than `async_sessionmaker`. This prohibits advanced uses
of `index()`.

In a RAG architecture, it is necessary to import document chunks.
To keep track of the links between chunks and documents, we can use the
`index()` API.
This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import. Indeed, if a crash occurs
during the import
(problem with the network, ...)
there is an inconsistency between the SQL database and the vector
database.

With the
[PR](langchain-ai/langchain-postgres#32) we are
proposing for `langchain-postgres`,
it is now possible to guarantee the consistency of the import of chunks
into
a vector database.  It's possible only if the outer session is built
with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed.

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Sean <sean@upstage.ai>
Co-authored-by: JuHyung-Son <sonju0427@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: YISH <mokeyish@hotmail.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Jason_Chen <820542443@qq.com>
Co-authored-by: Joan Fontanals <joan.fontanals.martinez@jina.ai>
Co-authored-by: Pavlo Paliychuk <pavlo.paliychuk.ca@gmail.com>
Co-authored-by: fzowl <160063452+fzowl@users.noreply.github.com>
Co-authored-by: samanhappy <samanhappy@gmail.com>
Co-authored-by: Lei Zhang <zhanglei@apache.org>
Co-authored-by: Tomaz Bratanic <bratanic.tomaz@gmail.com>
Co-authored-by: merdan <48309329+merdan-9@users.noreply.github.com>
Co-authored-by: ccurme <chester.curme@gmail.com>
Co-authored-by: Andres Algaba <andresalgaba@gmail.com>
Co-authored-by: davidefantiniIntel <115252273+davidefantiniIntel@users.noreply.github.com>
Co-authored-by: Jingpan Xiong <71321890+klaus-xiong@users.noreply.github.com>
Co-authored-by: kaka <kaka@zbyte-inc.cloud>
Co-authored-by: jingsi <jingsi@leadincloud.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Co-authored-by: Rahul Triptahi <rahul.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Shengsheng Huang <shannie.huang@gmail.com>
Co-authored-by: Michael Schock <mjschock@users.noreply.github.com>
Co-authored-by: Anish Chakraborty <anish749@users.noreply.github.com>
Co-authored-by: am-kinetica <85610855+am-kinetica@users.noreply.github.com>
Co-authored-by: Dristy Srivastava <58721149+dristysrivastava@users.noreply.github.com>
Co-authored-by: Matt <matthew.gotteiner@microsoft.com>
Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com>
@pprados
Copy link
Contributor Author

pprados commented May 21, 2024

@eyurtsev, can you approval the workflow? I can check if all the code passes the CI.

@pprados
Copy link
Contributor Author

pprados commented May 27, 2024

Hello @eyurtsev

I've aligned the code with your similar requests for SQLChatMessageHistory.

Currently, SQLChatMessageHistory cannot be reviewed, as there are bugs in the base sources, when linting around docs/scripts/arxiv_references.py or other *.ipynb. I'm waiting for the master sources to be updated.

Can you review this code which allows me to set the resilience with langchain definitively?

@ssifood
Copy link

ssifood commented May 27, 2024

I am waitting this PR!!

aysnc langchain_postgres

Copy link
Contributor Author

@pprados pprados left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All is aligned with SQL Docstore

@pprados
Copy link
Contributor Author

pprados commented May 27, 2024

@eyurtsev
I locked with "1 change requested", but I can't find where the change request is. Can you help me?

Copy link
Contributor Author

@pprados pprados left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All it's fixed.

Copy link
Contributor Author

@pprados pprados left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

eyurtsev added a commit to langchain-ai/langchain that referenced this pull request Jun 5, 2024
…22065)

# package community: Fix SQLChatMessageHistory

## Description
Here is a rewrite of `SQLChatMessageHistory` to properly implement the
asynchronous approach. The code circumvents [issue
22021](#22021) by
accepting a synchronous call to `def add_messages()` in an asynchronous
scenario. This bypasses the bug.

For the same reasons as in [PR
22](langchain-ai/langchain-postgres#32) of
`langchain-postgres`, we use a lazy strategy for table creation. Indeed,
the promise of the constructor cannot be fulfilled without this. It is
not possible to invoke a synchronous call in a constructor. We
compensate for this by waiting for the next asynchronous method call to
create the table.

The goal of the `PostgresChatMessageHistory` class (in
`langchain-postgres`) is, among other things, to be able to recycle
database connections. The implementation of the class is problematic, as
we have demonstrated in [issue
22021](#22021).

Our new implementation of `SQLChatMessageHistory` achieves this by using
a singleton of type (`Async`)`Engine` for the database connection. The
connection pool is managed by this singleton, and the code is then
reentrant.

We also accept the type `str` (optionally complemented by `async_mode`.
I know you don't like this much, but it's the only way to allow an
asynchronous connection string).

In order to unify the different classes handling database connections,
we have renamed `connection_string` to `connection`, and `Session` to
`session_maker`.

Now, a single transaction is used to add a list of messages. Thus, a
crash during this write operation will not leave the database in an
unstable state with a partially added message list. This makes the code
resilient.

We believe that the `PostgresChatMessageHistory` class is no longer
necessary and can be replaced by:
```
PostgresChatMessageHistory = SQLChatMessageHistory
```
This also fixes the bug.


## Issue
- [issue 22021](#22021)
  - Bug in _exit_history()
  - Bugs in PostgresChatMessageHistory and sync usage
  - Bugs in PostgresChatMessageHistory and async usage
- [issue
36](langchain-ai/langchain-postgres#36)
 ## Twitter handle:
pprados

## Tests
- libs/community/tests/unit_tests/chat_message_histories/test_sql.py
(add async test)

@baskaryan, @eyurtsev or @hwchase17 can you check this PR ?
And, I've been waiting a long time for validation from other PRs. Can
you take a look?
- [PR 32](langchain-ai/langchain-postgres#32)
- [PR 15575](#15575)
- [PR 13200](#13200)

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
eyurtsev added a commit to langchain-ai/langchain that referenced this pull request Jun 7, 2024
Hello @eyurtsev

- package: langchain-comminity
- **Description**: Add SQL implementation for docstore. A new
implementation, in line with my other PR ([async
PGVector](langchain-ai/langchain-postgres#32),
[SQLChatMessageMemory](#22065))
- Twitter handler: pprados

---------

Signed-off-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Piotr Mardziel <piotrm@gmail.com>
Co-authored-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
@pprados
Copy link
Contributor Author

pprados commented Jun 10, 2024

@eyurtsev
I made the mistake of doing a rebase while a review was in progress. This seems to block the process. There's a ‘1 change requested’ request that I can't validate. I propose another PR identical to resolve this problem.

eyurtsev pushed a commit to langchain-ai/langchain that referenced this pull request Jun 10, 2024
…elf query retriever (#22678)

The fact that we outsourced pgvector to another project has an
unintended effect. The mapping dictionary found by
`_get_builtin_translator()` cannot recognize the new version of pgvector
because it comes from another package.
`SelfQueryRetriever` no longer knows `PGVector`.

I propose to fix this by creating a global dictionary that can be
populated by various database implementations. Thus, importing
`langchain_postgres` will allow the registration of the `PGvector`
mapping.

But for the moment I'm just adding a lazy import

Furthermore, the implementation of _get_builtin_translator()
reconstructs the BUILTIN_TRANSLATORS variable with each invocation,
which is not very efficient. A global map would be an optimization.

- **Twitter handle:** pprados

@eyurtsev, can you review this PR? And unlock the PR [Add async mode for
pgvector](langchain-ai/langchain-postgres#32)
and PR [community[minor]: Add SQL storage
implementation](#22207)?

Are you in favour of a global dictionary-based implementation of
Translator?
@pprados pprados closed this Jun 10, 2024
eyurtsev pushed a commit that referenced this pull request Jun 10, 2024
I made the mistake of doing a rebase while a review was in progress.
This seems to block the process. There's a ‘1 change requested’ request
that I can't validate. I propose another PR identical to the [previous
one](#32).

This PR adds the **async** approach for pgvector.

Some remarks:
- We use assert to check invocations and not if. Thus, in production, it
is possible to
remove these checks with `python -O ...`
- We propose a public `session_maker` attribute. This is very important
for resilient
scenarios.

In a RAG architecture, it is necessary to import document chunks.

To keep track of the links between chunks and documents, we can use the 

[index()](https://python.langchain.com/docs/modules/data_connection/indexing/)
API.
This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import.

Indeed, if a crash occurs during the import, there is an inconsistency
between the SQL
database and the vector database.

**PGVector is the solution to this problem.**

Indeed, it is possible to use a single database (and not a two-phase
commit with 2
technologies, if they are both compatible). But, for this, it is
necessary to be able
to combine the transactions between the use of `SQLRecordManager` and
`PGVector` as a
vector database.

This is only possible if it is possible to intervene on the
`session_maker`.

This is why we propose to make this attribute public. By unifying the
`session_maker`
of `SQLRecordManager` and `PGVector`, it is possible to guarantee that
all processes will
be executed in a single transaction.

This is, moreover, the only solution we know of to guarantee the
consistency of the
import of chunks into a vector database. It's possible only if the outer
session is built
with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed (See
[PR](langchain-ai/langchain#20735) ).

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 
Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

The promise of the constructor, with the `create_extension` parameter,
is to guarantee that the extension is added before the APIs are used.
Since this promise cannot be kept in an `async` scenario, there is an
alternative:

- Remove this parameter, since the promise cannot be kept. Otherwise, an
`async` method is needed to install the extension before the APIs are
used, and to check that this method has been invoked at the start of
each API.
- Use a lazy approach as suggested, which simply respects the
constructor's promise.
pprados added a commit to pprados/langchain that referenced this pull request Jun 11, 2024
…elf query retriever (langchain-ai#22678)

The fact that we outsourced pgvector to another project has an
unintended effect. The mapping dictionary found by
`_get_builtin_translator()` cannot recognize the new version of pgvector
because it comes from another package.
`SelfQueryRetriever` no longer knows `PGVector`.

I propose to fix this by creating a global dictionary that can be
populated by various database implementations. Thus, importing
`langchain_postgres` will allow the registration of the `PGvector`
mapping.

But for the moment I'm just adding a lazy import

Furthermore, the implementation of _get_builtin_translator()
reconstructs the BUILTIN_TRANSLATORS variable with each invocation,
which is not very efficient. A global map would be an optimization.

- **Twitter handle:** pprados

@eyurtsev, can you review this PR? And unlock the PR [Add async mode for
pgvector](langchain-ai/langchain-postgres#32)
and PR [community[minor]: Add SQL storage
implementation](langchain-ai#22207)?

Are you in favour of a global dictionary-based implementation of
Translator?
JonZeolla pushed a commit to JonZeolla/langchain that referenced this pull request Jun 11, 2024
Hello @eyurtsev

- package: langchain-comminity
- **Description**: Add SQL implementation for docstore. A new
implementation, in line with my other PR ([async
PGVector](langchain-ai/langchain-postgres#32),
[SQLChatMessageMemory](langchain-ai#22065))
- Twitter handler: pprados

---------

Signed-off-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Piotr Mardziel <piotrm@gmail.com>
Co-authored-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
JonZeolla pushed a commit to JonZeolla/langchain that referenced this pull request Jun 11, 2024
…elf query retriever (langchain-ai#22678)

The fact that we outsourced pgvector to another project has an
unintended effect. The mapping dictionary found by
`_get_builtin_translator()` cannot recognize the new version of pgvector
because it comes from another package.
`SelfQueryRetriever` no longer knows `PGVector`.

I propose to fix this by creating a global dictionary that can be
populated by various database implementations. Thus, importing
`langchain_postgres` will allow the registration of the `PGvector`
mapping.

But for the moment I'm just adding a lazy import

Furthermore, the implementation of _get_builtin_translator()
reconstructs the BUILTIN_TRANSLATORS variable with each invocation,
which is not very efficient. A global map would be an optimization.

- **Twitter handle:** pprados

@eyurtsev, can you review this PR? And unlock the PR [Add async mode for
pgvector](langchain-ai/langchain-postgres#32)
and PR [community[minor]: Add SQL storage
implementation](langchain-ai#22207)?

Are you in favour of a global dictionary-based implementation of
Translator?
hinthornw pushed a commit to langchain-ai/langchain that referenced this pull request Jun 20, 2024
…22065)

# package community: Fix SQLChatMessageHistory

## Description
Here is a rewrite of `SQLChatMessageHistory` to properly implement the
asynchronous approach. The code circumvents [issue
22021](#22021) by
accepting a synchronous call to `def add_messages()` in an asynchronous
scenario. This bypasses the bug.

For the same reasons as in [PR
22](langchain-ai/langchain-postgres#32) of
`langchain-postgres`, we use a lazy strategy for table creation. Indeed,
the promise of the constructor cannot be fulfilled without this. It is
not possible to invoke a synchronous call in a constructor. We
compensate for this by waiting for the next asynchronous method call to
create the table.

The goal of the `PostgresChatMessageHistory` class (in
`langchain-postgres`) is, among other things, to be able to recycle
database connections. The implementation of the class is problematic, as
we have demonstrated in [issue
22021](#22021).

Our new implementation of `SQLChatMessageHistory` achieves this by using
a singleton of type (`Async`)`Engine` for the database connection. The
connection pool is managed by this singleton, and the code is then
reentrant.

We also accept the type `str` (optionally complemented by `async_mode`.
I know you don't like this much, but it's the only way to allow an
asynchronous connection string).

In order to unify the different classes handling database connections,
we have renamed `connection_string` to `connection`, and `Session` to
`session_maker`.

Now, a single transaction is used to add a list of messages. Thus, a
crash during this write operation will not leave the database in an
unstable state with a partially added message list. This makes the code
resilient.

We believe that the `PostgresChatMessageHistory` class is no longer
necessary and can be replaced by:
```
PostgresChatMessageHistory = SQLChatMessageHistory
```
This also fixes the bug.


## Issue
- [issue 22021](#22021)
  - Bug in _exit_history()
  - Bugs in PostgresChatMessageHistory and sync usage
  - Bugs in PostgresChatMessageHistory and async usage
- [issue
36](langchain-ai/langchain-postgres#36)
 ## Twitter handle:
pprados

## Tests
- libs/community/tests/unit_tests/chat_message_histories/test_sql.py
(add async test)

@baskaryan, @eyurtsev or @hwchase17 can you check this PR ?
And, I've been waiting a long time for validation from other PRs. Can
you take a look?
- [PR 32](langchain-ai/langchain-postgres#32)
- [PR 15575](#15575)
- [PR 13200](#13200)

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
hinthornw pushed a commit to langchain-ai/langchain that referenced this pull request Jun 20, 2024
Hello @eyurtsev

- package: langchain-comminity
- **Description**: Add SQL implementation for docstore. A new
implementation, in line with my other PR ([async
PGVector](langchain-ai/langchain-postgres#32),
[SQLChatMessageMemory](#22065))
- Twitter handler: pprados

---------

Signed-off-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Piotr Mardziel <piotrm@gmail.com>
Co-authored-by: ChengZi <chen.zhang@zilliz.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
hinthornw pushed a commit to langchain-ai/langchain that referenced this pull request Jun 20, 2024
…elf query retriever (#22678)

The fact that we outsourced pgvector to another project has an
unintended effect. The mapping dictionary found by
`_get_builtin_translator()` cannot recognize the new version of pgvector
because it comes from another package.
`SelfQueryRetriever` no longer knows `PGVector`.

I propose to fix this by creating a global dictionary that can be
populated by various database implementations. Thus, importing
`langchain_postgres` will allow the registration of the `PGvector`
mapping.

But for the moment I'm just adding a lazy import

Furthermore, the implementation of _get_builtin_translator()
reconstructs the BUILTIN_TRANSLATORS variable with each invocation,
which is not very efficient. A global map would be an optimization.

- **Twitter handle:** pprados

@eyurtsev, can you review this PR? And unlock the PR [Add async mode for
pgvector](langchain-ai/langchain-postgres#32)
and PR [community[minor]: Add SQL storage
implementation](#22207)?

Are you in favour of a global dictionary-based implementation of
Translator?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants