Can't sink_parquet on a sorted LazyFrame containing decimal columns #17289

Open

edthrn opened this issue Jun 29, 2024 · 4 comments
Labels
bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments

edthrn commented Jun 29, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

Given a very large dataset (~1 billion rows) stored on S3:

This works fine:

# Works as expected
from datetime import datetime, timedelta

import polars as pl

pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sink_parquet(
    "/tmp/data.parquet"
)

But this doesn't:

# Does not work
pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sort(
    pl.col("value")
).sink_parquet(
    "/tmp/data.parquet"
)

I get the following error:

Log output

POLARS PREFETCH_SIZE: 64
RUN STREAMING PIPELINE
[parquet -> sort -> parquet_sink]
STREAMING CHUNK SIZE: 1388 rows
STREAMING CHUNK SIZE: 1388 rows
...
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
Temporary directory path in use: /tmp
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
OOC sort started
...
OOC sort started
thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[22,10]`
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-27' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-1' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-16' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
[...]
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-7' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-30' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
[...]
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 9
      1 pl.scan_parquet(
      2     's3://.../*.parquet',
      3     storage_options=options
      4 ).filter(
      5     pl.col("date") < datetime.now() - timedelta(days=120)
      7 ).sort(
      8     pl.col("value")
----> 9 ).sink_parquet(
     10     '/tmp/data.parquet',
     11 )

File ~/venv/lib/python3.11/site-packages/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     55 @wraps(function)
     56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
     57     issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58     return function(*args, **kwargs)

File ~/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:2233, in LazyFrame.sink_parquet(self, path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
   2225 elif statistics == "full":
   2226     statistics = {
   2227         "min": True,
   2228         "max": True,
   2229         "distinct_count": True,
   2230         "null_count": True,
   2231     }
-> 2233 return lf.sink_parquet(
   2234     path=normalize_filepath(path),
   2235     compression=compression,
   2236     compression_level=compression_level,
   2237     statistics=statistics,
   2238     row_group_size=row_group_size,
   2239     data_pagesize_limit=data_pagesize_limit,
   2240     maintain_order=maintain_order,
   2241 )

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

Issue description

I stumbled upon #16603 and tried the POLARS_ACTIVATE_DECIMAL=1 hack.

It was necessary for the first (unsorted) sample code to work, but it is apparently not sufficient for the sorted code sample to work.
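
For reference, this is roughly how I applied it (a sketch; my assumption is that the variable has to be set before the query is evaluated):

import os

# Hack from #16603: opt in to the unstable Decimal dtype.
# Assumption: set it before importing/running Polars, to be safe.
os.environ["POLARS_ACTIVATE_DECIMAL"] = "1"

import polars as pl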

I tested with both versions 0.20.31 and 1.0.0rc2: Same results.

EDIT: also tested on 1.0.0 with same results
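
In case it helps triage, here is a rough local sketch of the same parquet -> sort -> parquet_sink pipeline. It assumes (my guess from the log) that the panic only appears once the sort spills out of core, so it tries to force that path via POLARS_FORCE_OOC; the file names are illustrative:

import os

# Assumptions: POLARS_ACTIVATE_DECIMAL enables the unstable Decimal dtype (see #16603)
# and POLARS_FORCE_OOC forces the streaming sort to spill to disk even on tiny data.
os.environ["POLARS_ACTIVATE_DECIMAL"] = "1"
os.environ["POLARS_FORCE_OOC"] = "1"

from decimal import Decimal

import polars as pl

# Write a tiny Parquet file with a Decimal column, then sort and sink it
# through the streaming engine, like the failing query above.
pl.DataFrame({"value": [Decimal("3.75"), Decimal("0.25"), Decimal("1.50")]}).write_parquet(
    "/tmp/decimal_in.parquet"
)
pl.scan_parquet("/tmp/decimal_in.parquet").sort("value").sink_parquet("/tmp/decimal_out.parquet")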

Expected behavior

I expected the lazy scan/filter/sort/sink to work as well as scan/filter/sink does.

Installed versions

--------Version info---------
Polars:               0.20.31
Index type:           UInt32
Platform:             Linux-4.19.0-27-cloud-amd64-x86_64-with-glibc2.28
Python:               3.11.0 (main, May 15 2024, 19:44:29) [GCC 8.3.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.6.1
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                2.0.0
openpyxl:             <not installed>
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.31
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
@edthrn edthrn added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Jun 29, 2024
@edthrn edthrn changed the title Can't sink_parquet on a sorted LazyFrame Can't sink_parquet on a sorted LazyFrame containing decimal columns Jun 29, 2024

edthrn commented Jun 29, 2024

I just tested with a smaller dataset, i.e. instead of scanning all ~17k files, I only scanned the first 50...

And it works 🤔

Does it mean that the problem comes from the data itself (e.g. a null value or something similar)? In that case, it's still odd that the unsorted version works as expected...
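
Roughly what that reduced test looked like (a sketch; `s3_urls` is a placeholder for the full listing of the ~17k object URLs, and the output path is illustrative):

from datetime import datetime, timedelta

import polars as pl

s3_urls: list[str] = []  # placeholder: fill with the full listing of the ~17k source files

# Same pipeline as the failing one, restricted to the first 50 source files.
pl.scan_parquet(
    s3_urls[:50],
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sort(
    pl.col("value")
).sink_parquet(
    "/tmp/sample.parquet"
)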


edthrn commented Jun 29, 2024

I managed to scan/filter/sort/sink the whole dataset by processing it in batches of 500 source files.

for i, batch in enumerate(batched(s3_urls, batch_size=500)):
    pl.scan_parquet(
        batch,
    ).filter(
        pl.col("date") < datetime.now() - timedelta(days=120)
    ).sort(
        pl.col("value")
    ).sink_parquet(
        f'/tmp/data_{i}.parquet',
    )
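
(`batched` above is a user-side helper, not a Polars function; a minimal sketch of what it is assumed to do:)

from itertools import islice

def batched(iterable, batch_size):
    """Yield successive lists of at most `batch_size` items from `iterable`."""
    it = iter(iterable)
    while chunk := list(islice(it, batch_size)):
        yield chunk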

Hence, the supposition I made above can be ruled out: it's not a data value or data type problem.


One minor problem remains: I now have 34 Parquet files at the end of the process (from ~17k source files in total), instead of a single large one.
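
If a single file is needed anyway, re-sinking the batch outputs without a sort does concatenate them (a sketch, reusing the /tmp/data_*.parquet naming from the loop above; the output name is illustrative). The result is then only sorted within each batch, not globally, since a global .sort() would hit the same panic:

import polars as pl

# Concatenate the 34 batch outputs into a single file without re-sorting;
# adding .sort() here would trigger the same decimal OOC-sort panic.
pl.scan_parquet("/tmp/data_*.parquet").sink_parquet("/tmp/merged.parquet")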


lostmygithubaccount commented Jul 1, 2024

I'm hitting something similar after upgrading to Polars v1.0.0 (note: I am using polars-u64-idx)

thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[15,2]`
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-6' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 5
      1 df = pl.scan_parquet(data)
      2 (
      3     df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
      4     .head(3)
----> 5     .collect(streaming=True)
      6 )

File ~/repos/ibis/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:1942, in LazyFrame.collect(self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, cluster_with_columns, no_optimization, streaming, background, _eager, **_kwargs)
   1939 # Only for testing purposes atm.
   1940 callback = _kwargs.get("post_opt_callback")
-> 1942 return wrap_df(ldf.collect(callback))

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

from:

df = pl.scan_parquet(data)
(
    df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
    .head(3)
    .collect(streaming=True)
)

where data points to ~275GB of Parquet files

Interestingly, before upgrading I was hitting #17281 on this operation.


edthrn commented Jul 3, 2024

I confirm that I still get the issue after upgrading to v1.0.0
