Description
Is your feature request related to a problem or challenge?
This is an idea that @robtandy brought up on the DataFusion sync call the other day and I think it would be pretty useful.
The use case is "I want to read more than one, but not an entire directory of, parquet files from a remote object store" -- I think in this case to look at some particular files.
For example, let's say you want to read just these two files:
- s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet
- s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_2.parquet
There is currently no way to do so via SQL. You can either read the entire directory:
> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/' options (aws.region 'eu-central-1');
0 row(s) fetched.
Elapsed 2.928 seconds.
Or you can read each file separately:
> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet' options (aws.region 'eu-central-1');
0 row(s) fetched.
Elapsed 1.017 seconds.
Describe the solution you'd like
I would like to be able to read an arbitrary set of remote parquet files
It would also be awesome to support GLOB patterns (e.g. *), which has been requested before.
Describe alternatives you've considered
I suggest we implement a TableFunction similar to the DuckDB read_parquet function, ONLY in the datafusion-cli (source).
So to query the files listed above, this would look like:
SELECT * FROM read_parquet([
'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet',
'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_2.parquet'
]);
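For context, DataFusion's Rust DataFrame API already accepts multiple paths (DataFilePaths is implemented for a Vec of strings), so the request is essentially to expose the same capability through SQL. A minimal sketch, assuming an object store for the URLs' scheme has already been registered with the context:

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Remote schemes (s3://, https://) resolve only if a matching
    // object store was registered with `ctx` beforehand (omitted here).
    let df = ctx
        .read_parquet(
            vec![
                "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet",
                "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_2.parquet",
            ],
            ParquetReadOptions::default(),
        )
        .await?;
    df.show_limit(5).await?;
    Ok(())
}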
From the duckdb docs: https://duckdb.org/docs/stable/data/parquet/overview.html
-- read file1, file2, file3
SELECT *
FROM read_parquet(['file1.parquet', 'file2.parquet', 'file3.parquet']);
-- Support GLOB access
SELECT *
FROM read_parquet(['folder1/*.parquet', 'folder2/*.parquet']);
We already support the parquet_metadata function in datafusion-cli (docs):
SELECT path_in_schema, row_group_id, row_group_num_rows, stats_min, stats_max, total_compressed_size
FROM parquet_metadata('hits.parquet')
WHERE path_in_schema = '"WatchID"'
LIMIT 3;
+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
| path_in_schema | row_group_id | row_group_num_rows | stats_min | stats_max | total_compressed_size |
+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
| "WatchID" | 0 | 450560 | 4611687214012840539 | 9223369186199968220 | 3883759 |
| "WatchID" | 1 | 612174 | 4611689135232456464 | 9223371478009085789 | 5176803 |
| "WatchID" | 2 | 344064 | 4611692774829951781 | 9223363791697310021 | 3031680 |
+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
3 rows in set. Query took 0.053 seconds.
Here is the code implementation, datafusion/datafusion-cli/src/functions.rs, line 322 at 85f6621:
impl TableFunctionImpl for ParquetMetadataFunc {
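A read_parquet table function could follow the same pattern. Below is a hedged skeleton (ReadParquetFunc is a hypothetical name; it assumes a DataFusion version where Expr::Literal is a single-field variant, and, for simplicity, one string literal per file rather than DuckDB's list syntax):

use std::sync::Arc;
use datafusion::catalog::{TableFunctionImpl, TableProvider};
use datafusion::common::{plan_err, ScalarValue};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;

// Hypothetical `read_parquet` table function (sketch only).
#[derive(Debug)]
struct ReadParquetFunc;

impl TableFunctionImpl for ReadParquetFunc {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // Collect one path or glob per string-literal argument, e.g.
        // read_parquet('s3://bucket/a.parquet', 's3://bucket/b.parquet')
        let mut paths = Vec::with_capacity(args.len());
        for arg in args {
            match arg {
                Expr::Literal(ScalarValue::Utf8(Some(s))) => paths.push(s.clone()),
                other => return plan_err!("read_parquet expects string literals, got {other}"),
            }
        }
        // The hard part: building the provider needs the file schema, and
        // schema inference is async for remote stores (see discussion below).
        todo!("construct a ListingTable over `paths`")
    }
}

It would be registered with SessionContext::register_udtf, the same way datafusion-cli registers parquet_metadata.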
We can also look at the ClickBench S3 command that is similar: https://clickhouse.com/docs/integrations/s3
DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames');
Open questions
What to do if the files are on different object stores (e.g. S3 and http):
SELECT * FROM read_parquet([
'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet',
-- note a different object store
's3://public-datasets/hits_compatible/athena_partitioned/hits_2.parquet'
]);
At first, I suggest we don't try to support this.
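One conservative option is to validate up front that every path shares a scheme and authority, and error otherwise. A minimal sketch using the url crate (validate_same_store is a hypothetical helper, not an existing DataFusion API):

use url::Url;

// Hypothetical check: reject file lists that span multiple object stores.
fn validate_same_store(paths: &[String]) -> Result<(), String> {
    let mut first: Option<(String, String)> = None;
    for p in paths {
        let url = Url::parse(p).map_err(|e| format!("invalid URL {p}: {e}"))?;
        // Identify the store by scheme + authority, e.g. ("s3", "bucket")
        let key = (url.scheme().to_string(), url.authority().to_string());
        match &first {
            None => first = Some(key),
            Some(f) if *f != key => {
                return Err(format!(
                    "all files must be on the same object store: {f:?} vs {key:?}"
                ))
            }
            _ => {}
        }
    }
    Ok(())
}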
Additional context
No response
Activity
comphead commented on Jun 6, 2025
That has actually been on my backlog for a couple of months. It would be nice to support an array of files or globs.
comphead commented on Jun 6, 2025
I'll try to take it on in 2 weeks if no one else beats me to it.
alamb commented on Jun 6, 2025
Maybe @robtandy could help
a-agmon commented on Jun 7, 2025
@alamb - I'm less familiar with this area in datafusion but might be able to give this a shot.
The idea is to add this as a table function, right?
I can see that ListingTableUrl::parse supports glob strings, so does it make sense to simply implement this as a listing table?

a-agmon commented on Jun 8, 2025
I gave it a shot but it ended up being somewhat messy. That's mostly due to the fact that, on the one hand, TableFunctionImpl::call() is synchronous, yet, on the other hand, it also has to get hold of the schema of the data, which in the case of remote blobs (like S3) requires IO and async to be done right.
I was trying to work around this by using the call() method to create a TableProvider that initially reports an empty schema. This satisfies the planner's synchronous API. The actual schema discovery is deferred until the scan() method is called during the asynchronous execution phase. But this creates an issue with projections that require schema validation, i.e., select X from read_csv(some-glob-pattern), though select * from read_csv(some-glob-pattern) will work.
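For illustration, a stripped-down version of that workaround might look like the following (DeferredSchemaProvider is a hypothetical name; the signatures assume a recent DataFusion where TableProvider::scan takes &dyn Session):

use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical provider that defers schema discovery to scan().
#[derive(Debug)]
struct DeferredSchemaProvider {
    paths: Vec<String>,
}

#[async_trait]
impl TableProvider for DeferredSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // The planner sees an empty schema, which is why `select x from ...`
    // fails projection validation while `select * from ...` happens to work.
    fn schema(&self) -> SchemaRef {
        Arc::new(Schema::empty())
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Now that we are in an async context, the real schema can be
        // inferred (e.g. by building a ListingTable over self.paths)
        // and the scan delegated to it.
        todo!("resolve {} paths and build the real plan", self.paths.len())
    }
}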
alamb commented on Jun 8, 2025
Thanks @a-agmon -- maybe this example would help: https://docs.rs/datafusion/latest/datafusion/catalog/trait.AsyncSchemaProvider.html
I agree the trick will be figuring out how to make the async calls.
alamb commented on Jun 8, 2025
Yes, this is what I would expect -- that the result of calling read_parquet is / uses the ListingTable implementation.
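To make that concrete, here is a hedged sketch of building a ListingTable over several URLs (including globs) in an async context; listing_table_for is a hypothetical helper, and it assumes ListingTableConfig::new_with_multi_paths plus an already-registered object store:

use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

// Async, so it cannot be called directly from TableFunctionImpl::call().
async fn listing_table_for(state: &SessionState, paths: &[&str]) -> Result<Arc<ListingTable>> {
    let urls = paths
        .iter()
        .map(|p| ListingTableUrl::parse(p)) // also accepts glob patterns
        .collect::<Result<Vec<_>>>()?;
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let config = ListingTableConfig::new_with_multi_paths(urls)
        .with_listing_options(options)
        .infer_schema(state) // async: may issue object store requests
        .await?;
    Ok(Arc::new(ListingTable::try_new(config)?))
}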
a-agmon commented on Jun 8, 2025
I have added a draft PR for this. Would be happy for your comments.
a-agmon commented on Jun 9, 2025
Hi @alamb,
@comphead raised a couple of good questions about the PR, so I'm linking it here to hear your thoughts.
#16332 (comment)
a-agmon commented on Jun 12, 2025
Hi @comphead and @alamb
I thought it might be a good idea to split this issue into several PRs:
1 - add support for using CREATE TABLE syntax with glob patterns and remote URL schemes, just as with local ones (the new PR above tried to handle this).
2 - add table functions (read_parquet, read_csv, etc.) to support glob reading (I'm working on your comments regarding this one).
Hope this makes sense, feel free to comment also if not...
alamb commented on Jun 13, 2025
I agree a few smaller, focused PRs make sense.
Thank you for working on this
robtandy commented on Jun 13, 2025
Thanks for creating this issue @alamb !!
Regarding the location of the code, if it is in datafusion proper rather than the CLI, it would be available in datafusion python, and any other projects that want to offer functionality backed by datafusion. I think it increases the utility of datafusion as a library and will get used.
Is it possible to have a configuration option for whether to enable it? Like how datafusion.catalog.information_schema enables the info schema in the SessionState? I do understand that it will be more code to maintain, but my intuition is that this is generally useful enough to offer within the core, as I think it will provide value. It's possible, though, that I don't fully appreciate the ramifications of this choice.
Curious what people think about this.
alamb commented on Jun 16, 2025
That is an interesting idea -- I agree that having CREATE EXTERNAL TABLE support this kind of URL / multiple files would be useful.
I am not sure I fully understand the ramifications either -- if we simply update the SQL planner (SqlToRel) to split the URL list on ' ' or ', ', that certainly seems straightforward to me (and would be backwards compatible...)
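For illustration, that planner-side change could be as small as a hypothetical helper like this (split_location is not an existing DataFusion function):

// Split a LOCATION string such as
//   's3://bucket/a.parquet, s3://bucket/b.parquet'
// into individual table paths.
fn split_location(location: &str) -> Vec<String> {
    location
        .split([',', ' '])
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect()
}

Each resulting path could then be handled the same way a single LOCATION is today.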