Skip to content

Commit

Permalink
Merge pull request #8680 from mindsdb/psql-improvments
Browse files Browse the repository at this point in the history
PostgreSQL Integration improvements
  • Loading branch information
MinuraPunchihewa committed Jan 29, 2024
2 parents a71b684 + 508ab62 commit 4e66cb6
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 28 deletions.
1 change: 0 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,5 @@ exclude =
mindsdb/integrations/handlers/aurora_handler/*
mindsdb/integrations/handlers/surrealdb_handler/*
mindsdb/integrations/handlers/neuralforecast_handler/*
mindsdb/integrations/handlers/postgres_handler/*
mindsdb/integrations/handlers/rag_handler/*
max-complexity = 18
3 changes: 3 additions & 0 deletions .github/workflows/mindsdb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ jobs:
CHECK_FOR_UPDATES: False
OPENAI_API_KEY: ${{secrets.OPENAI_API_KEY}}
ANYSCALE_ENDPOINTS_API_KEY: ${{secrets.ANYSCALE_ENDPOINTS_API_KEY}}
- name: Run Integrations unittest
run: |
python -m unittest discover -s tests/unit/handlers
63 changes: 63 additions & 0 deletions mindsdb/integrations/handlers/postgres_handler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# PostgreSQL Integration

This documentation describes the integration of MindsDB with PostgreSQL, a powerful, open-source, object-relational database system. The integration allows for advanced SQL functionalities, extending PostgreSQL's capabilities with MindsDB's features.

## Getting Started

### Prerequisites

1. Ensure that MindsDB and PostgreSQL are installed on your system, or that you have access to their cloud offerings.
2. If running locally, install the dependencies with `pip install mindsdb[postgres]`.


### Connection

Use the following syntax to create a connection to the PostgreSQL database in MindsDB:

```sql
CREATE DATABASE psql_datasource
WITH ENGINE = 'postgres',
PARAMETERS = {
"host": "127.0.0.1",
"port": 5432,
"database": "postgres",
"user": "postgres",
"schema": "data",
"password": "password"
};
```

Required Parameters:

* `user`: The username for the PostgreSQL database.
* `password`: The password for the PostgreSQL database.
* `host`: The hostname, IP address, or URL of the PostgreSQL server.
* `port`: The port number for connecting to the PostgreSQL server.
* `database`: The name of the PostgreSQL database to connect to.

Optional Parameters:

* `schema`: The database schema to use. Default is `public`.
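* `sslmode`: The SSL mode for the connection, passed through to the PostgreSQL client library (for example `disable`, `allow`, `prefer`, `require`, `verify-ca`, or `verify-full`).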

### Example Usage

Querying a Table:

```sql
SELECT * FROM psql_datasource.demo_data.used_car_price LIMIT 10;
```

Running native queries by wrapping them inside a SELECT on the PostgreSQL integration:

```sql
SELECT * FROM psql_datasource (
--Native Query Goes Here
SELECT
model,
COUNT(*) OVER (PARTITION BY model, year) AS units_to_sell,
ROUND((CAST(tax AS decimal) / price), 3) AS tax_div_price
FROM demo_data.used_car_price
);
```
> Note: The examples above use the `psql_datasource` name, which was created with the `CREATE DATABASE` query.
105 changes: 78 additions & 27 deletions mindsdb/integrations/handlers/postgres_handler/postgres_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

logger = log.getLogger(__name__)


class PostgresHandler(DatabaseHandler):
"""
This handler handles connection and execution of the PostgreSQL statements.
Expand All @@ -40,15 +41,21 @@ def __init__(self, name=None, **kwargs):
self.is_connected = False

def __del__(self):
if self.is_connected is True:
if self.is_connected:
self.disconnect()

@profiler.profile()
def connect(self):
"""
Handles the connection to a PostgreSQL database instance.
Establishes a connection to a PostgreSQL database.
Raises:
psycopg.Error: If an error occurs while connecting to the PostgreSQL database.
Returns:
psycopg.Connection: A connection object to the PostgreSQL database.
"""
if self.is_connected is True:
if self.is_connected:
return self.connection

config = {
Expand All @@ -62,48 +69,58 @@ def connect(self):
if self.connection_args.get('sslmode'):
config['sslmode'] = self.connection_args.get('sslmode')

# If a schema is provided, put it first in the search path and keep public as a fallback
if self.connection_args.get('schema'):
config['options'] = f'-c search_path={self.connection_args.get("schema")},public'

connection = psycopg.connect(**config, connect_timeout=10)

self.is_connected = True
self.connection = connection
return self.connection
try:
self.connection = psycopg.connect(**config, connect_timeout=10)
self.is_connected = True
return self.connection
except psycopg.Error as e:
logger.error(f'Error connecting to PostgreSQL {self.database}, {e}!')
self.is_connected = False
raise

def disconnect(self):
if self.is_connected is False:
"""
Closes the connection to the PostgreSQL database if it's currently open.
"""
if not self.is_connected:
return
self.connection.close()
self.is_connected = False

def check_connection(self) -> StatusResponse:
"""
Check the connection of the PostgreSQL database
:return: success status and error message if error occurs
Checks the status of the connection to the PostgreSQL database.
Returns:
StatusResponse: An object containing the success status and an error message if an error occurs.
"""
response = StatusResponse(False)
need_to_close = self.is_connected is False
need_to_close = not self.is_connected

try:
connection = self.connect()
with connection.cursor() as cur:
# Execute a simple query to test the connection
cur.execute('select 1;')
response.success = True
except psycopg.Error as e:
logger.error(f'Error connecting to PostgreSQL {self.database}, {e}!')
response.error_message = e
response.error_message = str(e)

if response.success is True and need_to_close:
if response.success and need_to_close:
self.disconnect()
if response.success is False and self.is_connected is True:
elif not response.success and self.is_connected:
self.is_connected = False

return response

def _cast_dtypes(self, df: DataFrame, description: list) -> None:
""" Cast df dtypes basing on postgres types
"""
Cast df dtypes based on postgres types
Note:
Date type casting is not performed because there have been no issues (so far).
By default pandas will cast postgres date types to:
Expand All @@ -130,16 +147,24 @@ def _cast_dtypes(self, df: DataFrame, description: list) -> None:
if str(col.dtype) == 'object':
pg_type = types.get(description[column_index].type_code)
if pg_type is not None and pg_type.name in types_map:
df.iloc[:, column_index] = col.astype(types_map[pg_type.name])
col = col.fillna(0)
try:
df.iloc[:, column_index] = col.astype(types_map[pg_type.name])
except ValueError as e:
logger.error(f'Error casting column {col.name} to {types_map[pg_type.name]}: {e}')

@profiler.profile()
def native_query(self, query: str) -> Response:
"""
Receive SQL query and runs it
:param query: The SQL query to run in PostgreSQL
:return: returns the records from the current recordset
Executes a SQL query on the PostgreSQL database and returns the result.
Args:
query (str): The SQL query to be executed.
Returns:
Response: A response object containing the result of the query or an error message.
"""
need_to_close = self.is_connected is False
need_to_close = not self.is_connected

connection = self.connect()
with connection.cursor() as cur:
Expand All @@ -160,30 +185,39 @@ def native_query(self, query: str) -> Response:
)
connection.commit()
except Exception as e:
logger.error(f'Error running query: {query} on {self.database}!')
logger.error(f'Error running query: {query} on {self.database}, {e}!')
response = Response(
RESPONSE_TYPE.ERROR,
error_code=0,
error_message=str(e)
)
connection.rollback()

if need_to_close is True:
if need_to_close:
self.disconnect()

return response

@profiler.profile()
def query(self, query: ASTNode) -> Response:
"""
Retrieve the data from the SQL statement with eliminated rows that dont satisfy the WHERE condition
Executes a SQL query represented by an ASTNode and retrieves the data.
Args:
query (ASTNode): An ASTNode representing the SQL query to be executed.
Returns:
Response: The response from the `native_query` method, containing the result of the SQL query execution.
"""
query_str = self.renderer.get_string(query, with_failback=True)
logger.debug(f"Executing SQL query: {query_str}")
return self.native_query(query_str)

def get_tables(self) -> Response:
"""
List all tables in PostgreSQL without the system tables information_schema and pg_catalog
Retrieves a list of all non-system tables and views in the current schema of the PostgreSQL database.
Returns:
Response: A response object containing the list of tables and views, formatted as per the `Response` class.
"""
query = """
SELECT
Expand All @@ -200,6 +234,21 @@ def get_tables(self) -> Response:
return self.native_query(query)

def get_columns(self, table_name: str) -> Response:
"""
Retrieves column details for a specified table in the PostgreSQL database.
Args:
table_name (str): The name of the table for which to retrieve column information.
Returns:
Response: A response object containing the column details, formatted as per the `Response` class.
Raises:
ValueError: If the 'table_name' is not a valid string.
"""

if not table_name or not isinstance(table_name, str):
raise ValueError("Invalid table name provided.")

query = f"""
SELECT
column_name as "Field",
Expand All @@ -211,6 +260,7 @@ def get_columns(self, table_name: str) -> Response:
"""
return self.native_query(query)


connection_args = OrderedDict(
user={
'type': ARG_TYPE.STR,
Expand Down Expand Up @@ -260,6 +310,7 @@ def get_columns(self, table_name: str) -> Response:
host='127.0.0.1',
port=5432,
user='root',
schema='public',
password='password',
database='database'
)
Loading

0 comments on commit 4e66cb6

Please sign in to comment.