Polars read_database with an existing async session object #16616

Closed
2 tasks done
WilliamStam opened this issue May 31, 2024 · 5 comments · Fixed by #16680
Labels: bug (Something isn't working), python (Related to Python Polars)

@WilliamStam

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

from datetime import datetime

from sqlalchemy import (
    DateTime,
    String,
)
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
)
from sqlalchemy.sql import func

from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import (
    URL,
)
from sqlalchemy.ext.asyncio import (
    async_sessionmaker,
    create_async_engine,
)
import asyncio
import logging

from sqlalchemy import (
    select,
    text,
)
import polars

logger = logging.getLogger(__file__)


class TablesBase(AsyncAttrs, DeclarativeBase):
    pass


class CountriesTable(TablesBase):
    __tablename__ = "COUNTRIES"

    CODE: Mapped[str] = mapped_column(String(2), primary_key=True)
    COUNTRY: Mapped[str] = mapped_column(String(250), nullable=True)




async def main():
    schema = "origin_data"
    db_config = URL.create(
        drivername="oracle+oracledb",
        username="ORIGIN_DATA",
        password="Test_123",
        host="10.100.2.106",
        port=1521,
        database="MUNWH",
    )
    engine = create_async_engine(url=db_config.render_as_string(hide_password=False))
    session_maker = async_sessionmaker(bind=engine)
    session = session_maker()

    # The schema must be set for every transaction: calls come from a "service" account, so we can't rely on the user's default schema.
    await session.execute(text(f"ALTER SESSION SET CURRENT_SCHEMA = {schema}"))

    stmt = select(CountriesTable).order_by(CountriesTable.CODE)
    print(stmt)
    print(db_config)

    schema = {
        "code": polars.String,
        "country": polars.String,
    }

    try:

        df = polars.read_database(
            query=stmt,
            connection=session,
            schema_overrides=schema,
        )
        print(df)
    except Exception as e:
        logger.exception(e)
    finally:
        await session.close()
        await engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())

Log output

'AsyncSession' object has no attribute 'engine'
Traceback (most recent call last):
  File "W:\Projects\Hydra\src\test.py", line 82, in main
    df = polars.read_database(
         ^^^^^^^^^^^^^^^^^^^^^
  File "W:\Projects\Hydra\.venv\Lib\site-packages\polars\io\database\functions.py", line 274, in read_database
    with ConnectionExecutor(connection) as cx:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "W:\Projects\Hydra\.venv\Lib\site-packages\polars\io\database\_executor.py", line 94, in __init__
    self.cursor = self._normalise_cursor(connection)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "W:\Projects\Hydra\.venv\Lib\site-packages\polars\io\database\_executor.py", line 323, in _normalise_cursor
    if conn.engine.driver == "databricks-sql-python":
       ^^^^^^^^^^^
AttributeError: 'AsyncSession' object has no attribute 'engine'

Issue description

Very glad that read_database is now able to accept async connections :D

I'm not sure whether this is a bug report, a feature request, or a won't-fix. With the above, I've tried passing:

  • db_config - AttributeError: 'URL' object has no attribute 'engine'
  • engine - RuntimeWarning: coroutine 'ConnectionExecutor._sqlalchemy_async_execute' was never awaited
  • session - AttributeError: 'AsyncSession' object has no attribute 'engine'
  • db_config.render_as_string(hide_password=False) - TypeError: read_database_uri expects one or more string queries; found <class 'sqlalchemy.sql.selectable.Select'> (expected, since the docs say that with a SQLAlchemy connection object the query can be a suitable "Selectable", and otherwise it must be a string)

(I also tried session.connection(), but then ran into await/async issues.)

I'm fairly sure I'm just doing something stupid at this point, like I'm supposed to use a different read_database or something, but it's not working :(

thanks for your time whoever is reading this!

Expected behavior

Return a DataFrame with two columns (code and country), with one row per country.

Installed versions

--------Version info---------
Polars:               0.20.30
Index type:           UInt32
Platform:             Windows-11-10.0.22631-SP0
Python:               3.12.2 (tags/v3.12.2:6abddd9, Feb  6 2024, 21:26:36) [MSC v.1937 64 bit (AMD64)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         <not installed>
numpy:                <not installed>
openpyxl:             <not installed>
pandas:               <not installed>
pyarrow:              <not installed>
pydantic:             2.7.2
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.30
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
WilliamStam added the bug, needs triage, and python labels May 31, 2024
@WilliamStam
Author

There's PR #15162, but even if I use the engine it still doesn't work.

@WilliamStam
Author

WilliamStam commented May 31, 2024

Oh right, nest_asyncio needs to be installed first.

After running pdm add nest_asyncio the engine works, but you still can't pass a session to it?

alexander-beedie self-assigned this May 31, 2024
@alexander-beedie
Collaborator

alexander-beedie commented May 31, 2024

I'll try and repro this at home and take a look, as I believe this should indeed work (pretty sure I tested with AsyncSession, but seems I missed something, hmm) - thanks for the sample code ;)

alexander-beedie removed the needs triage label May 31, 2024
@WilliamStam
Author

if this works for you im gonna go slit my wrists with a piece of paper. cheers for all the work you put into it already

@alexander-beedie
Collaborator

alexander-beedie commented Jun 3, 2024

if this works for you im gonna go slit my wrists with a piece of paper. cheers for all the work you put into it already

Got a fix incoming: "don't go towards the light..." ;)

Also, it turns out that there are actually two issues here - one is a bug (for which I'm about to land a fix), but the other is an enhancement. We currently don't support automatically unpacking SQLAlchemy ORM models (eg: the CountriesTable base object) into their component columns. I think we should do this, but currently we don't - however, that's separate from the bug so I'll address it in a follow-up.

Once the fix lands you can work around that by passing the compiled query to read_database instead of the Selectable. Something like this should work: pl.read_database(str(stmt.compile()), conn).
