
read-stream-dataset-inplace fails on large arrow file #126

Closed
behrica opened this issue Jul 31, 2020 · 17 comments

@behrica
Contributor

behrica commented Jul 31, 2020

I created a large arrow file on disk (14 GB) and tried to read it with read-stream-dataset-inplace.

It fails with:

. Unhandled java.lang.IllegalArgumentException
  Value out of range for int: 14779929936

                  RT.java: 1248  clojure.lang.RT/intCast
                base.cljc:  107  tech.v2.datatype.base$sub_buffer/invokeStatic
                base.cljc:  105  tech.v2.datatype.base$sub_buffer/invoke
             datatype.clj:  418  tech.v2.datatype/sub-buffer
             datatype.clj:  413  tech.v2.datatype/sub-buffer
              RestFn.java:  425  clojure.lang.RestFn/invoke
             in_place.clj:   59  tech.libs.arrow.in-place/read-message
             in_place.clj:   46  tech.libs.arrow.in-place/read-message
             in_place.clj:   73  tech.libs.arrow.in-place/message-seq
             in_place.clj:   70  tech.libs.arrow.in-place/message-seq
             in_place.clj:  308  tech.libs.arrow.in-place/read-stream-dataset-inplace
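
For reference, a minimal sketch of the call (assuming read-stream-dataset-inplace is simply handed the path of the .arrow file):

(require '[tech.libs.arrow.in-place :as arrow-in-place])

;; ~14 GB arrow stream file, written in several record batches
(def ds (arrow-in-place/read-stream-dataset-inplace "big.arrow"))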
@cnuernber
Collaborator

Awesome, at least it failed as opposed to reading bad data. Will dive into this, thanks.

@cnuernber
Collaborator

Do you have an R script for creating this file? Maybe I can make one from the taxi cab example earlier.

@behrica
Contributor Author

behrica commented Aug 1, 2020

I created it from a large CSV file quite easily, using two R libraries:
readr::read_csv_chunked

and
arrow::RecordBatchStreamWriter

Starting from a 15 GB CSV (which I just "cat"ed together), I read the CSV in batches and wrote it to the Arrow format in batches as well.
I will send you some example code.

@behrica
Contributor Author

behrica commented Aug 1, 2020

It looks like this, but without batched reading or writing, so it only works for CSVs which fit into R memory:

library(readr)
library(arrow)

df <- readr::read_csv("in.csv")

# write the whole data frame as a single record batch in the Arrow stream format
tf <- "out.arrow"
file_obj <- FileOutputStream$create(tf)
batch <- record_batch(df)
writer <- RecordBatchStreamWriter$create(file_obj, batch$schema)
writer$write(batch)
writer$close()
file_obj$close()

It works similarly with batched reading/writing: use readr::read_csv_chunked and call writer$write(x) in the callback function passed to readr::read_csv_chunked.

@cnuernber
Collaborator

Ah, got it. I guess you could also just keep calling writer$write(batch) in a loop and get a similar result. I am downloading the taxi data; that could also be written to a (quite large) batched arrow file.

@behrica
Contributor Author

behrica commented Aug 1, 2020

I fixed the exception.

The code in in_place.clj calls the wrong sub-buffer function: it does not call the one on the NativeBuffer record, but the generic one in base.cljc, which int-casts the offset (the RT/intCast in the stack trace above) and therefore overflows for offsets beyond 2 GB.
In this line, for example:

(let [new-msg (Message/getRootAsMessage (-> (dtype/sub-buffer data offset msg-size)

But now it returns only the first "batch". I did indeed write the large file in several batches, and I get back a dataset whose number of rows is exactly the batch size.

How am I supposed to get the full dataset (the combination of all batches)?

@cnuernber
Collaborator

cnuernber commented Aug 1, 2020

I attempted to fix the base itself and checked it in. I have not yet put a pathway in place to load all record batches. That involves tracking dictionary batches, applying differential changes to the dictionaries, and then producing a dataset per batch. So a really large arrow file will produce a sequence of datasets -

(defn read-stream-dataset-inplace

That code (read in 0 or more dictionaries, then the next record batch) would just need to go into a loop or sequence construct in order to return a sequence of datasets instead of only the first one (see the sketch at the end of this comment). The Arrow specification is conflicted about dictionaries:

https://arrow.apache.org/docs/format/Columnar.html#dictionary-encoded-layout

I am currently finishing up a blog post outlining memory mapping and some basic performance comparisons to the Java Arrow API, so I have not yet gotten into working with large multi-record-batch arrow files. You are brave :-) but I am really excited you are going for it.
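
In outline, the change is roughly this shape (a sketch only; read-next-dataset! is a hypothetical stand-in for the existing "read 0..n dictionary batches plus one record batch" step, not existing API):

(defn dataset-seq
  "Lazily yield one dataset per record batch.  `read-next-dataset!` is a
  hypothetical stand-in for the existing single-batch reading logic and is
  expected to return nil once the message stream is exhausted."
  [read-next-dataset! stream]
  (lazy-seq
    (when-let [dataset (read-next-dataset! stream)]
      (cons dataset (dataset-seq read-next-dataset! stream)))))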

@cnuernber
Collaborator

The work here, I think, will go past just getting things to load and into thinking about how to build a sane interface for a sequence of datasets as opposed to a single one. I think the dataset protocols will work, but of course without doing it no one knows. Things like filter need to be implemented in a multi-dataset-aware fashion. Currently concat-copying works at the reader level, which imposes a per-index cost on each element access, and I believe that is unacceptable. With a multi-dataset concept, assuming the datasets share a schema, you could do select, filter, and group-by in smarter ways.

@cnuernber
Collaborator

cnuernber commented Aug 1, 2020

Another thing about the article you linked to from R: R's filter takes an AST, so it can analyse the AST and strip out entire datasets when, for instance, a column's min and max lie outside the filter inclusion set. This is quite an optimization, and you can't get it generically with plain predicate functions that return true or false; you would have to know the function is a greater-than/less-than type filter.
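
For illustration, the batch-skipping part of such an optimization could look roughly like this over a sequence of per-record-batch datasets (a sketch only; column-min-max and filter-one are hypothetical helpers, not existing API):

(defn range-filter-batches
  "Drop whole per-batch datasets whose [min, max] range for `colname` cannot
  intersect the filter interval [lo, hi], then apply `filter-one` -- whatever
  single-dataset row-level filter you already have -- to the surviving batches.
  `column-min-max` is a hypothetical helper returning {:min ... :max ...}
  for one column of one dataset."
  [column-min-max filter-one colname lo hi dataset-seq]
  (->> dataset-seq
       (remove (fn [dataset]
                 (let [{cmin :min cmax :max} (column-min-max dataset colname)]
                   ;; the whole batch lies outside [lo, hi] -- skip it entirely
                   (or (< cmax lo) (> cmin hi)))))
       (map filter-one)))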

@behrica
Contributor Author

behrica commented Aug 1, 2020

I slightly modified read-stream-dataset-inplace so that it returns a lazy sequence of datasets,
one per record batch.

In my 15 GB case, I had around 1000 of them.

Then I did a frequency count of the values of one column and aggregated over all datasets (so I did not try to concat the datasets).
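
In code that was roughly the following (a sketch; datasets is the lazy sequence of per-record-batch datasets, "some-col" stands in for the actual column name, and ds/column is tech.ml.dataset's column accessor):

(require '[tech.ml.dataset :as ds])

;; per-batch frequency maps, merged across all ~1000 batches
(->> datasets
     (map #(frequencies (ds/column % "some-col")))
     (apply merge-with +))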

It was all very straightforward, and it was very fast: it took roughly 90 seconds to go over the 15 GB.
This is already an amazing result, in my view.

So we are definitely on the right track for working with large data from Clojure.

@behrica
Contributor Author

behrica commented Aug 1, 2020

I played a bit more with it and the overall experience is very, very good.

Having the "very large data" as a lazy sequence of datasets makes exploratory analysis very convenient and fast.

It seems that the combination of "lazy Clojure" + "mmapped files" gives very good results.
Working in the REPL feels very fast, and things are only "slow" when expected (when access to all 15 GB is indeed needed). And it is a "good" slowness, in the sense that the REPL stays stable after the operation finishes. There is of course no Java GC pressure.

@behrica
Contributor Author

behrica commented Aug 1, 2020

For me it is therefore perfectly fine to change the behavior of "read-stream-dataset-inplace" to return a lazy sequence of datasets.
(one dataset per record batch)

The only "issue" with this, that by using an "external arrow file", we might not be able to control the batch size, and might get the worst case (= all data in one huge batch)

In my case I did not see the issue you described above about "Multiple dictionaries".

@behrica
Contributor Author

behrica commented Aug 1, 2020

I will now test your new code.

@cnuernber
Collaborator

This is actually amazing. You may literally be the first person on the JVM to load and work with datasets of this size in this way. Normally this would have meant setting up a Hadoop cluster with Spark and a ton more drama, and you can now do this work on a laptop. What a solid validation of the design; I am just really happy with this outcome so far.

@behrica
Contributor Author

behrica commented Aug 1, 2020

The new code works; we can close this issue.

@behrica behrica closed this as completed Aug 1, 2020
@harold
Contributor

harold commented Aug 1, 2020

It seems that the combination of "lazy Clojure" + "mmapped files" gives very good results.

We were just discussing this precise claim. Very cool.

@behrica
Contributor Author

behrica commented Aug 1, 2020

This is actually amazing. You may literally be the first person on the JVM to load and work with datasets of this size in this way. Normally this would have meant setting up a Hadoop cluster with Spark and a ton more drama, and you can now do this work on a laptop.
-> A VM with 2 cores and 8 GB of RAM in total
