fix: s3 table upload location #1376

Merged on Nov 29, 2023 (9 commits).
Changes from 5 commits
docs_website/docs/integrations/add_table_upload.md (1 addition, 0 deletions)
@@ -48,6 +48,7 @@ Either s3_path or use_schema_location must be supplied.
 - use_schema_location (boolean):
   if true, the upload root path is inferred by locationUri specified by the schema/database in HMS. To use this option, the engine must be connected to a metastore that uses
   HMSMetastoreLoader (or its derived class).
+  if false, the table will be created as a managed table, whose location will be determined automatically by the query engine.
 - table_properties (List[str]): list of table properties passed, this must be query engine specific.
   Checkout here for examples in SparkSQL: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
   For Trino/Presto, it would be the WITH statement: https://trino.io/docs/current/sql/create-table.html
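For illustration, the sketch below shows how exporter configs covering these options might look. The exact place the dict is supplied depends on the deployment, and the bucket, prefix, and table property values are hypothetical.

```python
# Hypothetical exporter configs for the three documented modes.

# 1. Fixed upload root: data lands under <s3_path>/<schema_name>/<table_name>.
config_with_s3_path = {
    "s3_path": "s3://my-querybook-uploads/table_uploads",
    "table_properties": ["format = 'PARQUET'"],  # engine specific, e.g. a Trino WITH property
}

# 2. Upload root derived from the schema's locationUri in the Hive metastore.
config_with_schema_location = {"use_schema_location": True}

# 3. Managed table: the query engine decides the location (the case this PR adds).
config_managed_table = {"use_schema_location": False}
```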
HMS metastore loader:

@@ -101,6 +101,12 @@ def get_partitions(
     def get_schema_location(self, schema_name: str) -> str:
         return self.hmc.get_database(schema_name).locationUri
 
+    def get_table_location(self, schema_name: str, table_name: str) -> str:
+        try:
+            return self.hmc.get_table(schema_name, table_name).sd.location
+        except NoSuchObjectException:
+            return None
+
     def _get_hmc(self, metastore_dict):
         return HiveMetastoreClient(
             hmss_ro_addrs=metastore_dict["metastore_params"]["hms_connection"]
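A rough sketch of how a caller might use the new loader method; the loader instance and the schema/table names here are hypothetical.

```python
# Hypothetical usage: `loader` is an HMSMetastoreLoader-derived instance obtained
# elsewhere (e.g. via get_metastore_loader); the names below are made up.
location = loader.get_table_location("analytics", "events")
if location is None:
    # NoSuchObjectException is swallowed, so a missing table yields None
    print("table is not in the metastore yet")
else:
    print(f"table data lives at {location}")  # e.g. s3://warehouse/analytics.db/events
```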
querybook/server/lib/table_upload/exporter/s3_exporter.py (35 additions, 28 deletions)
@@ -32,13 +32,21 @@
         if true, the upload root path is inferred by locationUri specified in hms
         to use this option, the engine must be connected to a metastore that uses
         HMSMetastoreLoader (or its derived class)
+        if false, the table will be created as a managed table, whose location will be determined automatically by the query engine.
     - table_properties (List[str]): list of table properties passed, this must be query engine specific.
         Checkout here for examples in SparkSQL: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
         For Trino/Presto, it would be the WITH statement: https://trino.io/docs/current/sql/create-table.html
 """
 
 
 class S3BaseExporter(BaseTableUploadExporter):
+    def __init__(self, exporter_config: dict = {}):
+        if ("s3_path" not in exporter_config) and (
+            "use_schema_location" not in exporter_config
+        ):
+            raise Exception("Either s3_path or use_schema_location must be specified")
+        super().__init__(exporter_config)
+
     @abstractmethod
     def UPLOAD_FILE_TYPE(cls) -> str:
         """Override this to specify what kind of file is getting uploaded
@@ -61,40 +69,39 @@ def destination_s3_path(self, session=None) -> str:
         return self.destination_s3_folder(session=session) + "/" + "0000"
 
     @with_session
-    def destination_s3_root(self, session=None) -> str:
-        """Generate the bucket name + prefix before
-        the table specific folder
+    def destination_s3_folder(self, session=None) -> str:
+        """Generate the s3 folder path for the table
 
         Returns:
-            str: s3 path consisting bucket + prefix + schema name
+            str: s3 path consisting bucket + prefix + schema name + table name
         """
 
+        schema_name, table_name = self._fq_table_name
         if "s3_path" in self._exporter_config:
-            schema_name, _ = self._fq_table_name
             s3_path: str = self._exporter_config["s3_path"]
-            return sanitize_s3_url_with_trailing_slash(s3_path) + schema_name + "/"
+            return sanitize_s3_url(s3_path) + "/" + schema_name + "/" + table_name
 
-        if self._exporter_config.get("use_schema_location", False):
-            query_engine = get_query_engine_by_id(self._engine_id, session=session)
-            metastore = get_metastore_loader(query_engine.metastore_id, session=session)
+        query_engine = get_query_engine_by_id(self._engine_id, session=session)
+        metastore = get_metastore_loader(query_engine.metastore_id, session=session)
 
-            if metastore is None:
-                raise Exception("Invalid metastore")
+        if metastore is None:
+            raise Exception("Invalid metastore")
 
-            schema_location_uri = metastore.get_schema_location(
-                self._table_config["schema_name"]
-            )
+        if self._exporter_config.get("use_schema_location", False):
+            schema_location_uri = metastore.get_schema_location(schema_name)
             if not schema_location_uri:
                 raise Exception("Invalid metastore to use use_schema_location option")
-            return sanitize_s3_url_with_trailing_slash(schema_location_uri)
+            return sanitize_s3_url(schema_location_uri) + "/" + table_name
 
-        raise Exception("Must specify s3_path or set use_schema_location=True")
+        # Use its actual location for managed tables
+        table_location = metastore.get_table_location(schema_name, table_name)
+        if table_location:
+            return sanitize_s3_url(table_location)
 
-    @with_session
-    def destination_s3_folder(self, session=None) -> str:
-        _, table_name = self._fq_table_name
-        return self.destination_s3_root(session=session) + table_name
+        raise Exception(
+            "Cant get the table location from metastore. Please make sure the query engine supports managed table with default location."
+        )
 
     @with_session
     def _handle_if_table_exists(self, session=None):
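To make the new resolution order concrete, here is a small runnable sketch of the first two branches; the bucket, schema, and table names are made up, and the inline helper mirrors the sanitize_s3_url defined later in this file.

```python
import re


def sanitize_s3_url(uri: str) -> str:
    # Mirrors the helper in this file: rewrite s3a:// or s3n:// to s3:// and drop any trailing slash.
    uri = re.sub(r"^s3[a-z]:", "s3:", uri)
    return uri[:-1] if uri.endswith("/") else uri


# Branch 1: explicit s3_path -> <s3_path>/<schema>/<table>
assert (
    sanitize_s3_url("s3://uploads-bucket/prefix/") + "/analytics/events"
    == "s3://uploads-bucket/prefix/analytics/events"
)

# Branch 2: use_schema_location -> <schema locationUri>/<table>
assert (
    sanitize_s3_url("s3a://warehouse/analytics.db") + "/events"
    == "s3://warehouse/analytics.db/events"
)

# Branch 3 (managed tables) reads the table's actual location back from the
# metastore after creation, so it cannot be reproduced without a live metastore.
```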
@@ -118,14 +125,14 @@ def _handle_if_table_exists(self, session=None):
     def _get_table_create_query(self, session=None) -> str:
         query_engine = get_query_engine_by_id(self._engine_id, session=session)
         schema_name, table_name = self._fq_table_name
-        is_external = not self._exporter_config.get("use_schema_location", False)
+        is_managed = self._exporter_config.get("use_schema_location") is False
         return get_create_table_statement(
             language=query_engine.language,
             table_name=table_name,
             schema_name=schema_name,
             column_name_types=self._table_config["column_name_types"],
-            # if use schema location, then no table location is needed for creation
-            file_location=self.destination_s3_folder() if is_external else None,
+            # table location is not needed for managed (non external) table creation
+            file_location=None if is_managed else self.destination_s3_folder(),
             file_format=self.UPLOAD_FILE_TYPE(),
             table_properties=self._exporter_config.get("table_properties", []),
         )
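For intuition, the effect on the generated DDL is roughly the following; these are illustrative Hive-format statements with made-up columns, not the exact output of get_create_table_statement.

```python
# Illustrative only: conceptual shape of the two CREATE TABLE variants.
external_table_ddl = """
CREATE EXTERNAL TABLE analytics.events (id BIGINT, name STRING)
STORED AS PARQUET
LOCATION 's3://uploads-bucket/prefix/analytics/events'
"""

managed_table_ddl = """
CREATE TABLE analytics.events (id BIGINT, name STRING)
STORED AS PARQUET
"""
# When use_schema_location is explicitly False, the LOCATION clause is omitted,
# so the engine places the data itself and the exporter later reads the location back.
```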
@@ -203,13 +210,13 @@ def _upload_to_s3(self) -> None:
         S3FileCopier.from_local_file(f).copy_to(self.destination_s3_path())
 
 
-def sanitize_s3_url_with_trailing_slash(uri: str) -> str:
+def sanitize_s3_url(uri: str) -> str:
     """
     This function does two things:
     1. if the uri is s3a:// or s3n://, change it to s3://
-    2. if there is no trailing slash, add it
+    2. remove the trailing slash if it has one
     """
     uri = re.sub(r"^s3[a-z]:", "s3:", uri)
-    if not uri.endswith("/"):
-        uri += "/"
+    if uri.endswith("/"):
+        uri = uri[:-1]
     return uri
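A few illustrative inputs and outputs for the renamed helper, assuming the module is importable as lib.table_upload.exporter.s3_exporter (i.e. querybook/server is on the Python path; the import path may differ per deployment).

```python
from lib.table_upload.exporter.s3_exporter import sanitize_s3_url

assert sanitize_s3_url("s3a://bucket/path/") == "s3://bucket/path"
assert sanitize_s3_url("s3n://bucket") == "s3://bucket"
assert sanitize_s3_url("s3://bucket/path") == "s3://bucket/path"  # already clean, unchanged
```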
TableUploaderForm (webapp):

@@ -77,7 +77,11 @@ export const TableUploaderForm: React.FC<ITableUploaderFormProps> = ({
                 error: 'Fail to create table',
             });
 
-            navigateWithinEnv(`/table/${tableId}`);
+            // sometimes there will be sync delay between the metastore and querybook
+            // skip the redirect if the table has not been synced over.
+            if (tableId) {
+                navigateWithinEnv(`/table/${tableId}`);
+            }
             onHide();
         },
         [onHide]