PostgreSQL Integration improvements #8680

Merged
merged 9 commits into from
Jan 29, 2024
1 change: 0 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -160,6 +160,5 @@ exclude =
mindsdb/integrations/handlers/aurora_handler/*
mindsdb/integrations/handlers/surrealdb_handler/*
mindsdb/integrations/handlers/neuralforecast_handler/*
mindsdb/integrations/handlers/postgres_handler/*
mindsdb/integrations/handlers/rag_handler/*
max-complexity = 18
3 changes: 3 additions & 0 deletions .github/workflows/mindsdb.yml
@@ -130,3 +130,6 @@ jobs:
CHECK_FOR_UPDATES: False
OPENAI_API_KEY: ${{secrets.OPENAI_API_KEY}}
ANYSCALE_ENDPOINTS_API_KEY: ${{secrets.ANYSCALE_ENDPOINTS_API_KEY}}
- name: Run Integrations unittest
run: |
python -m unittest discover -s tests/unit/handlers
63 changes: 63 additions & 0 deletions mindsdb/integrations/handlers/postgres_handler/README.md
@@ -0,0 +1,63 @@
# PostgreSQL Integration

This documentation describes the integration of MindsDB with PostgreSQL, a powerful, open-source, object-relational database system. The integration allows for advanced SQL functionalities, extending PostgreSQL's capabilities with MindsDB's features.

## Getting Started

### Prerequisites

1. Ensure that MindsDB and PostgreSQL are installed on your system, or that you have access to their cloud-hosted versions.
2. If running locally, install the dependencies with `pip install mindsdb[postgres]`.


### Connection

Use the following syntax to create a connection to the PostgreSQL database in MindsDB:

```sql
CREATE DATABASE psql_datasource
WITH ENGINE = 'postgres',
PARAMETERS = {
    "host": "127.0.0.1",
    "port": 5432,
    "database": "postgres",
    "user": "postgres",
    "schema": "data",
    "password": "password"
};
```

Required Parameters:

* `user`: The username for the PostgreSQL database.
* `password`: The password for the PostgreSQL database.
* `host`: The hostname, IP address, or URL of the PostgreSQL server.
* `port`: The port number for connecting to the PostgreSQL server.
* `database`: The name of the PostgreSQL database to connect to.

Optional Parameters:

* `schema`: The database schema to use. Default is `public`.
* `sslmode`: The SSL mode for the connection.
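
As a rough illustration of how these parameters are consumed, the sketch below builds the keyword arguments that end up in `psycopg.connect`. The `sslmode` and `schema` handling follows the handler code in this PR. The helper name `build_psycopg_config` is hypothetical, and the mapping of `database` to the `dbname` keyword is an assumption, since that part of the handler is not shown in the diff.

```python
def build_psycopg_config(connection_args: dict) -> dict:
    """Hypothetical helper: turn MindsDB connection parameters into psycopg.connect kwargs."""
    config = {
        'host': connection_args.get('host'),
        'port': connection_args.get('port'),
        'user': connection_args.get('user'),
        'password': connection_args.get('password'),
        # Assumption: the database name is passed under libpq's `dbname` keyword.
        'dbname': connection_args.get('database'),
    }

    # sslmode is forwarded only when the user supplied it.
    if connection_args.get('sslmode'):
        config['sslmode'] = connection_args['sslmode']

    # A supplied schema is put first on the search_path, with `public` as fallback.
    if connection_args.get('schema'):
        config['options'] = f"-c search_path={connection_args['schema']},public"

    return config


config = build_psycopg_config({
    'host': '127.0.0.1',
    'port': 5432,
    'database': 'postgres',
    'user': 'postgres',
    'schema': 'data',
    'password': 'password',
})
print(config['options'])  # -c search_path=data,public
```

The resulting dictionary would then be passed along the lines of `psycopg.connect(**config, connect_timeout=10)`, as the handler's `connect` method does.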

### Example Usage

Querying a Table:

```sql
SELECT * FROM psql_datasource.demo_data.used_car_price LIMIT 10;
```

Running native queries by wrapping them in a SELECT from the PostgreSQL integration:

```sql
SELECT * FROM psql_datasource (
    -- Native query goes here
    SELECT
        model,
        COUNT(*) OVER (PARTITION BY model, year) AS units_to_sell,
        ROUND((CAST(tax AS decimal) / price), 3) AS tax_div_price
    FROM demo_data.used_car_price
);
```
> Note: The examples above use the `psql_datasource` name, which was created with the `CREATE DATABASE` query.
105 changes: 78 additions & 27 deletions mindsdb/integrations/handlers/postgres_handler/postgres_handler.py
@@ -21,6 +21,7 @@

logger = log.getLogger(__name__)


class PostgresHandler(DatabaseHandler):
"""
This handler handles connection and execution of the PostgreSQL statements.
@@ -40,15 +41,21 @@ def __init__(self, name=None, **kwargs):
self.is_connected = False

def __del__(self):
if self.is_connected is True:
if self.is_connected:
self.disconnect()

@profiler.profile()
def connect(self):
"""
Handles the connection to a PostgreSQL database instance.
Establishes a connection to a PostgreSQL database.

Raises:
psycopg.Error: If an error occurs while connecting to the PostgreSQL database.

Returns:
psycopg.Connection: A connection object to the PostgreSQL database.
"""
if self.is_connected is True:
if self.is_connected:
return self.connection

config = {
@@ -62,48 +69,58 @@ def connect(self):
if self.connection_args.get('sslmode'):
config['sslmode'] = self.connection_args.get('sslmode')

# If schema is not provided set public as default one
if self.connection_args.get('schema'):
config['options'] = f'-c search_path={self.connection_args.get("schema")},public'

connection = psycopg.connect(**config, connect_timeout=10)

self.is_connected = True
self.connection = connection
return self.connection
try:
self.connection = psycopg.connect(**config, connect_timeout=10)
self.is_connected = True
return self.connection
except psycopg.Error as e:
logger.error(f'Error connecting to PostgreSQL {self.database}, {e}!')
self.is_connected = False
raise

def disconnect(self):
if self.is_connected is False:
"""
Closes the connection to the PostgreSQL database if it's currently open.
"""
if not self.is_connected:
return
self.connection.close()
self.is_connected = False

def check_connection(self) -> StatusResponse:
"""
Check the connection of the PostgreSQL database
:return: success status and error message if error occurs
Checks the status of the connection to the PostgreSQL database.

Returns:
StatusResponse: An object containing the success status and an error message if an error occurs.
"""
response = StatusResponse(False)
need_to_close = self.is_connected is False
need_to_close = not self.is_connected

try:
connection = self.connect()
with connection.cursor() as cur:
# Execute a simple query to test the connection
cur.execute('select 1;')
response.success = True
except psycopg.Error as e:
logger.error(f'Error connecting to PostgreSQL {self.database}, {e}!')
response.error_message = e
response.error_message = str(e)

if response.success is True and need_to_close:
if response.success and need_to_close:
self.disconnect()
if response.success is False and self.is_connected is True:
elif not response.success and self.is_connected:
self.is_connected = False

return response

def _cast_dtypes(self, df: DataFrame, description: list) -> None:
""" Cast df dtypes basing on postgres types

"""
Cast df dtypes basing on postgres types
Note:
Date types casting is not provided because of there is no issues (so far).
By default pandas will cast postgres date types to:
@@ -130,16 +147,24 @@ def _cast_dtypes(self, df: DataFrame, description: list) -> None:
if str(col.dtype) == 'object':
pg_type = types.get(description[column_index].type_code)
if pg_type is not None and pg_type.name in types_map:
df.iloc[:, column_index] = col.astype(types_map[pg_type.name])
col = col.fillna(0)
try:
df.iloc[:, column_index] = col.astype(types_map[pg_type.name])
except ValueError as e:
logger.error(f'Error casting column {col.name} to {types_map[pg_type.name]}: {e}')

@profiler.profile()
def native_query(self, query: str) -> Response:
"""
Receive SQL query and runs it
:param query: The SQL query to run in PostgreSQL
:return: returns the records from the current recordset
Executes a SQL query on the PostgreSQL database and returns the result.

Args:
query (str): The SQL query to be executed.

Returns:
Response: A response object containing the result of the query or an error message.
"""
need_to_close = self.is_connected is False
need_to_close = not self.is_connected

connection = self.connect()
with connection.cursor() as cur:
@@ -160,30 +185,39 @@ def native_query(self, query: str) -> Response:
)
connection.commit()
except Exception as e:
logger.error(f'Error running query: {query} on {self.database}!')
logger.error(f'Error running query: {query} on {self.database}, {e}!')
response = Response(
RESPONSE_TYPE.ERROR,
error_code=0,
error_message=str(e)
)
connection.rollback()

if need_to_close is True:
if need_to_close:
self.disconnect()

return response

@profiler.profile()
def query(self, query: ASTNode) -> Response:
"""
Retrieve the data from the SQL statement with eliminated rows that dont satisfy the WHERE condition
Executes a SQL query represented by an ASTNode and retrieves the data.

Args:
query (ASTNode): An ASTNode representing the SQL query to be executed.

Returns:
Response: The response from the `native_query` method, containing the result of the SQL query execution.
"""
query_str = self.renderer.get_string(query, with_failback=True)
logger.debug(f"Executing SQL query: {query_str}")
return self.native_query(query_str)

def get_tables(self) -> Response:
"""
List all tables in PostgreSQL without the system tables information_schema and pg_catalog
Retrieves a list of all non-system tables and views in the current schema of the PostgreSQL database.

Returns:
Response: A response object containing the list of tables and views, formatted as per the `Response` class.
"""
query = """
SELECT
@@ -200,6 +234,21 @@ def get_tables(self) -> Response:
return self.native_query(query)

def get_columns(self, table_name: str) -> Response:
"""
Retrieves column details for a specified table in the PostgreSQL database.

Args:
table_name (str): The name of the table for which to retrieve column information.

Returns:
Response: A response object containing the column details, formatted as per the `Response` class.
Raises:
ValueError: If the 'table_name' is not a valid string.
"""

if not table_name or not isinstance(table_name, str):
raise ValueError("Invalid table name provided.")

query = f"""
SELECT
column_name as "Field",
@@ -211,6 +260,7 @@ def get_columns(self, table_name: str) -> Response:
"""
return self.native_query(query)


connection_args = OrderedDict(
user={
'type': ARG_TYPE.STR,
@@ -260,6 +310,7 @@ def get_columns(self, table_name: str) -> Response:
host='127.0.0.1',
port=5432,
user='root',
schema='public',
password='password',
database='database'
)
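
The `need_to_close` pattern that `check_connection` and `native_query` rely on in this diff can be sketched in isolation. The example below is a minimal stand-in, not the handler itself: `FakeHandler`, its `run` method, and the `events` list are hypothetical names used only to make the behavior visible without a real database.

```python
class FakeHandler:
    """Hypothetical stand-in for a database handler; no real database is involved."""

    def __init__(self):
        self.is_connected = False
        self.events = []  # records what happened, for illustration only

    def connect(self):
        # Reuse the existing connection if there is one.
        if not self.is_connected:
            self.events.append('open')
            self.is_connected = True
        return self

    def disconnect(self):
        if not self.is_connected:
            return
        self.events.append('close')
        self.is_connected = False

    def run(self, query: str) -> str:
        # Remember whether this call is the one that opens the connection.
        need_to_close = not self.is_connected
        self.connect()
        self.events.append(f'run {query}')
        # Close only a connection this call opened; leave a caller's connection alone.
        if need_to_close:
            self.disconnect()
        return query


handler = FakeHandler()
handler.run('select 1')   # opens a connection, runs, closes it again
handler.connect()         # caller opens a long-lived connection
handler.run('select 2')   # reuses it and leaves it open
print(handler.events)
# ['open', 'run select 1', 'close', 'open', 'run select 2']
```

The point of the flag is that a one-off call cleans up after itself, while a connection opened earlier by the caller stays open for later queries.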