
[data] Add databricks table / SQL reader #39852

Merged: 18 commits merged into ray-project:master on Oct 6, 2023

Conversation

WeichenXu123 (Contributor) commented on Sep 26, 2023

Why are these changes needed?

Databricks users have requested a ray.data reader that can easily load data from Databricks UC tables.
This reader is implemented based on the Databricks statement execution API; the overall process is:

  1. read_databricks_tables sends a request to /api/2.0/sql/statements on the Databricks shard; the request payload contains the authentication token and the query information (a minimal sketch of this step appears after this list).
  2. It gets a response that contains the query result chunk information. The query result is split into multiple chunks, each around 10 to 20 MB in size.
  3. It generates a number of Ray read tasks; the number of read tasks is determined by the parallelism argument. Each read task fetches the data of one or more chunks. Fetching chunk data uses this API.
  4. The Ray read tasks are dispatched to remote Ray workers and executed in a distributed way.
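
A minimal sketch of steps 1 and 2 is shown below. It assumes the 'DATABRICKS_HOST' and 'DATABRICKS_TOKEN' environment variables described in the API doc; the endpoint is the public statement execution API, but the exact payload fields the reader sends are an assumption for illustration, not the merged implementation.

import os

import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. adb-<workspace-id>.<random-number>.azuredatabricks.net
token = os.environ["DATABRICKS_TOKEN"]  # workspace access token

# Step 1: submit the SQL statement to the statement execution endpoint.
resp = requests.post(
    f"https://{host}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": "a885ad08b64951ad",                  # warehouse the statement runs on
        "statement": "select id from table_1 limit 750000",
        "disposition": "EXTERNAL_LINKS",                     # results exposed as downloadable chunk links
        "format": "ARROW_STREAM",                            # each chunk is an Arrow IPC stream
    },
)
resp.raise_for_status()
result = resp.json()

statement_id = result["statement_id"]
# Step 2: once the statement succeeds (possibly after polling its status), the
# manifest lists the result chunks (each roughly 10 to 20 MB) that the read
# tasks later fetch.
chunks = result.get("manifest", {}).get("chunks", [])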

API:

    """
    Read from a Databricks UC table or Databricks SQL execution result that queries
    from Databricks UC tables.
    If this API is not called inside the Databricks runtime, you need to set the
    environment variables 'DATABRICKS_HOST' and 'DATABRICKS_TOKEN' first.

    This reader is implemented based on the
    [Databricks statement execution API](https://docs.databricks.com/api/workspace/statementexecution).

    Examples:
        import ray
        ds = ray.data.read_databricks_tables(
            warehouse_id='a885ad08b64951ad',
            catalog='catalog_1',
            schema='db_1',
            query='select id from table_1 limit 750000',
        )

    Args:
        warehouse_id: The ID of the Databricks warehouse; the query statement is
            executed on this warehouse.
        table: The name of the UC table you want to read. If this argument is set,
            you can't set the 'query' argument, and the reader generates a query
            of 'select * from {table}' under the hood.
        query: The query you want to execute. If this argument is set,
            you can't set the 'table' argument.
        catalog: (Optional) The default catalog name used by the query.
        schema: (Optional) The default schema used by the query.
        parallelism: The requested parallelism of the read. Defaults to -1,
            which automatically determines the optimal parallelism for your
            configuration. You should not need to manually set this value in most cases.
            For details on how the parallelism is automatically determined and guidance
            on how to tune it, see :ref:`Tuning read parallelism
            <read_parallelism>`.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.

    Returns:
        A :class:`Dataset` containing the queried data.
    """

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Comment on lines 1764 to 1766
catalog: str,
schema: str,
table_name: Optional[str] = None,
Contributor:

Can we create a single argument called table? If users have already set the current catalog / schema in their notebook environment via USE CATALOG / USE SCHEMA, can we infer the catalog and schema? If it's too tricky, not a big deal for now

Contributor Author:

We can, but I prefer to make the catalog and schema arguments optional. In the Databricks shard, catalog and schema already have default values, but with the optional arguments the user can easily override them.

Comment on lines 69 to 77
while state in ["PENDING", "RUNNING"]:
    time.sleep(1)
    response = requests.get(
        urljoin(url_base, statement_id) + "/",
        auth=req_auth,
        headers=req_headers,
    )
    response.raise_for_status()
    state = response.json()["status"]["state"]

no timeout?


I see, we don't know how long the query takes.

Contributor Author:

We allow the user to press Ctrl + C to cancel it; this should cover most cases.

Contributor Author:

How long a query takes is hard to estimate; it depends on SQL workloads, query queues, warehouse resources, etc.
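
For reference, a bounded variant of the polling loop quoted above could look like the sketch below; the timeout parameter and the TimeoutError it raises are hypothetical additions for illustration, not part of this PR.

import time
from urllib.parse import urljoin

import requests

def wait_for_statement(url_base, statement_id, req_auth, req_headers, timeout=None):
    # Poll until the statement leaves PENDING/RUNNING, with an optional deadline.
    # The merged reader polls without a timeout and relies on Ctrl + C to cancel;
    # `timeout` here is a hypothetical extension.
    deadline = None if timeout is None else time.monotonic() + timeout
    state = "PENDING"
    while state in ["PENDING", "RUNNING"]:
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(f"Statement {statement_id} is still {state} after {timeout}s")
        time.sleep(1)
        response = requests.get(
            urljoin(url_base, statement_id) + "/",
            auth=req_auth,
            headers=req_headers,
        )
        response.raise_for_status()
        state = response.json()["status"]["state"]
    return state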

python/ray/data/read_api.py (outdated review thread, resolved)
raw_response = requests.get(external_url, auth=None, headers=None)
raw_response.raise_for_status()

arrow_table = pyarrow.ipc.open_stream(raw_response.content).read_all()

Is raw_response.content large?

Contributor Author:

Usually 10 to 20 MB per chunk.

WeichenXu123 (Contributor Author):

Gentle ping @c21

bveeramani (Member):

Hey @WeichenXu123, thanks for contributing! I'll take a look at this PR sometime this week

bveeramani (Member) left a comment:

Implementation overall LGTM.

Two concerns:

  • How do we handle retries, especially for rate limiting errors?
  • IIRC Databricks marks a statement as closed when the last statement has been read. Is this still an issue, and if so, how do we work around it?

python/ray/data/read_api.py (outdated review threads, resolved)
else:
    raise ValueError(
        "You are not in databricks runtime, please set environment variable "
        "'DATABRICKS_HOST' and 'DATABRICKS_TOKEN'."
Member:

Is there any relevant documentation we could link to? Specifically, how would a user find DATABRICKS_HOST and DATABRICKS_TOKEN?

Contributor Author:

It is already in this API doc.

Member:

As in the docstring? I think we just link to https://docs.databricks.com/api/workspace/statementexecution, which I don't think covers how to find the environment variables?

Also, to make the error message more actionable, I think it'd be great if we could say something like:

"Please set environment variable ... To get these values, see [relevant Databricks page]"

WeichenXu123 (Contributor Author), Oct 6, 2023:

Oh, I get it: these 2 environment variable keys are defined in the read_databricks_tables code, so there is no Databricks doc for them.

I refined the read_databricks_tables API doc to add descriptions for the 2 environment variables:

  • 'DATABRICKS_HOST' means the Databricks workspace URL, like adb-<workspace-id>.<random-number>.azuredatabricks.net.
  • 'DATABRICKS_TOKEN' means the Databricks workspace access token; users will know how to get it once they see the doc description.

python/ray/data/datasource/databricks_uc_datasource.py (outdated review threads, resolved)
WeichenXu123 (Contributor Author):

@bveeramani

Your comments are all addressed or answered.

How do we handle retries, especially for rate limiting errors?

Without considering rate limiting, I think we don't need to add retry code, because Ray tasks should automatically retry on failure, if I understand Ray correctly.

Regarding the rate limit, I need to ask our warehouse engineer and then I will get back to you.

How do we handle retries, especially for rate limiting errors?
IIRC Databricks marks a statement as closed when the last statement has been read. Is this still an issue, and if so, how do we work around it?

I replied to it in #39852 (comment)

WeichenXu123 (Contributor Author):

@bveeramani

I got a reply from our warehouse engineer about the rate limit:

For concurrent-request limiters (concurrent chunk-data fetching) of the EXTERNAL_LINKS type (the type used in my PR), it's around 750 requests per second per workspace.

So I think we will rarely hit the rate limit, because one chunk of data is at least 10 MB and each request takes some time to complete; if we hit the rate limit, we can just let the Ray task retry.

bveeramani (Member) left a comment:

Awesome! Overall LGTM!

Once we've resolved the remaining review comments, this should be good to merge.

For concurrent-request limiters (concurrent chunk-data fetching) of the EXTERNAL_LINKS type (the type used in my PR), it's around 750 requests per second per workspace.

Got it. I think this should be okay in most cases, but if you have a cluster with more than 750 cores I can see this being an issue. IIRC we ran into some rate limiting errors when we performed internal testing.

In any case, I think we can defer this for now and address this as a follow up if people run into issues.

if we hit the rate limit, we can just let the Ray task retry.

Ray Tasks don't retry application-level exceptions by default. So, if you get a rate limiting error, I don't think it'd get retried.
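
One possible mitigation, sketched below, is for callers to opt the read tasks into retrying application-level exceptions through ray_remote_args, which the docstring says is forwarded to ray.remote; the specific retry settings are illustrative, not something this PR configures.

import ray

# Sketch: let the read tasks retry application-level exceptions, e.g. HTTP
# errors raised when a chunk fetch is rate limited. max_retries and
# retry_exceptions are standard ray.remote task options; applying them here
# is an illustration of a possible follow-up, not this PR's behavior.
ds = ray.data.read_databricks_tables(
    warehouse_id="a885ad08b64951ad",
    query="select id from table_1 limit 750000",
    ray_remote_args={
        "max_retries": 5,
        "retry_exceptions": True,  # or a list of specific exception types
    },
)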

WeichenXu123 (Contributor Author):

if you have a cluster with more than 750 cores I can see this being an issue. IIRC we ran into some rate limiting errors when we performed internal testing.

Ray Tasks don't retry application-level exceptions by default. So, if you get a rate limiting error, I don't think it'd get retried.

We can address them in follow-up PRs, since this case rarely happens

bveeramani (Member):

We can address them in follow-up PRs, since this case rarely happens

Sounds good. Thanks for your work on this PR!

bveeramani merged commit 9b3d63d into ray-project:master on Oct 6, 2023
41 of 44 checks passed
bveeramani (Member):

@WeichenXu123 When you're back from vacation, could you add tests? My bad -- I should've caught this in an earlier review and before merging.

WeichenXu123 (Contributor Author):

@WeichenXu123 When you're back from vacation, could you add tests? My bad -- I should've caught this in an earlier review and before merging.

The e2e test requires a Databricks environment; we will add an e2e test in our Databricks code repo to monitor that the Ray reader works.

For a unit test in the Ray repo, we can only add a mocking test; we can add it if you need it.

Zandew pushed a commit to Zandew/ray that referenced this pull request on Oct 10, 2023
vymao pushed a commit to vymao/ray that referenced this pull request on Oct 11, 2023
bveeramani (Member):

@WeichenXu123 ah, gotcha! Didn't realize you were running E2E tests on your side.

Given that we'd likely need extensive mocks to unit test this feature, I think the additional complexity may outweigh the benefit of the tests.
