
fix: s3 table upload location #1376

Merged: 9 commits, Nov 29, 2023
10 changes: 10 additions & 0 deletions docs_website/docs/changelog/breaking_change.md
@@ -7,12 +7,22 @@ slug: /changelog

Here is the list of breaking changes that you should be aware of when updating Querybook:

## v3.29.0

The following changes were made to `S3BaseExporter` (the CSV table uploader feature):

- Both `s3_path` and `use_schema_location` are now optional
Collaborator: you need to mention that before 3.29.0, `use_schema_location=True` would create a managed table, whereas now it creates an external table.

- If neither is provided, or `use_schema_location=False`, the table will be created as a managed table, whose location is determined by the query engine.
- Previously, `use_schema_location=True` created a managed table; it now creates an external table (see the sketch below).
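
For illustration, here is a simplified sketch of how the upload location is resolved after this change. It is grounded in the `s3_exporter.py` diff later in this PR, but `resolve_upload_location` and its parameters are illustrative names, not the actual Querybook API:

```python
def resolve_upload_location(config: dict, schema: str, table: str, metastore) -> str:
    """Simplified sketch of the v3.29.0 S3BaseExporter location resolution."""
    if "s3_path" in config:
        # External table rooted at the configured upload path
        return f"{config['s3_path'].rstrip('/')}/{schema}/{table}"
    if config.get("use_schema_location"):
        # External table under the schema's locationUri from HMS
        return f"{metastore.get_schema_location(schema).rstrip('/')}/{table}"
    # Managed table: the query engine picks the location, which is then
    # read back from the metastore after the table has been created
    return metastore.get_table_location(schema, table)
```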

## v3.27.0

Updated properties of `QueryValidationResult` object. `line` and `ch` are replaced with `start_line` and `start_ch` respectively.

## v3.22.0

Updated the charset of table `data_element` to `utf8mb4`. If your MySQL database's default charset is not utf8, please run the SQL below to update it if needed.

```sql
ALTER TABLE data_element CONVERT TO CHARACTER SET utf8mb4
```
9 changes: 5 additions & 4 deletions docs_website/docs/integrations/add_table_upload.md
@@ -42,16 +42,17 @@ Included by default: No

Available options:

Either s3_path or use_schema_location must be supplied.

- s3_path (str): if supplied, will use it as the root path for upload. Must be the full s3 path like s3://bucket/key, the trailing / is optional.
- use_schema_location (boolean):
- s3_path (str, optional): if supplied, it will be used as the root path for uploads. It must be a full s3 path like s3://bucket/key; the trailing / is optional.
- use_schema_location (boolean, optional):
if true, the upload root path is inferred from the locationUri specified by the schema/database in HMS. To use this option, the engine must be connected to a metastore that uses
HMSMetastoreLoader (or a derived class).
if false, the table will be created as a managed table, whose location is determined automatically by the query engine.
- table_properties (List[str]): list of table properties to pass; these must be query-engine specific.
Check here for SparkSQL examples: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
For Trino/Presto, these map to the WITH clause: https://trino.io/docs/current/sql/create-table.html

If neither s3_path nor use_schema_location is supplied, it will be treated the same as `use_schema_location=False`, and the table will be created as a managed table (see the config sketch below).
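
As a rough illustration of how these options fit together, here is a sketch of an exporter config dict; only the keys mirror the documented options, and all values are placeholders rather than a verbatim Querybook config:

```python
exporter_config = {
    # Root path for uploads; the table lands under s3://my-bucket/table-uploads/<schema>/<table>
    "s3_path": "s3://my-bucket/table-uploads",
    # Or omit s3_path and set use_schema_location instead:
    #   True  -> upload under the schema's locationUri from HMS (external table)
    #   False or omitted -> managed table; the query engine decides the location
    # "use_schema_location": True,
    # Query-engine-specific table properties (the value below is illustrative)
    "table_properties": ["parquet.compression=SNAPPY"],
}
```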

### S3 Parquet exporter

This would upload a Parquet file instead of a CSV file. In addition to dependencies such as boto3, `pyarrow` must also be installed.
@@ -1,4 +1,4 @@
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Union

from clients.hms_client import HiveMetastoreClient
from const.metastore import DataColumn, DataTable
@@ -101,6 +101,12 @@ def get_partitions(
def get_schema_location(self, schema_name: str) -> str:
return self.hmc.get_database(schema_name).locationUri

def get_table_location(self, schema_name: str, table_name: str) -> Union[None, str]:
try:
return self.hmc.get_table(schema_name, table_name).sd.location
except NoSuchObjectException:
return None

def _get_hmc(self, metastore_dict):
return HiveMetastoreClient(
hmss_ro_addrs=metastore_dict["metastore_params"]["hms_connection"]
59 changes: 32 additions & 27 deletions querybook/server/lib/table_upload/exporter/s3_exporter.py
@@ -32,9 +32,13 @@
if true, the upload root path is inferred by locationUri specified in hms
to use this option, the engine must be connected to a metastore that uses
HMSMetastoreLoader (or its derived class)
if false, the table will be created as a managed table, whose location is determined automatically by the query engine.
- table_properties (List[str]): list of table properties passed, this must be query engine specific.
Check here for SparkSQL examples: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
For Trino/Presto, it would be the WITH statement: https://trino.io/docs/current/sql/create-table.html

If neither s3_path nor use_schema_location is provided, it will be treated the same as `use_schema_location=False`,
and the table will be created as a managed table.
"""


@@ -61,40 +65,39 @@ def destination_s3_path(self, session=None) -> str:
return self.destination_s3_folder(session=session) + "/" + "0000"

@with_session
def destination_s3_root(self, session=None) -> str:
"""Generate the bucket name + prefix before
the table specific folder
def destination_s3_folder(self, session=None) -> str:
"""Generate the s3 folder path for the table

Returns:
str: s3 path consisting bucket + prefix + schema name
str: s3 path consisting bucket + prefix + schema name + table name
"""

schema_name, table_name = self._fq_table_name
if "s3_path" in self._exporter_config:
schema_name, _ = self._fq_table_name
s3_path: str = self._exporter_config["s3_path"]
return sanitize_s3_url(s3_path) + "/" + schema_name + "/" + table_name

return sanitize_s3_url_with_trailing_slash(s3_path) + schema_name + "/"

if self._exporter_config.get("use_schema_location", False):
query_engine = get_query_engine_by_id(self._engine_id, session=session)
metastore = get_metastore_loader(query_engine.metastore_id, session=session)
query_engine = get_query_engine_by_id(self._engine_id, session=session)
metastore = get_metastore_loader(query_engine.metastore_id, session=session)

if metastore is None:
raise Exception("Invalid metastore")
if metastore is None:
raise Exception("Invalid metastore for table upload")

schema_location_uri = metastore.get_schema_location(
self._table_config["schema_name"]
)
if self._exporter_config.get("use_schema_location", False):
schema_location_uri = metastore.get_schema_location(schema_name)
if not schema_location_uri:
raise Exception("Invalid metastore to use use_schema_location option")

return sanitize_s3_url_with_trailing_slash(schema_location_uri)
return sanitize_s3_url(schema_location_uri) + "/" + table_name

raise Exception("Must specify s3_path or set use_schema_location=True")
# Use its actual location for managed tables
table_location = metastore.get_table_location(schema_name, table_name)

@with_session
def destination_s3_folder(self, session=None) -> str:
_, table_name = self._fq_table_name
return self.destination_s3_root(session=session) + table_name
if not table_location:
raise Exception(
"Can't get the table location from the metastore. Please make sure the query engine supports managed tables with a default location."
)
return sanitize_s3_url(table_location)

@with_session
def _handle_if_table_exists(self, session=None):
@@ -118,13 +121,15 @@ def _handle_if_table_exists(self, session=None):
def _get_table_create_query(self, session=None) -> str:
query_engine = get_query_engine_by_id(self._engine_id, session=session)
schema_name, table_name = self._fq_table_name
is_external = not self._exporter_config.get("use_schema_location", False)
is_external = "s3_path" in self._exporter_config or self._exporter_config.get(
"use_schema_location"
)
return get_create_table_statement(
language=query_engine.language,
table_name=table_name,
schema_name=schema_name,
column_name_types=self._table_config["column_name_types"],
# if use schema location, then no table location is needed for creation
# table location is only needed for external (non managed) table creation
file_location=self.destination_s3_folder() if is_external else None,
file_format=self.UPLOAD_FILE_TYPE(),
table_properties=self._exporter_config.get("table_properties", []),
@@ -203,13 +208,13 @@ def _upload_to_s3(self) -> None:
S3FileCopier.from_local_file(f).copy_to(self.destination_s3_path())


def sanitize_s3_url_with_trailing_slash(uri: str) -> str:
def sanitize_s3_url(uri: str) -> str:
"""
This function does two things:
1. if the uri is s3a:// or s3n://, change it to s3://
2. if there is no trailing slash, add it
2. remove the trailing slash if it has one
"""
uri = re.sub(r"^s3[a-z]:", "s3:", uri)
if not uri.endswith("/"):
uri += "/"
if uri.endswith("/"):
uri = uri[:-1]
return uri
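
For reference, a small self-contained check of what the renamed helper returns; the function body simply mirrors the `sanitize_s3_url` implementation above:

```python
import re


def sanitize_s3_url(uri: str) -> str:
    # Mirrors the helper above: normalize s3a:// or s3n:// to s3:// and drop any trailing slash
    uri = re.sub(r"^s3[a-z]:", "s3:", uri)
    if uri.endswith("/"):
        uri = uri[:-1]
    return uri


assert sanitize_s3_url("s3a://bucket/prefix/") == "s3://bucket/prefix"
assert sanitize_s3_url("s3n://bucket/key") == "s3://bucket/key"
assert sanitize_s3_url("s3://bucket/key") == "s3://bucket/key"
```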
10 changes: 9 additions & 1 deletion querybook/webapp/components/TableUploader/TableUploaderForm.tsx
@@ -77,7 +77,15 @@ export const TableUploaderForm: React.FC<ITableUploaderFormProps> = ({
error: 'Fail to create table',
});

navigateWithinEnv(`/table/${tableId}`);
// sometimes there can be a sync delay between the metastore and querybook,
// so skip the redirection if the table has not been synced over yet.
if (tableId) {
navigateWithinEnv(`/table/${tableId}`);
} else {
toast(
'Waiting for the table to be synced over from the metastore.'
);
}
onHide();
},
[onHide]