diff --git a/.github/workflows/imports-linux.yml b/.github/workflows/imports-linux.yml index 0e7af4ab..9f147d55 100644 --- a/.github/workflows/imports-linux.yml +++ b/.github/workflows/imports-linux.yml @@ -29,6 +29,7 @@ jobs: pip install .[redshift] pip install .[postgres] pip install .[trino] + pip install .[presto] pip install .[pyspark] pip install .[dbt] pip install .[docker] diff --git a/.github/workflows/imports-macosx.yml b/.github/workflows/imports-macosx.yml index 5578f5cf..778a6ffb 100644 --- a/.github/workflows/imports-macosx.yml +++ b/.github/workflows/imports-macosx.yml @@ -29,6 +29,7 @@ jobs: pip install .[redshift] pip install .[postgres] pip install .[trino] + pip install .[presto] pip install .[pyspark] pip install .[dbt] pip install .[docker] diff --git a/.github/workflows/imports-windows.yml b/.github/workflows/imports-windows.yml index 2e441c8f..f8d90ed5 100644 --- a/.github/workflows/imports-windows.yml +++ b/.github/workflows/imports-windows.yml @@ -29,6 +29,7 @@ jobs: pip install .[redshift] pip install .[postgres] pip install .[trino] + pip install .[presto] pip install .[pyspark] pip install .[dbt] pip install .[docker] diff --git a/README.md b/README.md index e9c2060d..1f380da4 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ Adapters allow users to connect to data warehouses or analytics engines. 
Prism c | **dbt** | ```pip install "prism-ds[dbt]"``` | | **Google BigQuery** | ```pip install "prism-ds[bigquery]"``` | | **Postgres** | ```pip install "prism-ds[postgres]"``` | +| **Presto** | ```pip install "prism-ds[presto]"``` | | **PySpark** | ```pip install "prism-ds[pyspark]"``` | | **Redshift** | ```pip install "prism-ds[redshift]"``` | | **Snowflake** | ```pip install "prism-ds[snowflake]"``` | diff --git a/dev_requirements.txt b/dev_requirements.txt index b3c3c34d..c77de2a4 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -2,6 +2,7 @@ -e .[bigquery] -e .[redshift] -e .[postgres] +-e .[presto] -e .[trino] -e .[pyspark] -e .[dbt] diff --git a/prism/constants.py b/prism/constants.py index 61c7f664..5a6b3d0d 100644 --- a/prism/constants.py +++ b/prism/constants.py @@ -17,7 +17,7 @@ ############# # Version number -VERSION = '0.2.1' +VERSION = '0.2.2' # Root directory of project ROOT_DIR = str(Path(os.path.dirname(__file__)).parent) @@ -35,6 +35,7 @@ "redshift", "snowflake", "trino", + "presto", ] # Task types diff --git a/prism/infra/hooks.py b/prism/infra/hooks.py index 90784ac1..10622c56 100644 --- a/prism/infra/hooks.py +++ b/prism/infra/hooks.py @@ -42,6 +42,7 @@ def get_connection(self, adapter_name: str): - Redshift --> psycopg2.Connection - Snowflake --> snowflake.connector.Connection - Trino --> trino.dbapi.Connection + - Presto --> prestodb.dbapi.Connection args: adapter_name: SQL adapter diff --git a/prism/profiles/presto.py b/prism/profiles/presto.py new file mode 100644 index 00000000..b31ded6c --- /dev/null +++ b/prism/profiles/presto.py @@ -0,0 +1,186 @@ +""" +Presto adapter class definition + +Table of Contents +- Imports +- Class definition +""" + +########### +# Imports # +########### + +# Standard library imports +import pandas as pd +from typing import Any, Dict, Optional +import prestodb + +# Prism-specific imports +from .adapter import Adapter +import prism.exceptions + + +#################### +# Class definition # 
####################
# Class definition #
####################

