
BUG: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited #58557

Open · 3 tasks done
dss010101 opened this issue May 3, 2024 · 3 comments
Labels: Bug, Needs Triage (Issue that has not been reviewed by a pandas team member)
dss010101 commented May 3, 2024

Pandas version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of pandas.

  • I have confirmed this bug exists on the main branch of pandas.

Reproducible Example

I have the code below. read_sql is a method on my DBReader class, and it uses pd.read_sql.

import asyncio
import timeit

import pandas as pd
import psycopg

async def read_sql_async(self, sql, params = None):
  t1 = timeit.default_timer()

  async with await psycopg.AsyncConnection.connect(self.connect_str) as conn:
     #await asyncio.to_thread(pd.read_sql, query, con)
     res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)

  print(f'perf: {timeit.default_timer() - t1}')
  return res

The asyncio code is below:

import asyncio

import numpy as np

async def test_thread_asyncio():
   db_reader = DataReader()
   sql = "select id, name from factor f where f.id = ANY(%s)"
   threads = 20
   id_partitions = np.array_split(list(range(1, 10000)), threads)
   id_partitions = [[p.tolist()] for p in id_partitions]
   tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}

   for task in asyncio.as_completed(tasks):
      try:
         df = await asyncio.shield(task)
      except Exception as exc:
         log.exception(f'error retrieving data')
      else:
         if df is not None:
            print(f'shape: {df.shape}')   

When I run this, I'm getting the following error:

/usr/local/lib/python3.12/site-packages/pandas/io/sql.py:2674: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
...
...
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
    return f.result()  # May raise f.exception().
           ^^^^^^^^^^
  File "/workspaces/v8_pi_lab/src/v8_pi_lab/data/providers/db_provider.py", line 116, in read_sql_async
    res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 706, in read_sql
    return pandas_sql.read_query(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 2739, in read_query
    columns = [col_desc[0] for col_desc in cursor.description]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable
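
What the warning and the TypeError together suggest can be reproduced without a database. The sketch below uses a hypothetical FakeAsyncCursor, not pandas internals or psycopg itself: pandas calls cursor.execute() synchronously, so on an async cursor that call only creates a coroutine object, which is discarded un-awaited, and cursor.description is never populated:

```python
import gc
import warnings

class FakeAsyncCursor:
    """Hypothetical stand-in for psycopg's AsyncCursor: execute() is a coroutine function."""
    def __init__(self):
        self.description = None  # only populated once execute() is awaited

    async def execute(self, sql):
        self.description = [("id",), ("name",)]

cur = FakeAsyncCursor()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # pandas calls cursor.execute(sql) synchronously; on an async cursor this
    # just creates a coroutine object that is immediately discarded
    cur.execute("select 1")
    gc.collect()  # ensure the discarded coroutine is finalized

# The coroutine body never ran, so description is still None, which is why
# `for col_desc in cursor.description` raises TypeError inside pandas.
assert cur.description is None
assert any("never awaited" in str(w.message) for w in caught)
```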


### Issue Description

I need to parallelize my calls to the DB. Using multiprocessing works, but given these are mostly IO-bound operations rather than CPU-bound, I prefer to use threads. I wrote code using `concurrent.futures.ThreadPoolExecutor`, and that also works, but as you can see from the timings below, there seems to be some blocking. So I then attempted to use coroutines, but I am getting the above error.
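
One pattern that sidesteps the warning entirely (a sketch, not something from the issue): since pd.read_sql is synchronous, keep the connection synchronous too and push the whole call into a worker thread with asyncio.to_thread. Here time.sleep stands in for the real pd.read_sql(sql, sync_conn, params=part) call, so the snippet runs without a database:

```python
import asyncio
import time

def blocking_read(part):
    # stand-in for: pd.read_sql(sql, sync_conn, params=part)
    time.sleep(0.2)
    return len(part)

async def main():
    parts = [[1, 2], [3, 4], [5, 6], [7, 8]]
    t0 = time.perf_counter()
    # each blocking call runs in its own thread; the event loop is never blocked
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_read, p) for p in parts)
    )
    return results, time.perf_counter() - t0

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

The four 0.2 s sleeps overlap, so the total elapsed time stays well under the 0.8 s a serial run would take.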


### Expected Behavior

To be able to make parallelized threaded calls to pandas.read_sql and read_sql_query without blocking.

### Installed Versions

<details>
Cython                : None
pytest                : 8.2.0
hypothesis            : None
sphinx                : None
blosc                 : None
feather               : None
xlsxwriter            : None
lxml.etree            : None
html5lib              : None
pymysql               : None
psycopg2              : None
jinja2                : 3.1.3
IPython               : None
pandas_datareader     : None
adbc-driver-postgresql: None
adbc-driver-sqlite    : None
bs4                   : None
bottleneck            : None
dataframe-api-compat  : None
fastparquet           : None
fsspec                : None
gcsfs                 : None
matplotlib            : None
numba                 : None
numexpr               : None
odfpy                 : None
openpyxl              : 3.1.2
pandas_gbq            : None
pyarrow               : 16.0.0
pyreadstat            : None
python-calamine       : None
pyxlsb                : None
s3fs                  : None
scipy                 : None
sqlalchemy            : 2.0.29
tables                : None
tabulate              : None
xarray                : None
xlrd                  : None
zstandard             : None
tzdata                : 2024.1
qtpy                  : None
pyqt5                 : None

</details>
dss010101 added the Bug and Needs Triage labels on May 3, 2024

dss010101 (Author) commented:

This is the version that uses ThreadPoolExecutor; it works, but seems to block:

import timeit
import warnings

import pandas as pd
import psycopg

def read_sql(self, sql, params=None):
   t1 = timeit.default_timer()
   with warnings.catch_warnings():
      warnings.simplefilter("ignore", UserWarning)
      with psycopg.connect(self.connect_str, autocommit=True) as conn:
         df = pd.read_sql(sql, con=conn, params=params)

      self.log.debug(f'perf: {timeit.default_timer() - t1}')
      return df

The concurrent.futures code is this:

import concurrent.futures as cf

import numpy as np

def test_thread_pool():
   db_reader = DataReader()
   sql = "select id, name from factor f where f.id = ANY(%s)"
   threads = 20
   id_partitions = np.array_split(list(range(1, 10000)), threads)
   id_partitions = [[p.tolist()] for p in id_partitions]
   with cf.ThreadPoolExecutor(max_workers=threads) as executor:
      futures = {
         executor.submit(db_reader.read_sql, sql, params=p): p
         for p in id_partitions
      }
      
      for future in cf.as_completed(futures):
         ids = futures[future]
         try:
            df = future.result()
         except Exception as exc:
            log.exception(f'error retrieving data for: {ids}')
         else:
            if df is not None:
               print(f'shape: {df.shape}')   

The output of the debug line from read_sql looks like this:

perf: 0.7313497869981802
perf: 0.8116309550023288
perf: 3.401154975006648
perf: 5.22201336100261
perf: 6.325166654998611
perf: 6.338692951001576
perf: 6.573095380997984
perf: 6.5976604809984565
perf: 6.8282670119951945
perf: 7.291718505999597
perf: 7.4276196580030955
perf: 7.407097272000101
perf: 8.38801568299823
perf: 9.119963648998237

You'll notice that the timings keep increasing. I'd have expected them all to be roughly the same, so it seems there is some SQL-side blocking. Also, the gap between the first two threads and the third is always about 2-3 seconds; why is that?
I've also tried creating a new DbReader instance for each thread, but it has the same effect.

Does anyone know whether the connection or pandas read_sql blocks, or how to solve this?
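
A ramp like the one in the perf output above is what any shared serialization point produces, because each per-call timer includes the time spent waiting on earlier calls. The toy model below (no database; a lock and a sleep are stand-ins for whatever serializes the real queries, be it the server, connection setup, or the GIL during DataFrame construction) reproduces the same staircase pattern:

```python
import concurrent.futures as cf
import threading
import time

lock = threading.Lock()  # stand-in for whatever serializes the real queries

def timed_read(i):
    t0 = time.perf_counter()
    with lock:            # each worker waits for all earlier ones to finish
        time.sleep(0.05)  # stand-in for the serialized part of the work
    return time.perf_counter() - t0

with cf.ThreadPoolExecutor(max_workers=5) as executor:
    times = sorted(executor.map(timed_read, range(5)))

print([round(t, 2) for t in times])  # ramps up, like the perf output above
```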

Aloqeely (Member) commented May 5, 2024

I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:

tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}

Because db_reader.read_sql_async is an async method.

dss010101 (Author) commented May 5, 2024

> I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:
>
> tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}
>
> Because db_reader.read_sql_async is an async method.

I am. You can see the await in the for/as_completed loop.

I think the problem is probably that an async connection or cursor doesn't work with pandas. If so, I'd like to understand whether pandas blocks; as per the 2nd example, why am I not able to achieve better concurrency?
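
If pandas indeed can't drive an async cursor, one workaround (a sketch under that assumption, not an official pandas API) is to do the fetching with psycopg's own async API and only hand the already-fetched rows to pandas, so pandas never touches the cursor:

```python
import pandas as pd

# Hypothetical fully-async read (commented out because it needs a live DB;
# `psycopg`, `conn_str`, `sql`, and `params` are assumed from the snippets above):
#
# async with await psycopg.AsyncConnection.connect(conn_str) as conn:
#     async with conn.cursor() as cur:
#         await cur.execute(sql, params)
#         rows = await cur.fetchall()
#         cols = [d.name for d in cur.description]
# df = pd.DataFrame(rows, columns=cols)

# The final DataFrame construction, shown here with stand-in rows:
rows = [(1, "alpha"), (2, "beta")]
cols = ["id", "name"]
df = pd.DataFrame(rows, columns=cols)
print(df.shape)  # (2, 2)
```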
