Skip to content

Commit

Permalink
modified sql.py to speed up dumping and loading data with PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeqfu committed Apr 22, 2020
1 parent 6fe6c27 commit eac4bc4
Showing 1 changed file with 172 additions and 50 deletions.
222 changes: 172 additions & 50 deletions pyhelpers/sql.py
@@ -1,7 +1,10 @@
""" PostgreSQL """

import csv
import gc
import getpass
import io
import tempfile

import pandas as pd
import pandas.io.parsers
Expand Down Expand Up @@ -63,13 +66,24 @@ def __init__(self, host=None, port=None, username=None, password=None, database_
try:
# Create a SQLAlchemy connectable
self.engine = sqlalchemy.create_engine(self.url, isolation_level='AUTOCOMMIT')
self.connection = self.engine.connect()
self.connection = self.engine.raw_connection()
print("Successfully.") if verbose else ""
except Exception as e:
print("Failed. CAUSE: \"{}\".".format(e))

# Check if a database exists
def database_exists(self, database_name):
    """
    Check whether a database with the given name exists on the connected server.

    The name is sent as a bound parameter instead of being formatted into the SQL text, so names
    containing quote characters are compared literally and cannot change the statement.

    :param database_name: [str] name of a database
    :return: [bool] first column of the single row returned by the EXISTS query
    """
    query = sqlalchemy.text(
        "SELECT EXISTS("
        "SELECT datname FROM pg_catalog.pg_database "
        "WHERE datname=:database_name);")
    result = self.engine.execute(query, {"database_name": database_name})
    return result.fetchone()[0]

# Establish a connection to the specified database_name
def connect_db(self, database_name=None):
def connect_database(self, database_name=None):
"""
:param database_name: [str; None (default)] name of a database; if None, the database name is input manually
"""
Expand All @@ -79,36 +93,25 @@ def connect_db(self, database_name=None):
if not sqlalchemy_utils.database_exists(self.url):
sqlalchemy_utils.create_database(self.url)
self.engine = sqlalchemy.create_engine(self.url, isolation_level='AUTOCOMMIT')
self.connection = self.engine.connect()

# Check if a database exists
def db_exists(self, database_name):
"""
:param database_name: [str] name of a database
:return: [bool]
"""
result = self.engine.execute("SELECT EXISTS("
"SELECT datname FROM pg_catalog.pg_database "
"WHERE datname='{}');".format(database_name))
return result.fetchone()[0]
self.connection = self.engine.raw_connection()

# An alternative to sqlalchemy_utils.create_database()
def create_db(self, database_name, verbose=False):
def create_database(self, database_name, verbose=False):
    """
    Create a database if it does not exist yet, then connect to it.

    When the database is missing, self.disconnect() is called before issuing CREATE DATABASE;
    in either case the instance ends up connected to `database_name`.

    :param database_name: [str] name of a database
    :param verbose: [bool] (default: False) whether to print progress messages
    """
    if self.database_exists(database_name):
        if verbose:
            print("The database already exists.")
    else:
        if verbose:
            print("Creating a database \"{}\" ... ".format(database_name), end="")
        self.disconnect()
        self.engine.execute('CREATE DATABASE "{}";'.format(database_name))
        if verbose:
            print("Done.")
    # NOTE(review): assumed to run on both branches (indentation was lost in this view) - confirm
    self.connect_database(database_name)

# Get size of a database
def get_db_size(self, database_name=None):
def get_database_size(self, database_name=None):
"""
:param database_name: [str; None (default)] name of a database; if None, the current connected database is used
:return: [str] size of the database
Expand All @@ -126,7 +129,7 @@ def disconnect(self, database_name=None, verbose=False):
db_name = self.database_name if database_name is None else database_name
print("Disconnecting the database \"{}\" ... ".format(db_name), end="") if verbose else ""
try:
self.connect_db(database_name='postgres')
self.connect_database(database_name='postgres')
self.engine.execute('REVOKE CONNECT ON DATABASE "{}" FROM PUBLIC, postgres;'.format(db_name))
self.engine.execute(
'SELECT pg_terminate_backend(pid) '
Expand All @@ -137,12 +140,12 @@ def disconnect(self, database_name=None, verbose=False):
print("Failed. CAUSE: \"{}\"".format(e))

# Kill connections to all other databases
def disconnect_all_other_dbs(self):
self.connect_db('postgres')
def disconnect_all_other_databases(self):
    """
    Terminate every server backend other than the one serving the current session.

    The instance is first switched to the 'postgres' database; pg_terminate_backend() is then
    applied to each pid in pg_stat_activity that differs from pg_backend_pid().
    """
    self.connect_database('postgres')
    terminate_others = 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid();'
    self.engine.execute(terminate_others)

# Drop the specified database
def drop(self, database_name=None, confirmation_required=True, verbose=False):
def drop_database(self, database_name=None, confirmation_required=True, verbose=False):
"""
:param database_name: [str; None (default)] database to be disconnected; if None, to disconnect the current one
:param confirmation_required: [bool] (default: True)
Expand All @@ -159,6 +162,13 @@ def drop(self, database_name=None, confirmation_required=True, verbose=False):
except Exception as e:
print("Failed. CAUSE: \"{}\"".format(e))

# Check if a schema exists
def schema_exists(self, schema_name):
    """
    Check whether a schema with the given name exists in the currently connected database.

    The name is sent as a bound parameter instead of being formatted into the SQL text, so names
    containing quote characters are compared literally and cannot change the statement.

    :param schema_name: [str] name of a schema
    :return: [bool] first column of the single row returned by the EXISTS query
    """
    query = sqlalchemy.text(
        "SELECT EXISTS("
        "SELECT schema_name FROM information_schema.schemata "
        "WHERE schema_name=:schema_name);")
    result = self.engine.execute(query, {"schema_name": schema_name})
    return result.fetchone()[0]

# Create a new schema in the database being currently connected
def create_schema(self, schema_name, verbose=False):
"""
Expand All @@ -172,8 +182,8 @@ def create_schema(self, schema_name, verbose=False):
except Exception as e:
print("Failed. CAUSE: \"{}\"".format(e))

# Formulate printing message for schemas
def multi_names_msg(self, *schema_names, desc='schema'):
# Formulate printing message for multiple names
def printing_messages_for_multi_names(self, *schema_names, desc='schema'):
"""
:param schema_names: [str; iterable]
:param desc: [str] (default: 'schema')
Expand All @@ -196,7 +206,7 @@ def drop_schema(self, *schema_names, confirmation_required=True, verbose=False):
:param confirmation_required: [bool] (default: True)
:param verbose: [bool] (default: False)
"""
schemas, schemas_msg = self.multi_names_msg(*schema_names)
schemas, schemas_msg = self.printing_messages_for_multi_names(*schema_names)
if confirmed("Confirmed to drop the {} from the database \"{}\"".format(schemas_msg, self.database_name),
confirmation_required=confirmation_required):
try:
Expand All @@ -219,74 +229,186 @@ def table_exists(self, table_name, schema_name='public'):
"AND table_name='{}');".format(schema_name, table_name))
return res.fetchone()[0]

# Create a new table
def create_table(self, table_name, column_specs, schema_name='public', verbose=False):
    """
    Create a table, creating its schema first when the schema does not exist.

    :param table_name: [str] name of a table
    :param column_specs: [str] text placed verbatim between the parentheses of CREATE TABLE, i.e.
        'column_name TYPE column_constraint, ..., table_constraint'
    :param schema_name: [str] name of a schema (default: 'public')
    :param verbose: [bool] (default: False) whether to print progress messages

    The statement being issued has the general form:

        CREATE TABLE table_name(
            column1 datatype column1_constraint,
            column2 datatype column2_constraint,
            ...
            columnN datatype columnN_constraint,
            PRIMARY KEY( one or more columns )
        );

    e.g.
        column_specs = 'col_name_1 INT, col_name_2 TEXT'
    """
    qualified_name = '{schema}.\"{table}\"'.format(schema=schema_name, table=table_name)

    schema_missing = not self.schema_exists(schema_name)
    if schema_missing:
        self.create_schema(schema_name, verbose=False)

    try:
        if verbose:
            print("Creating a table '{}' ... ".format(qualified_name), end="")
        statement = 'CREATE TABLE {} ({});'.format(qualified_name, column_specs)
        self.engine.execute(statement)
        if verbose:
            print("Done.")
    except Exception as e:
        # Failures are reported on stdout rather than propagated
        print("Failed. CAUSE: \"{}\"".format(e))

# Get information about columns
def get_column_info(self, table_name, schema_name='public', as_dict=True):
    """
    Fetch the information_schema.columns rows describing the columns of a table.

    Schema and table names are sent as bound parameters instead of being formatted into the SQL
    text, so names containing quote characters are compared literally.

    :param table_name: [str] name of a table
    :param schema_name: [str] name of a schema (default: 'public')
    :param as_dict: [bool] (default: True) whether to return a dict instead of a data frame
    :return: [pd.DataFrame; dict] transposed result: one row (or dict key) per
        information_schema attribute and one column (or list item) per table column,
        labelled 'column_0', 'column_1', ...
    """
    query = sqlalchemy.text(
        "SELECT * FROM information_schema.columns "
        "WHERE table_schema=:schema_name AND table_name=:table_name;")
    column_info = self.engine.execute(query, {"schema_name": schema_name, "table_name": table_name})
    keys, values = column_info.keys(), column_info.fetchall()
    labels = ['column_{}'.format(x) for x in range(len(values))]
    info_tbl = pd.DataFrame(values, index=labels, columns=keys).T
    if as_dict:
        info_tbl = {k: v.to_list() for k, v in info_tbl.iterrows()}
    return info_tbl

# Remove data from the database being currently connected
def drop_table(self, table_name, schema_name='public', confirmation_required=True, verbose=False):
    """
    Drop a table (with CASCADE) from the currently connected database, after confirmation.

    :param table_name: [str] name of a table
    :param schema_name: [str] name of a schema (default: 'public')
    :param confirmation_required: [bool] (default: True) passed through to confirmed()
    :param verbose: [bool] (default: False) whether to print a message on success
    """
    prompt = "Confirmed to drop the table {}.\"{}\" from the database \"{}\"?".format(
        schema_name, table_name, self.database_name)
    if not confirmed(prompt, confirmation_required=confirmation_required):
        return

    try:
        statement = 'DROP TABLE IF EXISTS {}.\"{}\" CASCADE;'.format(schema_name, table_name)
        self.engine.execute(statement)
        if verbose:
            print("The table \"{}\" has been dropped successfully.".format(table_name))
    except Exception as e:
        # Failures are reported on stdout rather than propagated
        print("Failed. CAUSE: \"{}\"".format(e))

# A callable that uses the PostgreSQL COPY command to insert data
@staticmethod
def psql_insert_copy(table, conn, keys, data_iter):
    """
    Insert rows with PostgreSQL's COPY ... FROM STDIN; usable as the `method` of DataFrame.to_sql().

    The rows are serialised to an in-memory CSV buffer which is then streamed through
    cursor.copy_expert(). The cursor is always closed, including when the copy fails.

    :param table: [pandas.io.sql.SQLTable]
    :param conn: [sqlalchemy.engine.Engine; sqlalchemy.engine.Connection]
    :param keys: [list of str] column names
    :param data_iter: iterable that iterates the values to be inserted
    Source: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
    """
    s_buf = io.StringIO()
    csv.writer(s_buf).writerows(data_iter)
    s_buf.seek(0)

    columns = ', '.join('"{}"'.format(k) for k in keys)
    if table.schema:
        table_name = '{}."{}"'.format(table.schema, table.name)
    else:
        # No schema given: do not emit a literal 'None.' prefix
        table_name = '"{}"'.format(table.name)

    sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
    cur = conn.connection.cursor()
    try:
        cur.copy_expert(sql=sql, file=s_buf)
    finally:
        cur.close()

# Import data (as a pandas.DataFrame) into the database being currently connected
def dump_data(self, data, table_name, schema_name='public', if_exists='replace', chunk_size=None,
force_replace=False, col_type=None, verbose=False):
def dump_data(self, data, table_name, schema_name='public', if_exists='replace', force_replace=False,
              chunk_size=None, col_type=None, method='multi', verbose=False, **kwargs):
    """
    Write data into a table of the currently connected database via DataFrame.to_sql().

    The schema is created when it is missing. When `data` is a chunked reader, `if_exists` is
    applied to the first chunk only and every later chunk is appended; otherwise 'replace' would
    keep nothing but the last chunk and 'fail' would abort on the second one.

    :param data: [pd.DataFrame; pandas.io.parsers.TextFileReader]
    :param table_name: [str] name of the targeted table
    :param schema_name: [str] name of a schema (default: 'public')
    :param if_exists: [str] 'fail', 'replace' (default), 'append'
    :param force_replace: [bool] (default: False) whether to drop an existing table via drop_table() first
    :param chunk_size: [int; None (default)] passed as `chunksize` when `data` is a data frame
    :param col_type: [dict; None (default)] passed as `dtype`
    :param method: [None; str; callable] (default: 'multi' - pass multiple values in a single INSERT clause)
    :param verbose: [bool] (default: False) whether to print progress messages
    :param kwargs: extra keyword arguments forwarded to DataFrame.to_sql()
    """
    if schema_name not in sqlalchemy.engine.reflection.Inspector.from_engine(self.engine).get_schema_names():
        self.create_schema(schema_name, verbose=verbose)
    # if not data.empty:
    #   There may be a need to change column types

    table_name_ = '{}."{}"'.format(schema_name, table_name)

    if self.table_exists(table_name, schema_name):
        if if_exists == 'replace' and verbose:
            print("The table '{}' already exists and will be replaced ... ".format(table_name_))
        if force_replace:
            if verbose:
                print("The existing table '{}' will be dropped first ... ".format(table_name_))
            self.drop_table(table_name, schema_name, verbose=verbose)

    try:
        if verbose:
            print("Dumping the data as a table \"{}\" into {}.\"{}\"@{} ... ".format(
                table_name, schema_name, self.database_name, self.host), end="")
        if isinstance(data, pandas.io.parsers.TextFileReader):
            chunk_if_exists = if_exists
            for chunk in data:
                chunk.to_sql(table_name, self.engine, schema=schema_name, if_exists=chunk_if_exists,
                             index=False, dtype=col_type, method=method, **kwargs)
                # The table now holds the first chunk; the remaining chunks must be added to it
                chunk_if_exists = 'append'
        else:
            data.to_sql(table_name, self.engine, schema=schema_name, if_exists=if_exists, index=False,
                        chunksize=chunk_size, dtype=col_type, method=method, **kwargs)
        gc.collect()
        if verbose:
            print("Done.")
    except Exception as e:
        # Failures are reported on stdout rather than propagated
        print("Failed. CAUSE: \"{}\"".format(e))

# Read data and schema (geom type, e.g. points, lines, ...)
def read_table(self, table_name, schema_name='public', chunk_size=None, sorted_by=None):
# Read table data
def read_table(self, table_name, schema_name='public', condition=None, chunk_size=None, sorted_by=None, **kwargs):
    """
    Read a table with 'SELECT * FROM schema."table" [condition];' through pandas.read_sql().

    :param table_name: [str] name of a table
    :param schema_name: [str] name of a schema (default: 'public')
    :param condition: [str; None (default)] SQL text appended verbatim after the table name,
        e.g. a WHERE clause
    :param chunk_size: [int; None (default)] number of rows to include in each chunk
    :param sorted_by: [str; None (default)] column to sort the result by; the index is renumbered
    :param kwargs: extra keyword arguments forwarded to pandas.read_sql()
    :return: [pd.DataFrame]; when `chunk_size` is given and `sorted_by` is not, whatever
        pandas.read_sql() returns for a chunked read (an iterator of data frames)
    """
    if condition:
        assert isinstance(condition, str), "'condition' must be 'str' type."
        sql_query = 'SELECT * FROM {}."{}" {};'.format(schema_name, table_name, condition)
    else:
        sql_query = 'SELECT * FROM {}."{}";'.format(schema_name, table_name)

    table_data = pd.read_sql(sql_query, con=self.engine, chunksize=chunk_size, **kwargs)

    if sorted_by and isinstance(sorted_by, str):
        if not isinstance(table_data, pd.DataFrame):
            # A chunked read yields an iterator, which cannot be sorted; gather the chunks first
            table_data = pd.concat(table_data, ignore_index=True)
        table_data = table_data.sort_values(sorted_by).reset_index(drop=True)

    return table_data

# Remove data from the database being currently connected
def drop_table(self, table_name, schema_name='public', confirmation_required=True, verbose=False):
# Read data by SQL query (recommended for large table)
def read_sql_query(self, sql_query, method='spooled_tempfile', mode='w+b', max_size_spooled=1, delimiter=',',
                   csv_dtype=None, **kwargs):
    """
    Read the result of a SQL query via 'COPY (...) TO STDOUT' and a temporary CSV buffer
    (recommended for large tables).

    :param sql_query: [str] a single query; surrounding whitespace and trailing semicolons are
        removed because the query is embedded inside 'COPY (...)'
    :param method: [str] {'spooled_tempfile', 'tempfile', 'stringio'} (default: 'spooled_tempfile')
    :param mode: [str] (default: 'w+b') mode of the temporary file; not used for 'stringio'
    :param max_size_spooled: [int] (default: 1, in Gigabyte) size up to which a spooled file stays in memory
    :param delimiter: [str] (default: ',') field delimiter used for COPY and, unless `sep` or
        `delimiter` is given in `kwargs`, for reading the CSV back
    :param csv_dtype: [dict; None (default)] passed as `dtype` to pandas.read_csv()
    :param kwargs: extra keyword arguments forwarded to pandas.read_csv()
    :return: [pd.DataFrame]
    """
    if method == 'stringio':  # Use io.StringIO
        csv_temp = io.StringIO()
    elif method == 'tempfile':  # Use tempfile.TemporaryFile
        csv_temp = tempfile.TemporaryFile(mode)
    else:  # Use tempfile.SpooledTemporaryFile - data is spooled in memory until its size > max_size
        csv_temp = tempfile.SpooledTemporaryFile(max_size_spooled * 10 ** 9, mode)

    # Specify the SQL query for "COPY"; a trailing ';' inside the parentheses would be a syntax error
    copy_sql = "COPY ({query}) TO STDOUT WITH DELIMITER '{delimiter}' CSV HEADER;".format(
        query=sql_query.strip().rstrip(';'), delimiter=delimiter)

    # Parse the CSV with the delimiter it was written with, unless the caller overrides it
    if 'sep' not in kwargs and 'delimiter' not in kwargs:
        kwargs['delimiter'] = delimiter

    cur = self.connection.cursor()
    try:
        cur.copy_expert(copy_sql, csv_temp)
        csv_temp.seek(0)  # Rewind the file handle in order to read the data back from it
        table_data = pd.read_csv(csv_temp, dtype=csv_dtype, **kwargs)
    except Exception:
        csv_temp.close()
        raise
    finally:
        cur.close()

    # A lazy reader (e.g. `chunksize` in kwargs) still needs the buffer; only release it once fully read
    if isinstance(table_data, pd.DataFrame):
        csv_temp.close()

    return table_data

0 comments on commit eac4bc4

Please sign in to comment.