class Presto(Adapter):
    """
    Presto adapter. Validates the `presto` profile configuration and creates
    connections via `prestodb` (presto-python-client).
    """

    def is_valid_config(self,
        config_dict: Dict[str, str],
        adapter_name: str,
        profile_name: str,
    ) -> bool:
        """
        Check that the config dictionary in the profile YML is valid

        args:
            config_dict: config dictionary under Presto adapter in profile YML
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            boolean indicating whether config dictionary in profile YML is valid
        raises:
            prism.exceptions.InvalidProfileException if the config is invalid
        """

        # Required config vars
        required_config_vars = [
            'type',
            'user',
            'password',
            'port',
            'host',
        ]

        # Optional config vars
        optional_config_vars = [
            'http_scheme',
            'catalog',
            'schema',
        ]

        # Raise an error if:
        # 1. Config doesn't contain any of the required vars or contains additional
        #    config vars
        # 2. Any of the config values are None
        actual_config_vars = []
        for k, v in config_dict.items():
            if k not in required_config_vars and k not in optional_config_vars:
                raise prism.exceptions.InvalidProfileException(
                    message=f'invalid var `{k}` - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
            if k in required_config_vars:
                actual_config_vars.append(k)
            if v is None:
                raise prism.exceptions.InvalidProfileException(
                    message=f'var `{k}` cannot be None - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
        vars_not_defined = list(set(required_config_vars) - set(actual_config_vars))
        if len(vars_not_defined) > 0:
            v = vars_not_defined.pop()
            raise prism.exceptions.InvalidProfileException(
                message=f'var `{v}` must be defined - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
            )

        # If the schema is specified, then Presto requires that the catalog is also
        # specified. However, the user can choose to only specify the catalog if they
        # choose.
        catalog_schema_specs = {}
        for var in ['schema', 'catalog']:
            # BUG FIX: the original combined the two value checks with `or`,
            # which is True for ANY value (a value is always either not-None or
            # != ""). Use `and` so an empty/missing value counts as
            # "not specified".
            catalog_schema_specs[var] = (
                var in list(config_dict.keys())
                and (  # noqa: W503
                    config_dict[var] is not None and config_dict[var] != ""
                )
            )
        if catalog_schema_specs['schema'] and not catalog_schema_specs['catalog']:
            raise prism.exceptions.InvalidProfileException(
                message=f"`schema` is set but `catalog` is not in `{profile_name}` profile in profile YML"  # noqa: E501
            )

        # If no exception has been raised, return True
        return True

    def create_engine(self,
        adapter_dict: Dict[str, Any],
        adapter_name: str,
        profile_name: str
    ):
        """
        Parse Presto adapter, represented as a dict and return the Presto
        connector object

        args:
            adapter_dict: Presto adapter represented as a dictionary
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            Presto connector object
        """

        # Get configuration and check if config is valid
        self.is_valid_config(adapter_dict, adapter_name, profile_name)

        # Base connection arguments shared by every configuration.
        # BUG FIX: the original read `adapter_dict.get('http_schema', 'https')`,
        # but the config var is `http_scheme` (see is_valid_config and the
        # bundled profile.yml template), so a user-supplied scheme was always
        # silently ignored in favor of the default.
        conn_kwargs: Dict[str, Any] = {
            'host': adapter_dict['host'],
            'port': adapter_dict['port'],
            'http_scheme': adapter_dict.get('http_scheme', 'https'),
            'auth': prestodb.auth.BasicAuthentication(
                adapter_dict['user'],
                adapter_dict['password']
            ),
        }

        # Schema is present. Since this is a valid config dict, then we know that
        # catalog must be present.
        if 'schema' in adapter_dict:
            conn_kwargs['catalog'] = adapter_dict['catalog']
            conn_kwargs['schema'] = adapter_dict['schema']

        # Just catalog is present
        elif 'catalog' in adapter_dict:
            conn_kwargs['catalog'] = adapter_dict['catalog']

        # Neither catalog nor schema is present: connect with the base kwargs only
        return prestodb.dbapi.connect(**conn_kwargs)

    def execute_sql(self, query: str, return_type: Optional[str]) -> pd.DataFrame:
        """
        Execute the SQL query

        args:
            query: SQL query to execute
            return_type: "pandas" to return a DataFrame; anything else returns
                the raw fetched rows
        returns:
            query results as a pandas DataFrame or a list of row tuples
        """
        # Create cursor for every SQL query -- this ensures thread safety
        cursor = self.engine.cursor()
        cursor.execute(query)
        data = cursor.fetchall()

        # If the return type is `pandas`, then return a DataFrame
        if return_type == "pandas":
            cols = [elts[0] for elts in cursor.description]
            df: pd.DataFrame = pd.DataFrame(data=data, columns=cols)
            cursor.close()
            return df

        # Otherwise, return the data as it exists
        else:
            cursor.close()
            return data
+ type: presto + host: + http_scheme: https + port: + user: + password: + catalog: + schema: diff --git a/setup.cfg b/setup.cfg index a070fd4c..458c345d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,7 +3,7 @@ name = prism-ds description = The easiest way to create data pipelines in Python. long_description_content_type = text/markdown long_description = file: README.md -version = 0.2.1 +version = 0.2.2 author = prism founders author_email = hello@runprism.com license = Apache-2.0 @@ -56,6 +56,8 @@ postgres = psycopg2-binary>=2.9 trino = trino>=0.319 +presto = + presto-python-client>=0.8 pyspark = pyspark>=3 dbt =