Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presto adapter #27

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/imports-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/imports-macosx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/imports-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Adapters allow users to connect to data warehouses or analytics engines. Prism c
| **dbt** | ```pip install "prism-ds[dbt]"``` |
| **Google BigQuery** | ```pip install "prism-ds[bigquery]"``` |
| **Postgres** | ```pip install "prism-ds[postgres]"``` |
| **Presto** | ```pip install "prism-ds[presto]"``` |
| **PySpark** | ```pip install "prism-ds[pyspark]"``` |
| **Redshift** | ```pip install "prism-ds[redshift]"``` |
| **Snowflake** | ```pip install "prism-ds[snowflake]"``` |
Expand Down
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-e .[bigquery]
-e .[redshift]
-e .[postgres]
-e .[presto]
-e .[trino]
-e .[pyspark]
-e .[dbt]
Expand Down
3 changes: 2 additions & 1 deletion prism/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#############

# Version number
VERSION = '0.2.1'
VERSION = '0.2.2'

# Root directory of project
ROOT_DIR = str(Path(os.path.dirname(__file__)).parent)
Expand All @@ -35,6 +35,7 @@
"redshift",
"snowflake",
"trino",
"presto",
]

# Task types
Expand Down
1 change: 1 addition & 0 deletions prism/infra/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_connection(self, adapter_name: str):
- Redshift --> psycopg2.Connection
- Snowflake --> snowflake.connector.Connection
- Trino --> trino.dbapi.Connection
- Presto --> prestodb.dbapi.Connection

args:
adapter_name: SQL adapter
Expand Down
186 changes: 186 additions & 0 deletions prism/profiles/presto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""
Presto adapter class definition

Table of Contents
- Imports
- Class definition
"""

###########
# Imports #
###########

# Standard library and third-party imports
import pandas as pd
from typing import Any, Dict, Optional
import prestodb

# Prism-specific imports
from .adapter import Adapter
import prism.exceptions


####################
# Class definition #
####################

class Presto(Adapter):
    """
    Presto adapter. Validates the `presto` profile configuration and creates a
    `prestodb.dbapi.Connection` used to execute SQL against a Presto cluster.
    """

    def is_valid_config(self,
        config_dict: Dict[str, str],
        adapter_name: str,
        profile_name: str,
    ) -> bool:
        """
        Check that the config dictionary in the profile YML is valid

        args:
            config_dict: config dictionary under Presto adapter in profile YML
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            boolean indicating whether config dictionary in profile YML is valid
        raises:
            prism.exceptions.InvalidProfileException: if a config var is
            unrecognized, None, missing, or if `schema` is set without `catalog`
        """

        # Required config vars
        required_config_vars = [
            'type',
            'user',
            'password',
            'port',
            'host',
        ]

        # Optional config vars
        optional_config_vars = [
            'http_scheme',
            'catalog',
            'schema',
        ]

        # Raise an error if:
        #   1. Config doesn't contain any of the required vars or contains additional
        #      config vars
        #   2. Any of the config values are None
        actual_config_vars = []
        for k, v in config_dict.items():
            if k not in required_config_vars and k not in optional_config_vars:
                raise prism.exceptions.InvalidProfileException(
                    message=f'invalid var `{k}` - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
            if k in required_config_vars:
                actual_config_vars.append(k)
            if v is None:
                raise prism.exceptions.InvalidProfileException(
                    message=f'var `{k}` cannot be None - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
        vars_not_defined = list(set(required_config_vars) - set(actual_config_vars))
        if len(vars_not_defined) > 0:
            v = vars_not_defined.pop()
            raise prism.exceptions.InvalidProfileException(
                message=f'var `{v}` must be defined - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
            )

        # If the schema is specified, then Presto requires that the catalog is also
        # specified. However, the user can choose to only specify the catalog if they
        # choose.
        catalog_schema_specs = {}
        for var in ['schema', 'catalog']:
            # A var counts as "specified" only if it is present AND non-empty.
            # Bug fix: this condition previously used `or`, which made it a
            # tautology (always True for any present key).
            catalog_schema_specs[var] = (
                var in config_dict
                and config_dict[var] is not None  # noqa: W503
                and config_dict[var] != ""  # noqa: W503
            )
        if catalog_schema_specs['schema'] and not catalog_schema_specs['catalog']:
            raise prism.exceptions.InvalidProfileException(
                message=f"`schema` is set but `catalog` is not in `{profile_name}` profile in profile YML"  # noqa: E501
            )

        # If no exception has been raised, return True
        return True

    def create_engine(self,
        adapter_dict: Dict[str, Any],
        adapter_name: str,
        profile_name: str
    ):
        """
        Parse the Presto adapter, represented as a dict, and return the Presto
        connection object

        args:
            adapter_dict: Presto adapter represented as a dictionary
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            Presto connection object (prestodb.dbapi.Connection)
        """

        # Get configuration and check if config is valid
        self.is_valid_config(adapter_dict, adapter_name, profile_name)

        # Common connection arguments. Bug fix: the lookup key was previously
        # misspelled `http_schema`, so a user-specified `http_scheme` was
        # silently ignored and `https` was always used.
        conn_kwargs: Dict[str, Any] = {
            'host': adapter_dict['host'],
            'port': adapter_dict['port'],
            'http_scheme': adapter_dict.get('http_scheme', 'https'),
            'auth': prestodb.auth.BasicAuthentication(
                adapter_dict['user'],
                adapter_dict['password']
            ),
        }

        # If `schema` is present then, because the config is valid, `catalog`
        # must be present as well. `catalog` may also appear on its own.
        if 'catalog' in adapter_dict:
            conn_kwargs['catalog'] = adapter_dict['catalog']
        if 'schema' in adapter_dict:
            conn_kwargs['schema'] = adapter_dict['schema']

        return prestodb.dbapi.connect(**conn_kwargs)

    def execute_sql(self, query: str, return_type: Optional[str]) -> pd.DataFrame:
        """
        Execute the SQL query

        args:
            query: SQL query to execute
            return_type: if `"pandas"`, return a DataFrame; otherwise return
            the raw rows from `cursor.fetchall()`
        returns:
            query results as a DataFrame or as a list of rows
        """
        # Create cursor for every SQL query -- this ensures thread safety
        cursor = self.engine.cursor()
        cursor.execute(query)
        data = cursor.fetchall()

        # If the return type is `pandas`, then return a DataFrame
        if return_type == "pandas":
            # First element of each `cursor.description` entry is the column name
            cols = [elts[0] for elts in cursor.description]
            df: pd.DataFrame = pd.DataFrame(data=data, columns=cols)
            cursor.close()
            return df

        # Otherwise, return the data as it exists
        else:
            cursor.close()
            return data
11 changes: 11 additions & 0 deletions prism/templates/profile/presto/profile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
default:
adapters:
presto_adapter_name_here: # change this!
type: presto
host:
http_scheme: https
port:
user:
password:
catalog:
schema:
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = prism-ds
description = The easiest way to create data pipelines in Python.
long_description_content_type = text/markdown
long_description = file: README.md
version = 0.2.1
version = 0.2.2
author = prism founders
author_email = hello@runprism.com
license = Apache-2.0
Expand Down Expand Up @@ -56,6 +56,8 @@ postgres =
psycopg2-binary>=2.9
trino =
trino>=0.319
presto =
presto-python-client>=0.8
pyspark =
pyspark>=3
dbt =
Expand Down
Loading