Allow for custom datasources as plugins #74

Closed
universalmind303 opened this issue Mar 27, 2024 · 10 comments

@universalmind303

The current system makes it pretty easy to add new transformations (exprs) as plugins, but there is currently no good way for users to provide custom datasources.


Ideally, adding a custom datasource should be as easy as implementing a trait or macro. There is already the AnonymousScan trait that mostly works for this use case, but it doesn't work via pyo3-polars due to (de)serialization issues (see #67). Maybe we can have an FFI equivalent instead of the in-memory AnonymousScan?

If we loosely base it off of DataFusion's TableProvider, it may look something like this:

pub struct DummyDatasource {}

impl PolarsDatasource for DummyDatasource {
    fn schema(&self) -> SchemaRef {
        Arc::new(Schema::empty())
    }

    fn scan(&self, projection: &Option<Vec<usize>>, filters: &[Expr], limit: Option<usize>) -> Result<Box<dyn Executor>> {
        Ok(Box::new(DummyExec::new()))
    }
}

pub struct DummyExec {}

impl DummyExec {
    pub fn new() -> Self {
        DummyExec {}
    }
}

impl Executor for DummyExec {
    fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
        Ok(DataFrame::empty())
    }
}

Related issues

#67

@NielsPraet

For my thesis I am currently looking at how I can hook an existing backend query service into Polars to use the lazy DataFrame API. This, however, would need to be passed from the Rust side to the Python side, as the use case is aimed at data scientists / ML engineers working in Python. From what I gathered, it unfortunately seems to be impossible to do so right now, so I want to +1 this issue, as this would open up a lot of possibilities for the Polars ecosystem in general!

@shahamran

Can anyone suggest how to work around this limitation? That is, how can I "extend polars" to support scanning my custom file formats?

I looked at https://github.com/universalmind303/polars-mongo, which seems clean and straightforward, but suffers from the same limitation as in #67.

@hantusk

hantusk commented Jul 4, 2024

You might be able to scan your custom file formats using fsspec. Here's an example: https://csvbase.com/blog/7.
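
A minimal sketch of that general pattern, assuming an fsspec-supported protocol (the bucket and path below are placeholders, not taken from the linked post):

import fsspec
import polars as pl

# Open any fsspec-supported location (s3://, gcs://, a custom protocol, ...)
# and hand the resulting file object straight to a Polars reader.
with fsspec.open("s3://my-bucket/data.csv", mode="rb") as f:
    df = pl.read_csv(f)

This gives an eager read rather than a true lazy scan, so it works around the limitation rather than solving it.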

@shahamran

@hantusk this looks very cool and potentially a good workaround. Thanks for sharing!

@ritchie46
Member

ritchie46 commented Jul 15, 2024

This is something I want to get into. But it needs to be more than a trait, as we want to work over FFI. On the Rust side there is already AnonymousScan. This will be extended to support the new streaming engine.

@shahamran

Hi @ritchie46, I've been using the newly released IO plugins and it works well, thank you.

I have a question regarding n_rows. In the docstring it says:

n_rows: Materialize only n rows from the source. The reader can stop when n_rows are read.

Is it before or after the predicate is applied? In this context, what's the meaning of "materialize"?

Thanks again for implementing this!

@ritchie46
Member

Wow, you are quick. I am still working on the example. :D

@ritchie46
Member

Here is the working example: https://github.com/pola-rs/pyo3-polars/tree/main/example/io_plugin

@shahamran

@ritchie46 thank you.

I understand from this that n_rows can be used regardless of the predicate. I have another question. Can I modify n_rows to account for batch sizes? E.g.:

import polars as pl
from polars.io.plugins import register_io_source  # location of the IO-plugin helper in recent Polars releases


def _read_my_format_impl(path: str, ...) -> pl.DataFrame: ...

def scan_my_format(paths, ...) -> pl.LazyFrame:
    def _read_my_format(with_columns, predicate, n_rows, batch_size):
        for path in paths:
            df = _read_my_format_impl(path, columns=with_columns, n_rows=n_rows)
            if predicate is not None:
                df = df.filter(predicate)
            yield df
            if n_rows is not None:
                n_rows -= df.height  # <-- is this legit?
                if n_rows <= 0:
                    break

    return register_io_source(callable=_read_my_format, schema=...)

@ritchie46
Member

Maybe. You are not allowed to return more than n_rows. It is the upper limit.
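
One way to respect that upper limit, sketched against the snippet above (the head clamp and the running remaining counter are illustrative, not an official API; _read_my_format_impl, paths, and the register_io_source call are carried over from the earlier snippet):

import polars as pl
from polars.io.plugins import register_io_source


def scan_my_format_capped(paths, schema) -> pl.LazyFrame:
    def source(with_columns, predicate, n_rows, batch_size):
        remaining = n_rows
        for path in paths:
            # Hypothetical per-file reader from the snippet above.
            df = _read_my_format_impl(path, columns=with_columns)
            if predicate is not None:
                df = df.filter(predicate)
            if remaining is not None:
                # Clamp the batch so the cumulative row count never exceeds n_rows.
                df = df.head(remaining)
                remaining -= df.height
            yield df
            if remaining is not None and remaining <= 0:
                break

    return register_io_source(callable=source, schema=schema)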
