Dialect cannot work with huge data tables (allocation of memory on server/client) #116

Closed
Rustem opened this Issue May 4, 2017 · 23 comments

Rustem commented May 4, 2017

  1. For Amazon Redshift, the SQLAlchemy library by default uses psycopg2, the driver used for Postgres. psycopg2 defaults to client-side cursors: when a SQL query executes, the entire result set is allocated in client memory, so a big query overflows the memory limit.
     After that, I tried a raw psycopg2 driver connection with a server-side cursor, as recommended by the psycopg2 documentation (http://initd.org/psycopg/docs/usage.html#server-side-cursors) and supported by Redshift's DECLARE (http://docs.aws.amazon.com/redshift/latest/dg/declare.html):
begin;
DECLARE curname cursor FOR
    SELECT * FROM schema.table;
fetch forward 10 from curname;
fetch next from curname;
close curname;
commit;

or

import psycopg2

command_text = """
    SELECT * FROM schema.table;
"""
conn = psycopg2.connect(host="HOST", dbname="dbname", password="****", user="user", port="5439")
cursor2 = conn.cursor('curname')
cursor2.itersize = 100
cursor2.execute(command_text)
for item in cursor2:
    print(item)

This method executes, but it still allocates the whole result set on the server side and runs for about 10 minutes. In some cases it still overflows memory, but this time on the server side.

  2. When I executed the same query using SQL Workbench, it started returning results immediately, looping over them 1000 rows at a time and printing them.
     I assume the workbench applies some kind of limit/offset on its side.
     I also tested it with Java code using the Redshift JDBC driver. That code works as expected and returns results row by row.
     So I assume the problem is in the Python driver psycopg2, which allocates the whole result set in memory before returning results:
package connection;

import java.sql.*;
import java.util.Properties;
public class Redshift {
    //Redshift driver: "jdbc:redshift://host:5439/dev";
    static final String dbURL = "jdbc:redshift://host:5439/dbname";
    static final String MasterUsername = "user";
    static final String MasterUserPassword = "****";
    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
           Class.forName("com.amazon.redshift.jdbc41.Driver");
           //Open a connection and define properties.
           System.out.println("Connecting to database...");
           Properties props = new Properties();
           //Uncomment the following line if using a keystore.
           //props.setProperty("ssl", "true");
           props.setProperty("user", MasterUsername);
           props.setProperty("password", MasterUserPassword);
           conn = DriverManager.getConnection(dbURL, props);
           //Try a simple query.
           System.out.println("Listing system tables...");
           stmt = conn.createStatement();
           String sql;
           sql = "SELECT * FROM schema.table;";
           ResultSet rs = stmt.executeQuery(sql);
           //Get the data from the result set.
           while(rs.next()){
              //Retrieve two columns.
              String catalog = rs.getString("list_entry_id");
              String name = rs.getString("source_code");
              //Display values.
              System.out.print("Catalog: " + catalog);
              System.out.println(", Name: " + name);
           }
           rs.close();
           stmt.close();
           conn.close();
        }catch(Exception ex){
           //For convenience, handle all errors here.
           ex.printStackTrace();
        }finally{
           //Finally block to close resources.
           try{
              if(stmt!=null)
                 stmt.close();
           }catch(Exception ex){
           }// nothing we can do
           try{
              if(conn!=null)
                 conn.close();
           }catch(Exception ex){
              ex.printStackTrace();
           }
        }
        System.out.println("Finished connectivity test.");
     }
}

Rustem changed the title from Dialect cannot work with huge data tables (allocation on server/client) to Dialect cannot work with huge data tables (allocation of memory on server/client) May 4, 2017

jklukas (Collaborator) commented May 4, 2017

From this description, it sounds like the issue is at the psycopg2 layer rather than with sqlalchemy-redshift. If you can configure psycopg2 to have reasonable behavior for your case, we can likely figure out how to expose that through sqlalchemy, but if it's a psycopg2 limitation, that would need to be addressed upstream.
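
For reference, SQLAlchemy's psycopg2 dialect can already be pointed at server-side cursors, either engine-wide or per statement. A minimal sketch, with a placeholder URL and table name:

import sqlalchemy as sa

# Placeholder connection URL; substitute real credentials.
engine = sa.create_engine(
    "redshift+psycopg2://user:****@host:5439/dbname",
    server_side_cursors=True,  # use named (server-side) cursors for every query
)

# Or opt in per statement:
with engine.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(
        sa.text("SELECT * FROM schema.table"))
    for row in result:
        pass  # rows are fetched in batches instead of one big client-side read

Note that both options only change how the client fetches rows; whether Redshift materializes the full result set server-side is a separate question.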

graingert (Collaborator) commented May 4, 2017

sounds like issue #100 to me

Rustem commented May 5, 2017

@graingert This really seems like abnormal behaviour for the cursors. I know the psycopg2 driver is not native to Redshift, but cursors are still very important. I can't go to production with a Python app because of this bug.

graingert (Collaborator) commented May 5, 2017

Rustem commented May 7, 2017

I will answer ASAP, thanks.

Rustem commented May 8, 2017

@graingert For the Redshift connection I tried both:

  1. raw SQL commands:
begin;
DECLARE curname cursor FOR
    SELECT * FROM schema.table;
fetch forward 10 from curname;
fetch next from curname;
close curname;
commit;
  2. server-side cursors, as the psycopg2 documentation advises:
import psycopg2

command_text = """
    SELECT * FROM schema.table;
"""
conn = psycopg2.connect(host="HOST", dbname="dbname", password="****", user="user", port="5439")
cursor = conn.cursor('curname')
cursor.itersize = 100
cursor.execute(command_text)
for item in cursor:
    print(item)

Both allocate memory server-side before sending the actual response. So if my table contains a hundred million rows, the server allocates memory for the whole result before sending anything back, even if I set itersize up to 1000. That's really abnormal to me.

When I used a native JDBC connection, everything was fine.

graingert (Collaborator) commented May 8, 2017

@Rustem Sorry, it's not clear what you mean. When you use part 2 (using curname), does it run without taking up all your memory?

How about when using .execution_options(stream_results=True)?

@Profiler.profile
def test_core_fetchmany_w_streaming(n):
    """Load Core result rows using fetchmany/streaming."""

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).\
            execute(Customer.__table__.select().limit(n))
        while True:
            chunk = result.fetchmany(10000)
            if not chunk:
                break
            for row in chunk:
                data = row['id'], row['name'], row['description']

Rustem commented May 8, 2017

I think I tried this one; I will try it again and share my results here.

graingert (Collaborator) commented May 8, 2017

@Rustem When you use part 2 (psycopg2, using curname), does it run without taking up all your memory?

Rustem commented May 9, 2017

It works for Postgres, @graingert. I am trying it with Redshift now.

Rustem commented May 9, 2017

I am looking through the dialect's code and see that the execution option called stream_results just reuses the psycopg2 named cursor. However, the way it should work is to use the native cursor SQL commands. It seems psycopg2 targets https://www.postgresql.org/docs/9.2/static/plpgsql-cursors.html, but Redshift cursors have a different syntax for operating on them. As soon as I profile this on Redshift, I will update my comment. There are two options now: either extend the dialect, or keep it as it is if it works.
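
A sketch of what driving Redshift's native cursor commands directly through psycopg2 could look like, built from the DECLARE/FETCH example earlier in this thread (connection parameters are placeholders):

import psycopg2

# Placeholder connection parameters.
conn = psycopg2.connect(host="HOST", dbname="dbname",
                        password="****", user="user", port="5439")
cur = conn.cursor()  # plain client-side cursor; the paging happens in SQL

# psycopg2 opens a transaction implicitly, which Redshift cursors require
# (the explicit `begin;` from the raw SQL example above).
cur.execute("DECLARE curname CURSOR FOR SELECT * FROM schema.table;")
while True:
    cur.execute("FETCH FORWARD 1000 FROM curname;")
    rows = cur.fetchall()
    if not rows:
        break
    for row in rows:
        print(row)
cur.execute("CLOSE curname;")
conn.commit()

As reported earlier in the thread, though, Redshift still materializes the cursor's result set on the leader node before the first FETCH returns, so this changes client memory use, not server memory use.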

Rustem commented May 9, 2017

By the way, why don't you try something like the Redshift ODBC driver? http://docs.aws.amazon.com/redshift/latest/mgmt/install-odbc-driver-windows.html
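
For illustration, a minimal pyodbc sketch, assuming the Amazon Redshift ODBC driver is installed and an ODBC DSN named "redshift" has been configured (both are assumptions):

import pyodbc

# Assumes a DSN named "redshift" pointing at the Amazon Redshift ODBC driver.
conn = pyodbc.connect("DSN=redshift;UID=user;PWD=****")
cur = conn.cursor()
cur.execute("SELECT * FROM schema.table")
while True:
    rows = cur.fetchmany(1000)  # pull rows in batches rather than all at once
    if not rows:
        break
    for row in rows:
        print(row)
conn.close()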

Rustem commented May 9, 2017

So it does not work for now; with 600 million records it takes too long. The AWS console showed the query running for minutes with nothing getting updated.

[screenshot: profiling_redshift_1]

Eventually it raised a server-side exception:

Traceback (most recent call last):
  File "redshift_amazon.py", line 16, in <module>
    result = conn.execution_options(stream_results=True).execute(command_text)
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/base.py", line 906, in execute
    return self._execute_text(object, multiparams, params)
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/base.py", line 1054, in _execute_text
    statement, parameters
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
    result = context._setup_crud_result_proxy()
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/default.py", line 822, in _setup_crud_result_proxy
    result = self.get_result_proxy()
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 472, in get_result_proxy
    return _result.BufferedRowResultProxy(self)
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/result.py", line 497, in __init__
    self._init_metadata()
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/result.py", line 1106, in _init_metadata
    self.__buffer_rows()
  File "/Users/rustem/Sites/toptal/SparkETL/env/lib/python3.5/site-packages/sqlalchemy/engine/result.py", line 1128, in __buffer_rows
    self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
psycopg2.InternalError: exceeded the maximum size allowed for the total set of cursor data: 16000MB.

I used the following code to test:

import sqlalchemy as sa

command_text = """
    SELECT * FROM lineorder
"""
url = 'redshift+psycopg2://host:5439/dbname'
engine = sa.create_engine(
    url,
    poolclass=sa.pool.QueuePool,
    max_overflow=10,
    pool_size=5,
    pool_recycle=10000,
    pool_timeout=300)

with engine.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(command_text)
    print("connected")
    while True:
        chunk = result.fetchmany(50)
        if not chunk:
            break
        for row in chunk:
            # data = row['p_partkey'], row['p_name']
            print(row)
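
The 16000MB in that error is Redshift's own limit on the total size of a cursor's result set (the exact limit depends on the cluster configuration), so it is hit server-side no matter how small the client fetch size is. On the client side, SQLAlchemy's streaming mode also takes a max_row_buffer execution option to cap its row buffer; a hedged sketch, assuming a SQLAlchemy version that supports that option:

import sqlalchemy as sa

# Placeholder connection URL.
engine = sa.create_engine("redshift+psycopg2://user:****@host:5439/dbname")

with engine.connect() as conn:
    result = conn.execution_options(
        stream_results=True,
        max_row_buffer=100,  # cap the client-side row buffer (assumed supported)
    ).execute(sa.text("SELECT * FROM lineorder"))
    for row in result:
        print(row)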

graingert (Collaborator) commented May 9, 2017

@Rustem When you use part 2 (psycopg2, using curname), does it run without taking up all your memory?

Rustem commented May 9, 2017

@graingert No, same situation. It allocates memory on the server.

graingert (Collaborator) commented May 9, 2017

> memory on server

on the server or the client?

Rustem commented May 9, 2017

@graingert on the server

Rustem commented May 9, 2017

It will raise the error client-side if you use a basic cursor.

Rustem commented May 9, 2017

However, when I test with a JDBC connection (pure Java code), everything works fine, and I observe that it does not use cursors; it just streams the result. According to the server query log I see only SELECT * FROM lineorder, but the console is fed data immediately.

graingert (Collaborator) commented May 9, 2017

@Rustem I'm not interested in Java code. Can you demonstrate Python code that does not trigger the issue you're having? If not, it's not an issue in sqlalchemy-redshift.

Rustem commented May 9, 2017

@graingert Then maybe you can show me working code that can execute a cursor over huge tables. I think I will raise the issue on psycopg2: https://github.com/psycopg/psycopg2/issues/new. Otherwise, what is the real purpose of building this lib?

graingert (Collaborator) commented May 9, 2017

> then maybe you can show me working code that can execute a cursor over huge tables

This project has very little to do with raw psycopg2 interaction. It's just used to generate the SQL from generic(-ish) sqlalchemy expressions.

graingert closed this May 9, 2017

graingert (Collaborator) commented May 9, 2017

closing in favour of psycopg/psycopg2#553
