Streaming Left Join Fails in Recent Version #14863

Open
2 tasks done
ealders opened this issue Mar 5, 2024 · 7 comments
Labels: bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), rust (Related to Rust Polars)

Comments

@ealders

ealders commented Mar 5, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

LazyFrame::scan_parquet(&args.latlongs_file, Default::default())?
    .with_streaming(true)
    .with_comm_subplan_elim(false)
    .inner_join(
      LazyFrame::scan_parquet(&args.zips_file, Default::default())?,
      col("sample_point_id"),
      col("sample_point_id"),
    )
    .inner_join(
      LazyFrame::scan_parquet(&args.providers_file, Default::default())?,
      col("latlong_id"),
      col("latlong_id"),
    )
    .join(
      LazyFrame::scan_parquet(&args.overrides_file, Default::default())?,
      [col("fips_code"), col("specialty_search_group_id")],
      [col("fips_code"), col("specialty_search_group_id")],
      JoinArgs::new(JoinType::Left),
    )
    .filter(
      col("distance").gt(coalesce(&[col("override_distance"), lit(args.mileage)]))
    )
    .filter(col("specialty_search_group_id").eq(lit(args.specialty_search_group_id)))
    .filter(col("region_type_id").eq(lit(args.region_type_id)))
    .filter(network_filter(args.network_id))
    .group_by([col("sample_point_id"), col("provider_id")])
    .agg([col("distance").min()])
    .select([col("sample_point_id"), col("distance")])
    .sink_ipc(min_distances_temp_file.clone(), Default::default())?;

Log output

POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
RUN STREAMING PIPELINE
parquet -> generic_join_build
RefCell { value: [parquet -> generic_join_build, parquet -> placeholder -> fast_projection -> generic_join_build, parquet -> placeholder -> fast_projection -> placeholder -> filter -> fast_projection -> generic-group_by -> fast_projection -> parquet_sink] }
STREAMING CHUNK SIZE: 16666 rows
thread 'main' panicked at /home/rails/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-core-0.35.4/src/utils/mod.rs:560:34:
called `Option::unwrap()` on a `None` value
stack backtrace:
   0: rust_begin_unwind
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/panicking.rs:72:14
   2: core::panicking::panic
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/panicking.rs:127:5
   3: polars_core::utils::accumulate_dataframes_vertical_unchecked
   4: <polars_pipe::executors::sinks::joins::generic_build::GenericBuild as polars_pipe::operators::sink::Sink>::finalize
   5: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline
   6: polars_pipe::pipeline::dispatcher::PipeLine::execute
   7: <F as polars_plan::logical_plan::apply::DataFrameUdfMut>::call_udf
   8: polars_plan::logical_plan::functions::FunctionNode::evaluate
   9: polars_lazy::physical_plan::state::ExecutionState::record
  10: <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute
  11: polars_lazy::frame::LazyFrame::sink_ipc

Issue description

The code above works flawlessly in Polars v0.34.2. I noticed this failure while upgrading to the recent v0.38.1 release, so I went back and upgraded one version at a time to find where it was introduced. It appears to have been introduced in v0.35.4, which included changes to joins. Specifically, the failure is caused by this left join on multiple columns:

.join(LazyFrame::scan_parquet(&args.overrides_file, Default::default())?, [col("fips_code"), col("specialty_search_group_id")], [col("fips_code"), col("specialty_search_group_id")], JoinArgs::new(JoinType::Left))

If I comment that line out, the query sinks as expected. Note that the Parquet file on the right side of the left join is empty most of the time, or has only a few rows.
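For reference, this is the behavior the query depends on: a left join keeps every left-hand row, and rows with no match on the composite key get nulls in the right-hand columns. With an empty overrides table, `coalesce(&[col("override_distance"), lit(args.mileage)])` should therefore just fall back to `args.mileage`. The sketch below is plain std-only Rust, not Polars code; the `Row`/`Override` types, the `left_join` helper, and the sample values are hypothetical and only illustrate those semantics with a hash join on `(fips_code, specialty_search_group_id)`.

```rust
use std::collections::HashMap;

// Hypothetical row types; field names mirror columns used in the query above.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    fips_code: String,
    specialty_search_group_id: i64,
    distance: f64,
}

#[derive(Debug, Clone, PartialEq)]
struct Override {
    fips_code: String,
    specialty_search_group_id: i64,
    override_distance: f64,
}

/// Hash left join on the composite key (fips_code, specialty_search_group_id).
/// Every left row is kept; rows without a match get `None` for override_distance.
fn left_join(left: &[Row], right: &[Override]) -> Vec<(Row, Option<f64>)> {
    // Build phase: index the right-hand table by the composite key.
    let mut table: HashMap<(String, i64), Vec<f64>> = HashMap::new();
    for r in right {
        table
            .entry((r.fips_code.clone(), r.specialty_search_group_id))
            .or_default()
            .push(r.override_distance);
    }
    // Probe phase: one output row per match, or a single row with None if no match.
    let mut out = Vec::new();
    for l in left {
        let key = (l.fips_code.clone(), l.specialty_search_group_id);
        match table.get(&key) {
            Some(matches) => {
                for &d in matches {
                    out.push((l.clone(), Some(d)));
                }
            }
            None => out.push((l.clone(), None)),
        }
    }
    out
}

fn main() {
    let left = vec![
        Row { fips_code: "01001".into(), specialty_search_group_id: 7, distance: 12.5 },
        Row { fips_code: "01003".into(), specialty_search_group_id: 7, distance: 30.0 },
    ];
    // The overrides table is often empty, as described in the issue.
    let overrides: Vec<Override> = Vec::new();
    let mileage = 25.0; // stands in for args.mileage
    for (row, override_distance) in left_join(&left, &overrides) {
        // Same idea as coalesce(&[col("override_distance"), lit(args.mileage)]).
        let threshold = override_distance.unwrap_or(mileage);
        println!("{} keep={}", row.fips_code, row.distance > threshold);
    }
}
```

An empty right-hand side is a perfectly valid input here: the join degenerates to "all left rows, all overrides null", which is what the query relied on in v0.34.2.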

Expected behavior

I would expect this to work correctly, as it did in v0.34.2.

Installed versions

features = ["lazy", "streaming", "csv", "polars-io", "parquet", "performant", "strings", "concat_str", "ipc"]

ealders added the bug, needs triage, and rust labels on Mar 5, 2024
@ritchie46
Member

Thank you for the issue report. Can you make a minimal reproducible example?

Remove all code that doesn't influence the result, and ideally reproduce it from an in-memory table. If that isn't possible, provide the code that generates the file. Currently a file is missing, so I cannot reproduce the issue.

@ealders
Author

ealders commented Mar 6, 2024

I'll work on putting an example together that will compile for you.

@ealders
Author

ealders commented Mar 6, 2024

parquet_files.tgz
I've attached an archive of the parquet files here as well if you want to try with those.

@cmdlineluser
Contributor

This seems to happen because overrides_file.parquet is empty.

Minimal repro:

import polars as pl

pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String).sink_parquet("empty.parquet")

pl.LazyFrame({"a": ["uno"]}).join(
   pl.scan_parquet("empty.parquet"),
   on="a"
).collect()
# shape: (0, 3)
# ┌─────┬─────┬─────┐
# │ a   ┆ b   ┆ c   │
# │ --- ┆ --- ┆ --- │
# │ str ┆ str ┆ str │
# ╞═════╪═════╪═════╡
# └─────┴─────┴─────┘

pl.LazyFrame({"a": ["uno"]}).join(
   pl.scan_parquet("empty.parquet"),
   on="a"
).collect(streaming=True)
# PanicException: called `Option::unwrap()` on a `None` value
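The backtrace points at `polars_core::utils::accumulate_dataframes_vertical_unchecked`, called from `GenericBuild::finalize`, the sink that collects the build side of the streaming join. One plausible reading, assumed here and not verified against the Polars source, is that this function takes the first collected chunk with `unwrap()`, so a build side fed by an empty Parquet file leaves nothing to unwrap. The std-only sketch below models that with a hypothetical `Chunk` type and hypothetical helper names, and contrasts it with a variant that falls back to an empty chunk carrying the known schema.

```rust
use std::panic;

/// Hypothetical stand-in for a DataFrame chunk: column names plus string rows.
#[derive(Debug, Clone, PartialEq)]
struct Chunk {
    columns: Vec<String>,
    rows: Vec<Vec<String>>,
}

/// Models the suspected failure: the first chunk is taken with `unwrap()`,
/// so an empty input panics with "called `Option::unwrap()` on a `None` value".
fn accumulate_unchecked(chunks: Vec<Chunk>) -> Chunk {
    let mut iter = chunks.into_iter();
    let mut acc = iter.next().unwrap();
    for chunk in iter {
        acc.rows.extend(chunk.rows);
    }
    acc
}

/// Defensive variant: with no chunks, return an empty chunk that still has the schema.
fn accumulate_or_empty(chunks: Vec<Chunk>, schema: &[&str]) -> Chunk {
    let mut iter = chunks.into_iter();
    let mut acc = iter.next().unwrap_or_else(|| Chunk {
        columns: schema.iter().map(|s| s.to_string()).collect(),
        rows: Vec::new(),
    });
    for chunk in iter {
        acc.rows.extend(chunk.rows);
    }
    acc
}

fn main() {
    // An empty Parquet file on the build side yields zero chunks.
    let build_side: Vec<Chunk> = Vec::new();
    let cloned = build_side.clone();
    let result = panic::catch_unwind(move || accumulate_unchecked(cloned));
    println!("unchecked version panicked: {}", result.is_err());
    let empty = accumulate_or_empty(build_side, &["a", "b", "c"]);
    println!("fallback: {} columns, {} rows", empty.columns.len(), empty.rows.len());
}
```

If this reading is right, it would also explain why the non-streaming engine returns a `(0, 3)` frame while the streaming engine panics: only the streaming build sink goes through the unchecked accumulation. That is an inference from the backtrace, not a confirmed diagnosis.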

@ealders
Author

ealders commented Mar 7, 2024

Thanks @cmdlineluser!

@tzehaoo

tzehaoo commented Mar 8, 2024

I would like to take this on, please.

@ealders
Author

ealders commented Aug 4, 2024

@ritchie46 @tzehaoo - Has either of you had a chance to look deeper into this? Could it be related to the updated Parquet file handling library that was introduced?
