Skip to content

Commit

Permalink
Merge pull request #27 from runprism/presto-adapter
Browse files Browse the repository at this point in the history
Presto adapter
  • Loading branch information
prism-admin committed Aug 4, 2023
2 parents e01a740 + d44104e commit 73b2496
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/imports-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/imports-macosx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/imports-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
pip install .[redshift]
pip install .[postgres]
pip install .[trino]
pip install .[presto]
pip install .[pyspark]
pip install .[dbt]
pip install .[docker]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Adapters allow users to connect to data warehouses or analytics engines. Prism c
| **dbt** | ```pip install "prism-ds[dbt]"``` |
| **Google BigQuery** | ```pip install "prism-ds[bigquery]"``` |
| **Postgres** | ```pip install "prism-ds[postgres]"``` |
| **Presto** | ```pip install "prism-ds[presto]"``` |
| **PySpark** | ```pip install "prism-ds[pyspark]"``` |
| **Redshift** | ```pip install "prism-ds[redshift]"``` |
| **Snowflake** | ```pip install "prism-ds[snowflake]"``` |
Expand Down
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-e .[bigquery]
-e .[redshift]
-e .[postgres]
-e .[presto]
-e .[trino]
-e .[pyspark]
-e .[dbt]
Expand Down
3 changes: 2 additions & 1 deletion prism/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#############

# Version number
VERSION = '0.2.1'
VERSION = '0.2.2'

# Root directory of project
ROOT_DIR = str(Path(os.path.dirname(__file__)).parent)
Expand All @@ -35,6 +35,7 @@
"redshift",
"snowflake",
"trino",
"presto",
]

