Faster way to map data to pd.DataFrame #316

Open

ChristianF88 opened this issue Jun 7, 2022 · 9 comments

@ChristianF88

Hi Daniele,

I approached you at PyCon Italia, after your presentation.

I benchmarked the fastest ways I know of to retrieve data from a PostgreSQL database and return a pd.DataFrame. At the moment this is what I've got (never mind the exact execution times):

The code:

import psycopg
import psycopg2
import pandas as pd
from tempfile import TemporaryFile
from sqlalchemy import create_engine
## database credentials
CREDS = {
    "host": "<HOSTNAME>",
    "user": "<USERNAME>",
    "port": 5432,
    "dbname": "<DBNAME>",
    "password": "<PASSWORD>"
}
CONNECTION_STRING = " ".join((f"{k}={v}" for k,v in CREDS.items()))
ENGINE = create_engine(
    "postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(**CREDS)
)

# Getting data from very large table (half a billion rows)
QUERY = "Select * from signal LIMIT 1000000;"
def pandas_native_query(query):
    return pd.read_sql(query, ENGINE)


def psycopg2_native_query(query):
    with psycopg2.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            cur.execute(query)
            return pd.DataFrame(cur.fetchall())

        
def psycopg3_native_query(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            cur.execute(query)
            return pd.DataFrame(cur.fetchall())
        

def psycopg2_copy_dataframe(query):
    with psycopg2.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with TemporaryFile() as tmpfile:
                copy_sql = f"COPY ({query.strip(';')}) TO STDOUT WITH CSV HEADER"
                cur.copy_expert(copy_sql, tmpfile)
                tmpfile.seek(0)
                return pd.read_csv(tmpfile)

                    
def psycopg3_copy_dataframe(query):
    df = []
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with cur.copy(f"COPY ({query.strip(';')}) TO STDOUT") as copy:
                for row in copy.rows():
                    df.append(row)
    return pd.DataFrame(df)

The results:

%timeit pandas_native_query(QUERY)

13 s ± 3.61 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_native_query(QUERY)

26.8 s ± 7.07 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_native_query(QUERY)

6.59 s ± 589 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_copy_dataframe(QUERY)

3.89 s ± 340 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_copy_dataframe(QUERY)

5.41 s ± 45.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


At the moment the fastest method is psycopg2 using copy_expert.

I used:

5.15.41-1-MANJARO Linux
Python 3.9.5

pandas==1.4.2
SQLAlchemy==1.4.37
psycopg-binary==3.0.14
psycopg2-binary==2.9.3

It would be great if there were a faster way to retrieve data and map it to a pd.DataFrame. Do you have any ideas on how that could be achieved?

Let me know if I can help!

Thank you so much! :)

Have a great one!

@dvarrazzo
Member

Hello, thank you very much :)

Let's make a couple of other tests:

  1. Could you please add a test reading from psycopg 3 copy block-by-block, using for block in copy and the same CSV technique you use for psycopg2?
  2. Could you please try to use a spooled temp file to keep the data in memory?

I expect that in test 1 psycopg 3 will be as fast as psycopg 2. If the data fit in memory, both psycopg 2 and 3 should be faster.

However, all this sucks, and I'd love to do better :)

  • Copying row by row has the advantage of preserving data types; however it is slower because we have to create intermediate Python objects (a small sketch of that route follows this list).
  • Pandas read_csv() is a parser. It uses heuristics to infer the data types (which in the database are already explicit). The CSV route amounts to:
    • taking something strictly typed in columns,
    • converting everything to text,
    • mashing the text together into a single file,
    • asking pandas to read the file,
    • letting pandas split the rows and the fields, infer the data types, and parse the data.
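
As a point of reference, here is a minimal sketch of the row-by-row route that at least keeps the column names and database types, using psycopg 3's binary protocol (the function name is illustrative, not an existing API, and it relies on the CONNECTION_STRING defined at the top of the thread; it still pays the per-object cost described above):

import pandas as pd
import psycopg

def psycopg3_typed_dataframe(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        # binary=True requests the binary protocol, so numeric values arrive
        # already typed instead of as text that has to be re-parsed.
        with connection.cursor(binary=True) as cur:
            cur.execute(query)
            columns = [c.name for c in cur.description]
            # Every value still becomes a Python object, which is the cost
            # discussed in the first bullet above.
            return pd.DataFrame(cur.fetchall(), columns=columns)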

I am pretty sure that it can be improved 😄 Especially by avoiding the CSV and by creating an in-memory representation without leaving C.

One way would be to convert the data coming from Postgres (either from COPY or from a query) into Arrow/Parquet (I still don't know how the two relate to each other) and feed it to pandas.read_parquet().
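
For illustration, a user-space approximation of that route is already possible by letting pyarrow parse the COPY output and hand the columnar table to pandas (a sketch only, assuming pyarrow is installed; the function name is made up and CONNECTION_STRING is the one defined earlier):

from io import BytesIO

import psycopg
import pyarrow.csv as pacsv

def psycopg3_copy_arrow_dataframe(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with BytesIO() as bio:
                copy_sql = f"COPY ({query.strip(';')}) TO STDOUT WITH CSV HEADER"
                with cur.copy(copy_sql) as copy:
                    for data in copy:
                        bio.write(data)
                bio.seek(0)
                # pyarrow parses the CSV in C++ and infers the column types;
                # to_pandas() then builds the DataFrame from the Arrow table.
                return pacsv.read_csv(bio).to_pandas()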

Another would be to create a pandas object directly from the data coming from Postgres, if the object structure is documented and accessible.

Have you got an opinion about these scenarios?

@ChristianF88
Author

As requested, here are the other tests ;) I also added a third one that simply writes to a BytesIO object.

I doubled the query size for the examples below so the differences are more visible (fetching 2_000_000 rows).

Additional Code:

from tempfile import SpooledTemporaryFile
from io import BytesIO
def psycopg3_block_copy_dataframe(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with TemporaryFile() as tmpfile:
                copy_sql = f"COPY ({query.strip(';')}) TO STDOUT WITH CSV HEADER"
                with cur.copy(copy_sql) as copy:
                    for data in copy:
                        tmpfile.write(data)
                tmpfile.seek(0)
                return pd.read_csv(tmpfile)
            
def psycopg3_block_spooled_copy_dataframe(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with SpooledTemporaryFile() as tmpfile:
                copy_sql = f"COPY ({query.strip(';')}) TO STDOUT WITH CSV HEADER"
                with cur.copy(copy_sql) as copy:
                    for data in copy:
                        tmpfile.write(data)
                tmpfile.seek(0)
                return pd.read_csv(tmpfile)
            
def psycopg3_block_bio_copy_dataframe(query):
    with psycopg.connect(CONNECTION_STRING) as connection:
        with connection.cursor() as cur:
            with BytesIO() as bio:
                copy_sql = f"COPY ({query.strip(';')}) TO STDOUT WITH CSV HEADER"
                with cur.copy(copy_sql) as copy:
                    for data in copy:
                        bio.write(data)
                bio.seek(0)
                return pd.read_csv(bio)

All results:

%timeit pandas_native_query(QUERY)

11.4 s ± 273 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_native_query(QUERY)

10.3 s ± 164 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_native_query(QUERY)

9.51 s ± 210 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_copy_dataframe(QUERY)

5.51 s ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_copy_dataframe(QUERY)

10.2 s ± 307 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_copy_dataframe(QUERY)

5.45 s ± 192 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_spooled_copy_dataframe(QUERY)

5.82 s ± 127 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_bio_copy_dataframe(QUERY)

5.26 s ± 106 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Intriguingly, the TemporaryFile is faster than the SpooledTemporaryFile in my benchmarks. I have been testing via VPN though^^
BytesIO wins the race!


Regarding the two options, I would go with whichever is more robust and less likely to change. I am not sure which interface is more stable (the parquet format or the internal structure of a pd.DataFrame), but the parquet route might be better suited.

Should I try to get into contact with pandas?

Cheers!

@dvarrazzo
Member

Ok, so it seems that the block-by-block interface is the fastest option, which is understandable as no Python object is created per datum, and that psycopg 2 and 3 perform on the same level at block granularity. BytesIO makes no use of the disk, so, as long as there is enough RAM, it is the fastest option. So far I'm pleased to see there are no surprises :)

Parquet is an exchange format, so yes, if we plan to extend psycopg to produce data for pandas, that format would probably give more flexibility. I understand that pandas can make use of Parquet data without making an extra copy [citation needed - I'm not an expert], so preferring this route should have no performance impact.

For psycopg it looks like a big piece of work, but doable, and I would be interested in doing it. We have adapters to convert from the Postgres formats (text and binary) to Python objects. We would need homologous adapters to convert from the Postgres format to Parquet and produce a data structure, instead of a list, on fetchmany()/fetchall().
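
Purely to illustrate the shape of the idea (this is not a psycopg API; the helper name is made up), a columnar result built in plain Python from an ordinary cursor could look roughly like this:

import numpy as np

def fetch_columnar(cur):
    """Hypothetical helper: return named columns instead of a list of row tuples."""
    rows = cur.fetchall()
    names = [c.name for c in cur.description]
    # Transpose the rows into columns; numpy picks a dtype per column.
    columns = list(zip(*rows)) if rows else [[] for _ in names]
    return {name: np.asarray(col) for name, col in zip(names, columns)}

# usage: pd.DataFrame(fetch_columnar(cur)) after cur.execute(QUERY)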

@ChristianF88
Author

Glad to hear everything runs as expected :) Let me know if I can help in any way!

@david-lorenzo

Hello,

Sorry to step into the conversation, but today I realized psycopg3 was finally published, and I was taking a look at the bug tracker to get a sense of the stability of this package and the severity of the bugs.

The title of this thread caught my attention, and I find this subject very interesting. I have to thank ChristianF88 for his examples, because I've learnt some tricks to get a performance boost.

Having a fast way to "export" the results of a query to a pandas DataFrame would be awesome. I did some research a while ago, and I think these links/projects may help you move toward this goal if you want to add such functionality.

Pandas DataFrames use numpy arrays to store the columns' data in memory. Apache Arrow is also an in-memory columnar store, but it has some advantages: two processes implemented in different programming languages can share the same memory region to work on the same copy of the data, and a Python application can share data with a native library with no conversions or copies. Apache Parquet is an on-disk columnar store, usually in a compressed format. You can think of Apache Arrow as the in-memory, uncompressed version of a parquet file (sort of).

The pandas API has a read_parquet() function similar to read_csv() or read_excel(). It lets you create a pandas DataFrame from a parquet file. It performs the conversion of each Arrow column to a numpy array column, which is the basic object that holds the data in a pandas DataFrame (a DataFrame is a set of Series objects, and each Series holds a numpy array).
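
A toy round trip showing that hand-off, assuming pyarrow is installed (file name and values are made up):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"signal_id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})  # in-memory Arrow table
pq.write_table(table, "signal.parquet")   # on-disk columnar storage (Parquet)
df = pd.read_parquet("signal.parquet")    # parquet file -> pandas DataFrame
df2 = table.to_pandas()                   # or straight from the Arrow table, no file involved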

There is an alternative dataframe package called polars, implemented in Rust with Python bindings, that uses Arrow as its in-memory datastore. It promises better performance than pandas because it can use several cores to perform operations on the dataframes, uses copy-on-write and lazy evaluation (eager mode is also supported) to avoid creating new copies of the data, and supports out-of-memory execution (you can mmap an Arrow file from disk bigger than your total RAM and still manage to process it, something you can't do with pandas).

On the database side, there is connector-x, also implemented in Rust (see also the article about it on Towards Data Science). According to their statements it is the fastest library for creating dataframes (in several formats) from a database. As far as I know, they achieve this by avoiding unnecessary copies of the data once it is in the data buffer of the database driver connection. I think it would be handy or inspiring for you to see how they convert the responses to a pandas dataframe or to an Arrow table.

To end this post, there is also turbodbc, which also implements the creation of pandas dataframes by first building an Arrow table and then using pyarrow functionality to transform that object into a pandas DataFrame.

It would be interesting to see how connectorx and turbodbc perform compared to the different methods that Christian used in his tests.

@dvarrazzo
Member

Thank you for the write-up, @david-lorenzo, that's a very helpful summary 🙂

@ChristianF88
Author

Great info @david-lorenzo, thanks! I extended the benchmarks. However, I am not sure if I really used the libraries to their greatest advantage. Also, I couldn't get turbodbc running.

Additional Code:

# connectorx==0.3.0
# polars==0.13.55
# pyarrow==8.0.0
import connectorx as cx
import polars as pl

# ENGINE_CONN_X was not included in the original post; connectorx and polars
# accept a plain PostgreSQL connection URI, so presumably something like:
ENGINE_CONN_X = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(**CREDS)

def connectorx_query(query):
    return cx.read_sql(ENGINE_CONN_X, query.strip(';'), return_type="pandas")

def connectorx_query_partitioned(query):
    return cx.read_sql(
        ENGINE_CONN_X, 
        query.strip(';'), 
        return_type="pandas", 
        partition_on="signal_id", 
        partition_num=3
    )

def polars_connectorx_native(query):
    """doesn't return a pandas dataframe"""
    return pl.read_sql(query.strip(';'), ENGINE_CONN_X)

def polars_connectorx_to_pandas(query):
    return pl.read_sql(query.strip(';'), ENGINE_CONN_X).to_pandas()

All Results

This time there is no VPN, which is why everything is a bit faster.

%timeit pandas_native_query(QUERY)

8.79 s ± 284 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_native_query(QUERY)

8.2 s ± 388 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_native_query(QUERY)

7.08 s ± 187 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg2_copy_dataframe(QUERY)

5.01 s ± 153 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_copy_dataframe(QUERY)

9.43 s ± 177 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_copy_dataframe(QUERY)

5.02 s ± 111 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_spooled_copy_dataframe(QUERY)

5.52 s ± 81.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit psycopg3_block_bio_copy_dataframe(QUERY)

5.01 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit connectorx_query(QUERY)

9.04 s ± 404 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit connectorx_query_partitioned(QUERY)

7.18 s ± 344 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit polars_connectorx_native(QUERY)

7.66 s ± 147 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit polars_connectorx_to_pandas(QUERY)

7.71 s ± 160 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This webpage also gives a great overview of the different benchmarks. In their results the partitioning functionality of connector-x in particular makes a huge difference; not so for my setup.

@amachanic

Not sure if it's related, but I'm seeing significantly reduced performance when using psycopg3 compared to psycopg2, even without pandas involved. On my end, most queries in the app I tried to migrate return on average 2x-3x slower than before.

Below is a simple repro that runs ~5x more slowly if I change from psycopg2 to psycopg. I tried with both the psycopg binary and c packages, version 3.0.16, and with psycopg2-binary, version 2.8.6.

def f():
    con = psycopg2.connect(CONNECTION_STRING)
    cur = con.cursor()
    cur.execute("select array[0,1,2,3,4,5,6,7,8,9] from generate_series(0, 100000) as g(r)")
    return list(iter(cur))
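
For reference, the psycopg 3 variant of the same repro (only the module changes; the function name here is mine) would look like:

import psycopg

def f3():
    con = psycopg.connect(CONNECTION_STRING)
    cur = con.cursor()
    cur.execute("select array[0,1,2,3,4,5,6,7,8,9] from generate_series(0, 100000) as g(r)")
    return list(iter(cur))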

@dvarrazzo
Member

dvarrazzo commented Aug 26, 2022

Thank you for the report, @amachanic. You are right. More details in #359.
