pl.concat(how="horizontal") support for LazyFrame #10203

Closed
moritzwilksch opened this issue Jul 31, 2023 · 9 comments · Fixed by #13139
Labels: accepted (Ready for implementation), enhancement (New feature or an improvement of an existing feature)

Comments

@moritzwilksch
Contributor

Problem description

It would be great if pl.concat() supported LazyFrames with the "horizontal" strategy.
This might be more complicated than it appears; maybe that is the reason it's not supported at the moment?
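
For illustration, the requested call would mirror the existing eager API, just with LazyFrame inputs (the frames below are made up for the example):

import polars as pl

lf1 = pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
lf2 = pl.LazyFrame({"c": [5, 6], "d": [7, 8]})

# Desired behaviour: return a LazyFrame with columns a, b, c, d,
# equivalent to pl.concat([lf1.collect(), lf2.collect()], how="horizontal")
lf = pl.concat([lf1, lf2], how="horizontal")
print(lf.collect())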

@moritzwilksch added the enhancement (New feature or an improvement of an existing feature) label on Jul 31, 2023
@stinodego
Member

stinodego commented Jul 31, 2023

Probably has something to do with the number of rows being unknown until collecting. But honestly I think we can implement this, seeing as this works fine:

import polars as pl

lf1 = pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
lf2 = pl.LazyFrame({"c": [5, 6], "d": [7, 8]})

# with_context makes lf2's columns available to expressions on lf1,
# so pl.all() selects a, b, c and d
lf_new = lf1.with_context(lf2).select(pl.all())

print(lf_new.collect())
shape: (2, 4)
┌─────┬─────┬─────┬─────┐
│ a   ┆ b   ┆ c   ┆ d   │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═════╪═════╪═════╪═════╡
│ 1   ┆ 3   ┆ 5   ┆ 7   │
│ 2   ┆ 4   ┆ 6   ┆ 8   │
└─────┴─────┴─────┴─────┘

Anything I am missing here, @ritchie46 ?

@ritchie46
Member

Yes, we could add that. We need to extend the query engine and plan for that. This one has been on my stack for a while, but it's not a high priority.

@stinodego
Member

All right. In that case I'll accept this issue. Until it is implemented, you can use the workaround above.

@stinodego added the accepted (Ready for implementation) label on Jul 31, 2023
@erinov1

erinov1 commented Aug 2, 2023

I'd like to draw attention to #9006 in light of the suggestion to use with_context. I've been unable to use with_context with the output of the scan_* functions since at least 0.17.15.

@fsimkovic
Contributor

fsimkovic commented Nov 27, 2023

One possible alternative workaround is to use pl.concat(..., how="align"), but it is significantly slower than the non-lazy equivalent (probably because of the outer joins):

pl.concat([f.with_row_count("idx") for f in dfs], how="align").drop("idx")
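
Spelled out as a self-contained example (frames are made up; the idx column only exists to give "align" a common key to join and sort on):

import polars as pl

lf1 = pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
lf2 = pl.LazyFrame({"c": [5, 6], "d": [7, 8]})

out = (
    pl.concat([lf.with_row_count("idx") for lf in (lf1, lf2)], how="align")
    .drop("idx")
    .collect()
)
print(out)  # same rows and columns as an eager horizontal concat of lf1 and lf2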

A quick benchmark of the different approaches:

import numpy as np
import polars as pl

dfs = [
    pl.from_numpy(
        np.random.random((10, 200)),
        schema=[f"f_{i}_{j}" for j in range(10)],
    ) for i in range(100)
]
lazy_dfs = [df.lazy() for df in dfs]

In [1]: %timeit pl.concat(dfs, how="horizontal")
1.77 ms ± 36.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [2]: %timeit pl.concat([f.with_row_count("idx") for f in lazy_dfs], how="align").drop("idx").collect()
140 ms ± 19.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

The workaround proposed by @stinodego may not work when filters are applied subsequently (#5724). It also doesn't seem to work when multiple lazy frames need to be horizontally concatenated (#12598).

Traceback
In [1]: lazy_dfs[0].with_context(lazy_dfs[1]).select(pl.all()).with_context(lazy_dfs[2]).select(pl.all()).collect()
---------------------------------------------------------------------------
ColumnNotFoundError                       Traceback (most recent call last)
Cell In[9], line 1
----> 1 lazy_dfs[0].with_context(lazy_dfs[1]).select(pl.all()).with_context(lazy_dfs[2]).select(pl.all()).collect()

File ~/.virtualenvs/default/lib/python3.11/site-packages/polars/lazyframe/frame.py:1788, in LazyFrame.collect(self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, no_optimization, streaming, _eager)
   1775     comm_subplan_elim = False
   1777 ldf = self._ldf.optimization_toggle(
   1778     type_coercion,
   1779     predicate_pushdown,
   (...)
   1786     _eager,
   1787 )
-> 1788 return wrap_df(ldf.collect())

ColumnNotFoundError: f_2_2

@ritchie46 is there a better way to perform this operation for multiple lazy dataframes? The performance aspect is a big blocker when loading sharded data.


Versions used
In [1]: pl.show_versions()
--------Version info---------
Polars:              0.19.16
Index type:          UInt32
Platform:            macOS-11.7.10-x86_64-i386-64bit
Python:              3.11.5 (main, Aug 24 2023, 15:23:14) [Clang 13.0.0 (clang-1300.0.29.30)]

----Optional dependencies----
adbc_driver_sqlite:  <not installed>
cloudpickle:         <not installed>
connectorx:          <not installed>
deltalake:           <not installed>
fsspec:              <not installed>
gevent:              <not installed>
matplotlib:          <not installed>
numpy:               1.26.1
openpyxl:            <not installed>
pandas:              2.1.3
pyarrow:             14.0.1
pydantic:            <not installed>
pyiceberg:           <not installed>
pyxlsb:              <not installed>
sqlalchemy:          <not installed>
xlsx2csv:            <not installed>
xlsxwriter:          <not installed>

@adamreeve
Contributor

Hi @ritchie46, we (@G-Research) are quite keen to have this feature implemented, and I'm happy to start working on this and make a PR unless you'd prefer to implement it yourself?

It looks like this will require a new LogicalPlan type with an associated Executor implementation, then some extra work to also support streaming and optimizations with the new plan, which could possibly be follow up PRs. Does that sound sensible to you?

@adamreeve
Contributor

I've started looking into this and have opened a draft PR to get some feedback on the approach at #13139

@ritchie46
Member

> It looks like this will require a new LogicalPlan type with an associated Executor implementation, then some extra work to also support streaming and optimizations with the new plan, which could possibly be follow up PRs. Does that sound sensible to you?

Sounds sensible. It can be blocked for streaming at the moment (i.e. don't allow it to go into the streaming engine). It is also important that we block predicate pushdown.
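
To illustrate why pushdown has to stop at a horizontal concat (eager frames, made up for the example): a predicate applied after the concat filters whole rows of the combined frame, whereas pushing it below the concat could only apply it to the inputs that contain the referenced column, shrinking them and misaligning the remaining rows.

import polars as pl

df1 = pl.DataFrame({"a": [1, 2, 3]})
df2 = pl.DataFrame({"b": [10, 20, 30]})

# Filtering after the concat keeps "a" and "b" values that started on the
# same row together: the result is rows (2, 20) and (3, 30).
print(pl.concat([df1, df2], how="horizontal").filter(pl.col("a") > 1))

# Pushing pl.col("a") > 1 below the concat could only filter df1 (df2 has no
# column "a"), leaving df1 with 2 rows against df2's 3 and pairing the
# surviving "a" values with the wrong "b" values.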

@fsimkovic
Contributor

Thanks @adamreeve. I can confirm that the same benchmark I ran above now gives the following results:

import numpy as np
import polars as pl

dfs = [
    pl.from_numpy(
        np.random.random((10, 200)),
        schema=[f"f_{i}_{j}" for j in range(10)],
    ) for i in range(100)
]
lazy_dfs = [df.lazy() for df in dfs]

In [1]: %timeit pl.concat(dfs, how="horizontal")
1.77 ms ± 36.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [2]: %timeit pl.concat(lazy_dfs, how="horizontal").collect()
2.08 ms ± 24.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