# Task types
Expand Down
1 change: 1 addition & 0 deletions prism/infra/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_connection(self, adapter_name: str):
- Redshift --> psycopg2.Connection
- Snowflake --> snowflake.connector.Connection
- Trino --> trino.dbapi.Connection
- Presto --> prestodb.dbapi.Connection
args:
adapter_name: SQL adapter
Expand Down
186 changes: 186 additions & 0 deletions prism/profiles/presto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""
Presto adapter class definition
Table of Contents
- Imports
- Class definition
"""

###########
# Imports #
###########

# Standard library imports
import pandas as pd
from typing import Any, Dict, Optional
import prestodb

# Prism-specific imports
from .adapter import Adapter
import prism.exceptions


####################
# Class definition #
####################

class Presto(Adapter):
    """
    Adapter for connecting to a Presto cluster via the `prestodb` package
    (`presto-python-client`).
    """

    def is_valid_config(self,
        config_dict: Dict[str, str],
        adapter_name: str,
        profile_name: str,
    ) -> bool:
        """
        Check that the config dictionary in the profile YML is valid
        args:
            config_dict: config dictionary under Presto adapter in profile YML
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            boolean indicating whether config dictionary in profile YML is valid
        raises:
            prism.exceptions.InvalidProfileException if the config is invalid
        """

        # Required config vars
        required_config_vars = [
            'type',
            'user',
            'password',
            'port',
            'host',
        ]

        # Optional config vars
        optional_config_vars = [
            'http_scheme',
            'catalog',
            'schema',
        ]

        # Raise an error if:
        # 1. Config doesn't contain any of the required vars or contains additional
        #    config vars
        # 2. Any of the config values are None
        actual_config_vars = []
        for k, v in config_dict.items():
            if k not in required_config_vars and k not in optional_config_vars:
                raise prism.exceptions.InvalidProfileException(
                    message=f'invalid var `{k}` - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
            if k in required_config_vars:
                actual_config_vars.append(k)
            if v is None:
                raise prism.exceptions.InvalidProfileException(
                    message=f'var `{k}` cannot be None - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
                )
        vars_not_defined = list(set(required_config_vars) - set(actual_config_vars))
        if len(vars_not_defined) > 0:
            v = vars_not_defined.pop()
            raise prism.exceptions.InvalidProfileException(
                message=f'var `{v}` must be defined - see `{adapter_name}` adapter in `{profile_name}` profile in profile YML'  # noqa: E501
            )

        # If the schema is specified, then Presto requires that the catalog is also
        # specified. However, the user can choose to only specify the catalog if they
        # choose.
        catalog_schema_specs = {}
        for var in ['schema', 'catalog']:
            # BUG FIX: the original used `or`, which made this expression True
            # for ANY present key (a None value already raised above, so
            # `!= ""` was always True). `and` implements the intent: the var
            # counts as "specified" only if it is present, non-None, and
            # non-empty.
            catalog_schema_specs[var] = (
                var in list(config_dict.keys())
                and config_dict[var] is not None  # noqa: W503
                and config_dict[var] != ""  # noqa: W503
            )
        if catalog_schema_specs['schema'] and not catalog_schema_specs['catalog']:
            raise prism.exceptions.InvalidProfileException(
                message=f"`schema` is set but `catalog` is not in `{profile_name}` profile in profile YML"  # noqa: E501
            )

        # If no exception has been raised, return True
        return True

    def create_engine(self,
        adapter_dict: Dict[str, Any],
        adapter_name: str,
        profile_name: str
    ):
        """
        Parse the Presto adapter, represented as a dict, and return the Presto
        connection object
        args:
            adapter_dict: Presto adapter represented as a dictionary
            adapter_name: name assigned to adapter
            profile_name: profile name containing adapter
        returns:
            prestodb.dbapi.Connection object
        """

        # Get configuration and check if config is valid
        self.is_valid_config(adapter_dict, adapter_name, profile_name)

        # Keyword arguments common to all connection variants.
        # BUG FIX: the original read `adapter_dict.get('http_schema', 'https')`
        # — a typo for `http_scheme` (the key declared in
        # `optional_config_vars` and in the profile template) — so a
        # user-supplied scheme was silently ignored and 'https' always used.
        conn_kwargs: Dict[str, Any] = {
            'host': adapter_dict['host'],
            'port': adapter_dict['port'],
            'http_scheme': adapter_dict.get('http_scheme', 'https'),
            'auth': prestodb.auth.BasicAuthentication(
                adapter_dict['user'],
                adapter_dict['password']
            ),
        }

        # Optional catalog/schema. If the schema is present, the validated
        # config guarantees that the catalog is present as well.
        if 'catalog' in adapter_dict:
            conn_kwargs['catalog'] = adapter_dict['catalog']
        if 'schema' in adapter_dict:
            conn_kwargs['schema'] = adapter_dict['schema']

        return prestodb.dbapi.connect(**conn_kwargs)

    def execute_sql(self, query: str, return_type: Optional[str]) -> pd.DataFrame:
        """
        Execute the SQL query
        args:
            query: SQL query to execute
            return_type: if "pandas", return the results as a DataFrame;
                otherwise, return the raw rows fetched from the cursor
        returns:
            query results, as a pandas DataFrame or a list of rows
        """
        # Create cursor for every SQL query -- this ensures thread safety
        cursor = self.engine.cursor()
        cursor.execute(query)
        data = cursor.fetchall()

        # ROBUSTNESS FIX: close the cursor even if DataFrame construction
        # raises (the original leaked the cursor on that path).
        try:
            # If the return type is `pandas`, then return a DataFrame
            if return_type == "pandas":
                cols = [elts[0] for elts in cursor.description]
                df: pd.DataFrame = pd.DataFrame(data=data, columns=cols)
                return df

            # Otherwise, return the data as it exists
            return data
        finally:
            cursor.close()
11 changes: 11 additions & 0 deletions prism/templates/profile/presto/profile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
default:
adapters:
presto_adapter_name_here: # change this!
type: presto
host:
http_scheme: https
port:
user:
password:
catalog:
schema:
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = prism-ds
description = The easiest way to create data pipelines in Python.
long_description_content_type = text/markdown
long_description = file: README.md
version = 0.2.1
version = 0.2.2
author = prism founders
author_email = hello@runprism.com
license = Apache-2.0
Expand Down Expand Up @@ -56,6 +56,8 @@ postgres =
psycopg2-binary>=2.9
trino =
trino>=0.319
presto =
presto-python-client>=0.8
pyspark =
pyspark>=3
dbt =
Expand Down

0 comments on commit 73b2496

Please sign in to comment.