
filtered arrow tables and conversion to arquero table: filter is lost #51

Closed
ericemc3 opened this issue Dec 11, 2020 · 12 comments
Labels
enhancement New feature or request


@ericemc3

Arquero does not seem to take into account a filter applied to an Arrow table.

Here is my use case:

I have quite a big arrow table:

arrow_tb.count()

=> 6159296

I filter it with arrow functions, which is very fast, especially with dictionary-encoded columns:

arrow_filtered_tb = arrow_tb.filter(
  arrow.predicate.col('NIVGEO').eq('COM')
    .and(arrow.predicate.col('CS1_8').eq('2'))
    .and(arrow.predicate.col('AGEQ65').eq('60'))
    .and(arrow.predicate.col('SEXE').eq('2'))
)

arrow_filtered_tb.count()

=> 34951

I load it as an Arquero table and compute its number of rows:
aq.fromArrow(arrow_filtered_tb).numRows()

=> 6159296, which is not what I expect.

Apparently, the arrow method column.toArray() exports the whole unfiltered column.
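A plain-JavaScript sketch of the behavior being described (the class and method names here are hypothetical, not the real Arrow JS API): the filtered table is a lazy view whose predicate is applied only during a scan, while the backing column buffers are never rewritten, so exporting a column directly returns every row.

```javascript
// Hypothetical sketch, not the real Arrow JS API: a filtered table as a
// lazy view. filter() only stores the predicate; the backing column data
// is never copied or rewritten, so reading the column directly ignores
// the filter.
class LazyFilteredTable {
  constructor(data, predicate = () => true) {
    this.data = data;           // backing column values, shared, never copied
    this.predicate = predicate; // applied only during a scan
  }
  filter(predicate) {
    return new LazyFilteredTable(this.data, predicate);
  }
  count() {
    // scans row by row, honoring the predicate
    return this.data.filter(this.predicate).length;
  }
  toArray() {
    // mirrors the reported behavior: exports the whole backing column
    return this.data;
  }
}

const tb = new LazyFilteredTable(['COM', 'DEP', 'COM', 'REG']);
const filtered = tb.filter(v => v === 'COM');
console.log(filtered.count());          // 2 — predicate applied on scan
console.log(filtered.toArray().length); // 4 — whole unfiltered column
```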

The Arrow scan method looks more effective; I'd like to avoid the conversion to objects here before converting to an Arquero table:

nbRowsFilteredAfterScan = {
  let codgeo, nb, result = [];

  arrow_filtered_tb.scan((idx) => {
    result.push({codgeo: codgeo(idx), nb: nb(idx)});
  }, (batch) => {
    codgeo = arrow.predicate.col('CODGEO').bind(batch);
    nb     = arrow.predicate.col('NB').bind(batch);
  });

  return aq.from(result).numRows();
}

=> 34951

@jheer
Member

jheer commented Dec 11, 2020

Thanks. You're right that Arquero does not currently account for filtered tables, as it accesses the columns directly. Do you happen to know if Arrow has a method akin to Arquero's reify()? If so, that might be an option to explore. Otherwise, we might either (1) perform a scan or (2) see if there is a reasonable way to create a filtered Arquero table that mirrors the filtered Arrow table.

@jheer jheer added the enhancement New feature or request label Dec 11, 2020
@jheer
Member

jheer commented Dec 11, 2020

There is also a related issue already filed for Arrow, but it looks like it’s been there for a while with no response: https://issues.apache.org/jira/browse/ARROW-9496

@ericemc3
Author

I tried to find such a reify() feature, at the table or column level, but to no avail.
There is indeed a problem with this toArray function.

Generally speaking, the main benefit of Arrow is to avoid, or delay as much as possible, the conversion to JavaScript objects. Arrow's filtering functions are very useful when you want to extract a small dataset from a large table. So scan() seems to me, for the moment, a promising direction.

In fact, the ideal would be to be able to use all (or the most important) Arquero verbs in an equivalent way, whether they apply to an Arquero table or an Arrow table. This would imply some kind of internal compilation to the Arrow syntax, and I imagine it would be quite a huge undertaking!

I think that's a bit what dplyr is trying to do (sorry to bring up R again ;)):
https://arrow.apache.org/docs/r/articles/dataset.html

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  )
The collect() is equivalent to your reify(), but before collect(), this is standard dplyr syntax that works as usual, even though it applies here to an Arrow table.
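As a rough illustration of what such a collect()/reify() step does (a plain-JavaScript sketch with a hypothetical helper, not an actual Arrow or Arquero API): it copies only the rows passing the predicate into fresh, dense column arrays, after which direct column access is correct.

```javascript
// Hypothetical sketch of a reify()/collect() step: copy only the rows
// passing the predicate into fresh, dense column arrays, so that direct
// column access (toArray and friends) is correct afterwards.
function reify(columns, predicate) {
  const names = Object.keys(columns);
  const n = columns[names[0]].length;
  const out = Object.fromEntries(names.map(name => [name, []]));
  for (let i = 0; i < n; ++i) {
    // build a row object to test; a real implementation would bind
    // per-batch accessors instead of allocating a row object per index
    const row = Object.fromEntries(names.map(name => [name, columns[name][i]]));
    if (predicate(row)) {
      for (const name of names) out[name].push(row[name]);
    }
  }
  return out;
}

const cols = {
  NIVGEO: ['COM', 'DEP', 'COM'],
  NB:     [10, 20, 30]
};
const dense = reify(cols, d => d.NIVGEO === 'COM');
// dense.NB is now [10, 30]: a compact copy with the filter baked in
```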

@jheer
Member

jheer commented Dec 11, 2020

Here is the current Arrow JS implementation of filtered scans: https://github.com/apache/arrow/blob/master/js/src/compute/dataframe.ts#L119

The initial filter call should be very fast indeed, as it does no work: the predicate is only tested later, upon scanning. I'm curious how big a performance difference one sees from a filtered scan that extracts values into "collected"/"reified" arrays versus just performing the filtering in Arquero. We will probably need to experiment a bit.

@ericemc3
Author

I'll try to arrange some benchmarks in an Observable notebook.
FWIW, here is a quick comparison from my 6-million-row table:

nbRowsFilteredAfterScan = timeIt(() => {
  let codgeo, cs, sexe, age, nivgeo, nb, result = [], col = arrow.predicate.col;

  let arrow_filtered_tb2 = arrow_tb.filter(
    col('NIVGEO').eq('COM')
      .and(col('CS1_8').eq('2'))
      .and(col('AGEQ65').eq('60'))
      .and(col('SEXE').eq('2'))
  );

  arrow_filtered_tb2.scan((idx) => {
    result.push({codgeo: codgeo(idx), sexe: sexe(idx), age: age(idx), nivgeo: nivgeo(idx), nb: nb(idx)});
  }, (batch) => {
    codgeo = col('CODGEO').bind(batch);
    nivgeo = col('NIVGEO').bind(batch);
    nb     = col('NB').bind(batch);
    cs     = col('CS1_8').bind(batch);
    sexe   = col('SEXE').bind(batch);
    age    = col('AGEQ65').bind(batch);
  });

  return aq.from(result).numRows();
})

nbRowsFilteredAfterScan = Object {time: 421, table: 34951}

nbRowsFilteredAq = timeIt(() => {
  let aq_filtered_tb = aq.fromArrow(arrow_tb, {unpack: true, columns: ['CODGEO', 'NIVGEO', 'CS1_8', 'AGEQ65', 'SEXE', 'NB']})
    .filter(d => d.NIVGEO == 'COM' && d.CS1_8 == '2' && d.AGEQ65 == '60' && d.SEXE == '2');

  return aq_filtered_tb.numRows();
})

nbRowsFilteredAq = Object {time: 1798, table: 34951}

Using dictionary-encoded strings makes a huge difference (×10) in performance (and also in Arrow file size: ×3, and ×6 if zipped).
Using classic string columns:
5500 ms with the Arrow scan
17000 ms with the Arquero filter

So from 17000 ms early this morning to 400 ms now, I am quite happy; Arrow + Arquero really rocks!

@ericemc3
Author

Here is a more detailed (and reproducible) benchmark comparison:
https://observablehq.com/@ericmauviere/arrow-arquero-benchmarks

@jheer
Member

jheer commented Dec 17, 2020

Fixed and released in v1.3.0. The fromArrow() method now includes support for filtered tables (i.e., those where the table length and count() do not match).
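The mechanics can be pictured like this (a hedged plain-JavaScript sketch with invented names, not the actual Arquero implementation): when the source table is filtered, scan it once to collect the selected row indices, then route all column reads through that selection instead of the raw backing arrays.

```javascript
// Hypothetical sketch of handling a filtered source table: collect the
// indices of rows that pass the predicate, then expose the table through
// that selection. All names here are invented for illustration; this is
// not the actual Arquero code.
function fromFiltered(columns, length, predicate) {
  const selection = [];
  for (let i = 0; i < length; ++i) {
    if (predicate(i)) selection.push(i); // one scan, indices only
  }
  return {
    numRows: () => selection.length,
    // column reads are redirected through the selected indices
    get: (name, row) => columns[name][selection[row]]
  };
}

const columns = { NIVGEO: ['COM', 'DEP', 'COM'], NB: [10, 20, 30] };
const table = fromFiltered(columns, 3, i => columns.NIVGEO[i] === 'COM');
// table.numRows() is 2; table.get('NB', 1) reads backing row 2, i.e. 30
```

Note the backing column arrays are never copied; only the small index array is materialized, which keeps the conversion cheap even for large tables.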

@jheer jheer closed this as completed Dec 17, 2020
@ericemc3
Author

Great, thanks!

@TheNeuralBit

You wouldn't need to filter with Arrow in advance if Arquero could optimize predicates involving dictionary-encoded columns in the same way that Arrow's Table.filter does. @jheer, is anything like that planned/feasible?
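The optimization being suggested can be sketched in plain JavaScript (a hypothetical column layout, not the real Arrow JS dictionary vector): resolve the string constant to its dictionary code once, and the per-row test becomes an integer comparison against the codes array instead of a string comparison per value.

```javascript
// Hypothetical dictionary-encoded column: a small dictionary of distinct
// strings plus an integer code per row. Not the real Arrow JS layout.
const column = {
  dictionary: ['COM', 'DEP', 'REG'],
  codes: new Int32Array([0, 1, 0, 2, 0])
};

// Naive predicate: decode and compare a string for every row.
function filterByString(col, value) {
  const out = [];
  for (let i = 0; i < col.codes.length; ++i) {
    if (col.dictionary[col.codes[i]] === value) out.push(i);
  }
  return out;
}

// Optimized predicate: look the string up in the dictionary once,
// then compare integer codes row by row.
function filterByCode(col, value) {
  const code = col.dictionary.indexOf(value); // resolved once
  const out = [];
  for (let i = 0; i < col.codes.length; ++i) {
    if (col.codes[i] === code) out.push(i);
  }
  return out;
}

// Both return the same selection: rows 0, 2, 4 for 'COM'.
```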

@jheer
Member

jheer commented Jan 22, 2021

Thanks @TheNeuralBit, this is indeed something I am thinking about and should be feasible. That said, it's not currently at the top of my priority queue. In any case, clients could pass an Arrow FilteredDataFrame to Arquero, and so we should properly handle that (as added in PR #60).

@TheNeuralBit

Thanks Jeff. You're right, filtering in Arrow first is a fine workaround for now. I'm just thinking that if Arquero gained that ability, then maybe Arrow could/should get out of the business of being a JS compute library and defer to Arquero. The dictionary optimization is its only real selling point right now.

I filed #86 to track the work. I'd be interested in helping out with it if you could provide a few code pointers.

@ericemc3
Author

Really happy to see Brian's engagement here, considering all he has achieved so far for Arrow and its JS library!

As I also tried to analyse here: dictionary encoding in Arrow allows for spectacular performance, with filter but also with summarize operations (simple sums grouped by dictionary-encoded variables).

More generally, the Arrow JS implementation looks somewhat frozen, with (from my point of view) key pending issues, especially compression for web usage, considering raw Arrow file sizes:
[JS] toArray() called on filtered Table returns all rows
[JS] Implement IPC RecordBatch body buffer compression from ARROW

No doubt for me that Arquero leveraging Arrow, together with your engagement, could boost all that!
